AWSEventStream

Pure-Elixir codec for the AWS vnd.amazon.eventstream binary protocol.

General-purpose and symmetric: encodes and decodes all header types, validates CRC32 checksums on both the prelude and the whole message, and surfaces every frame error explicitly instead of silently dropping bad data. The design mirrors aws-sdk-go-v2/aws/protocol/eventstream and botocore: same wire format, same header-type set, same signed big-endian numerics.

The core codec is payload-agnostic (payloads are opaque bytes). An optional JSON convenience layer classifies frames by :message-type and unwraps Bedrock-style JSON payloads.

Installation

Add aws_event_stream to your mix.exs:

{:aws_event_stream, "~> 0.1"}

The jason dependency is only needed if you use AWSEventStream.JSON. Add it explicitly if so:

defp deps do
  [
    {:aws_event_stream, "~> 0.1"},
    {:jason, "~> 1.4"}
  ]
end

Core encode / decode

alias AWSEventStream.{Header, Message}

# Build a message with two string headers and a binary payload.
msg = %Message{
  headers: [
    %Header{name: ":message-type", type: :string, value: "event"},
    %Header{name: ":event-type", type: :string, value: "contentBlockDelta"}
  ],
  payload: ~s({"delta":{"text":"hello"}})
}

# Encode to a complete event-stream frame (returns iodata).
frame = IO.iodata_to_binary(AWSEventStream.encode(msg))

# Decode a buffer; returns {results, leftover_bytes}.
# Feed leftover bytes into the next call when streaming chunks.
{[{:ok, ^msg}], <<>>} = AWSEventStream.decode(frame)

decode/1 and decode/2 are incremental: call decode on each incoming chunk, prepend the returned rest to the next chunk, and repeat.
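The framing half of that contract can be sketched without the library: because total_length is the first big-endian u32 and counts the whole frame, a decoder only needs to wait until that many bytes are buffered. FrameSplitter below is a hypothetical, dependency-free module for illustration, not AWSEventStream.Decoder; it skips CRC validation entirely and simply stops on an implausible length.

```elixir
defmodule FrameSplitter do
  # Hypothetical illustration only; not the library's AWSEventStream.Decoder.
  # Smallest legal frame: 12-byte prelude + 4-byte message CRC.
  @min_frame 16

  @spec split(binary()) :: {[binary()], binary()}
  def split(buffer) when is_binary(buffer), do: take(buffer, [])

  # total_length counts the whole frame, so once that many bytes are
  # buffered the frame can be sliced off and the loop continues.
  defp take(<<total::big-unsigned-size(32), _::binary>> = buffer, acc)
       when total >= @min_frame and byte_size(buffer) >= total do
    <<frame::binary-size(total), rest::binary>> = buffer
    take(rest, [frame | acc])
  end

  # Fewer than 4 bytes, or a frame whose tail has not arrived yet: hand the
  # bytes back so the caller can prepend them to the next chunk. (A real
  # decoder would report a total_length below 16 as an error instead.)
  defp take(buffer, acc), do: {Enum.reverse(acc), buffer}
end
```

Streaming then becomes a fold over chunks: prepend the previous rest to each new chunk and call split/1 again, which is the same loop the README describes for AWSEventStream.decode/1.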

# Streaming example: split the frame mid-way to simulate two network chunks.
{part1, part2} = :erlang.split_binary(frame, 10)
# 10 bytes is not even a full prelude, so nothing decodes yet; the partial bytes come back as rest1.
{[], rest1} = AWSEventStream.decode(part1)
{[{:ok, ^msg}], <<>>} = AWSEventStream.decode(rest1 <> part2)

Error handling

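Corrupt frames are returned as {:error, {reason, raw_bytes}} rather than raised, so a single bad frame does not interrupt the stream:

# Flip the last byte of `frame` (from the example above) so the message CRC no longer matches.
corrupt_bytes = binary_part(frame, 0, byte_size(frame) - 1) <> <<Bitwise.bxor(:binary.last(frame), 0xFF)>>
{[{:error, {:invalid_message_crc, _raw}}], <<>>} = AWSEventStream.decode(corrupt_bytes)

# Pass on_error: :skip to silently drop corrupt frames.
{[], <<>>} = AWSEventStream.decode(corrupt_bytes, on_error: :skip)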

Possible reason atoms: :invalid_prelude_crc, :invalid_message_crc, :invalid_message_length.
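Each reason atom corresponds to an independent check on a frame. The hypothetical FrameCheck module below is a minimal sketch of one plausible way to run those checks on a buffer that holds exactly one frame; the library's actual Decoder may order the checks or shape its results differently.

```elixir
defmodule FrameCheck do
  # Hypothetical sketch, not the library's Decoder; expects exactly one frame.
  # Fixed overhead: 12-byte prelude + 4-byte message CRC.
  @overhead 16

  def check(
        <<total::big-unsigned-size(32), headers_len::big-unsigned-size(32),
          prelude_crc::big-unsigned-size(32), _::binary>> = frame
      ) do
    cond do
      # prelude_crc covers only total_length and headers_length (8 bytes).
      :erlang.crc32(binary_part(frame, 0, 8)) != prelude_crc ->
        {:error, :invalid_prelude_crc}

      # The declared lengths must agree with the bytes actually present.
      total != byte_size(frame) or total < @overhead + headers_len ->
        {:error, :invalid_message_length}

      true ->
        body_len = total - 4
        <<body::binary-size(body_len), message_crc::big-unsigned-size(32)>> = frame

        # message_crc covers every byte before it, prelude_crc included.
        if :erlang.crc32(body) == message_crc do
          payload_len = total - @overhead - headers_len
          {:ok, binary_part(frame, 12, headers_len), binary_part(frame, 12 + headers_len, payload_len)}
        else
          {:error, :invalid_message_crc}
        end
    end
  end

  # Too short to even contain a prelude.
  def check(frame) when is_binary(frame), do: {:error, :invalid_message_length}
end
```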

JSON layer (AWSEventStream.JSON)

Bedrock and other AWS streaming APIs attach a :message-type header ("event" / "exception" / "error") to every frame. Without explicit classification, a consumer receiving unexpected data cannot tell whether it got a server-side throttling exception or a normal event that happens to contain a map with a field named "error". AWSEventStream.JSON.decode/2 resolves this: it classifies each frame before inspecting the payload, so the tagged tuple is always authoritative.

alias AWSEventStream.JSON

# Decode a buffer of raw event-stream bytes (e.g. read from the HTTP response body).
{classified, rest} = JSON.decode(buffer)

# Each result is one of:
#   {:event, event_type :: String.t() | nil, payload :: map()}
#   {:exception, exception_type :: String.t() | nil, payload :: map()}
#   {:error, error_code :: String.t() | nil, message :: String.t() | nil}
#   {:malformed_payload, %Message{}, reason}
#   {:malformed_frame, reason :: atom(), raw_bytes :: binary()}
for result <- classified do
  case result do
    {:event, type, payload} ->
      IO.puts("event #{type}: #{inspect(payload)}")

    {:exception, type, payload} ->
      # Server signalled an exception, distinct from a normal event.
      IO.puts("exception #{type}: #{inspect(payload)}")

    {:error, code, message} ->
      # Wire-level error (e.g. InternalFailure), also distinct from {:event, ...}.
      IO.puts("error #{code}: #{message}")

    {:malformed_payload, _msg, reason} ->
      IO.puts("payload could not be decoded: #{inspect(reason)}")

    {:malformed_frame, reason, _raw} ->
      IO.puts("corrupt frame: #{inspect(reason)}")
  end
end

Disambiguating server-side early closes

AWS streaming services terminate abnormal streams with an exception or error frame rather than just closing the connection. Without explicit :message-type classification, a consumer cannot reliably tell such a closing frame apart from a normal event whose payload merely looks like an error.

The JSON layer makes this distinction at the decode step, so application code can pattern-match cleanly instead of introspecting raw headers. A frame's classification is fixed by its headers: an exception or error frame is always surfaced as {:exception, …} or {:error, …}, even when its body is non-JSON or empty, as a server-side close may legitimately send. For an exception frame whose body isn't a JSON object, the raw bytes are preserved under "raw", e.g. {:exception, "ValidationException", %{"raw" => "..."}}. Only normal event frames degrade to {:malformed_payload, …} when their payload cannot be decoded.
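The header-first rule can be sketched in a few lines. This is a hypothetical ClassifySketch module, not the library's code: it assumes headers arrive as {name, value} pairs and takes the JSON decoder as a parameter (for example &Jason.decode/1) so it runs without dependencies. The :event-type, :exception-type, :error-code and :error-message header names follow the AWS event-stream convention; unlike the library, the sketch puts the raw payload rather than a %Message{} in :malformed_payload, and its handling of an unknown :message-type is its own choice.

```elixir
defmodule ClassifySketch do
  # Hypothetical sketch. `decode_json` returns {:ok, term} | {:error, reason}.
  def classify(headers, payload, decode_json) do
    h = Map.new(headers)

    # The :message-type header decides the tag before the body is inspected.
    case Map.get(h, ":message-type") do
      "event" ->
        case decode_json.(payload) do
          {:ok, map} when is_map(map) -> {:event, h[":event-type"], map}
          {:ok, not_map} -> {:malformed_payload, payload, {:not_an_object, not_map}}
          {:error, reason} -> {:malformed_payload, payload, reason}
        end

      "exception" ->
        # A bad or empty body never downgrades an exception frame.
        body =
          case decode_json.(payload) do
            {:ok, map} when is_map(map) -> map
            _ -> %{"raw" => payload}
          end

        {:exception, h[":exception-type"], body}

      "error" ->
        # Assumption: code and message are read from headers, not the body.
        {:error, h[":error-code"], h[":error-message"]}

      other ->
        # This sketch's own choice for a missing or unknown :message-type.
        {:malformed_payload, payload, {:unknown_message_type, other}}
    end
  end
end
```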

Bedrock payloads are automatically unwrapped: if the outer JSON is {"bytes": "<base64>"}, the inner bytes are base64-decoded and the nested JSON object is returned as the payload map.
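A minimal sketch of that unwrapping step, applied to an outer payload that has already been JSON-decoded into a map. BedrockUnwrap is hypothetical, the JSON decoder is injected (e.g. &Jason.decode/1) to keep it dependency-free, and the error atoms are this sketch's own, not the library's.

```elixir
defmodule BedrockUnwrap do
  # Hypothetical sketch; `decode_json` returns {:ok, term} | {:error, reason}.
  def unwrap(%{"bytes" => encoded}, decode_json) when is_binary(encoded) do
    with {:ok, inner} <- Base.decode64(encoded),
         {:ok, map} when is_map(map) <- decode_json.(inner) do
      {:ok, map}
    else
      # Base.decode64/1 signals invalid input with a bare :error.
      :error -> {:error, :invalid_base64}
      {:ok, _not_a_map} -> {:error, :inner_not_an_object}
      {:error, reason} -> {:error, reason}
    end
  end

  # Not a Bedrock wrapper: the decoded map already is the payload.
  def unwrap(payload, _decode_json) when is_map(payload), do: {:ok, payload}
end
```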

Frame format

All numbers are big-endian. CRCs use Erlang's :erlang.crc32/1.

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                      total_length (u32)                       |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                     headers_length (u32)                      |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                       prelude_crc (u32)                       |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                      headers (variable)                       |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                      payload (variable)                       |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                       message_crc (u32)                       |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
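As a worked example of the layout in the diagram, the hypothetical FrameSketch module below builds one frame from already-encoded header bytes and a payload. It is a sketch, not AWSEventStream.Encoder. Note that total_length counts the entire frame (12 + headers_length + payload_length + 4), that prelude_crc covers only the first 8 bytes, and that message_crc covers everything before it, prelude_crc included.

```elixir
defmodule FrameSketch do
  # Hypothetical encoder; `headers` must already be wire-encoded header bytes.
  def encode(headers, payload) when is_binary(headers) and is_binary(payload) do
    headers_len = byte_size(headers)
    # 12-byte prelude + headers + payload + 4-byte message CRC.
    total = 12 + headers_len + byte_size(payload) + 4

    prelude = <<total::big-unsigned-size(32), headers_len::big-unsigned-size(32)>>
    # prelude_crc covers only total_length and headers_length.
    with_prelude = prelude <> <<:erlang.crc32(prelude)::big-unsigned-size(32)>>

    body = with_prelude <> headers <> payload
    # message_crc covers every preceding byte, including prelude_crc.
    body <> <<:erlang.crc32(body)::big-unsigned-size(32)>>
  end
end
```

For example, FrameSketch.encode(<<>>, "hi") yields an 18-byte frame: 16 bytes of fixed overhead plus the 2-byte payload.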

Header format

Each header is: name_len (u8) + name (UTF-8) + type_byte (u8) + value.

Type atom | Type byte | Wire value encoding
:bool | 0 or 1 | no value bytes; 0 = true, 1 = false
:byte | 2 | 1 byte, signed
:short | 3 | 2 bytes, signed big-endian
:integer | 4 | 4 bytes, signed big-endian
:long | 5 | 8 bytes, signed big-endian
:bytes | 6 | len (u16) + raw bytes
:string | 7 | len (u16) + UTF-8 bytes
:timestamp | 8 | 8 bytes, signed big-endian milliseconds since epoch
:uuid | 9 | 16 raw bytes
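The table translates almost line for line into Elixir binary construction. HeaderSketch below is a hypothetical encoder for a single header, not AWSEventStream.Header; it adds guards for the u8 name length and u16 value length so oversized inputs fail instead of being silently truncated.

```elixir
defmodule HeaderSketch do
  # Hypothetical sketch. Wire layout: name_len (u8) + name + type byte (u8) + value.
  def encode(name, type, value) when is_binary(name) and byte_size(name) <= 255 do
    <<byte_size(name)::size(8), name::binary, encode_value(type, value)::binary>>
  end

  # :bool carries its value in the type byte itself: 0 = true, 1 = false.
  defp encode_value(:bool, true), do: <<0>>
  defp encode_value(:bool, false), do: <<1>>
  defp encode_value(:byte, v), do: <<2, v::signed-size(8)>>
  defp encode_value(:short, v), do: <<3, v::big-signed-size(16)>>
  defp encode_value(:integer, v), do: <<4, v::big-signed-size(32)>>
  defp encode_value(:long, v), do: <<5, v::big-signed-size(64)>>

  # :bytes and :string share a u16 length prefix; only the type byte differs.
  defp encode_value(:bytes, v) when byte_size(v) <= 0xFFFF,
    do: <<6, byte_size(v)::big-unsigned-size(16), v::binary>>

  defp encode_value(:string, v) when byte_size(v) <= 0xFFFF,
    do: <<7, byte_size(v)::big-unsigned-size(16), v::binary>>

  # Signed milliseconds since the Unix epoch, e.g. System.os_time(:millisecond).
  defp encode_value(:timestamp, ms), do: <<8, ms::big-signed-size(64)>>
  defp encode_value(:uuid, <<_::binary-size(16)>> = v), do: <<9, v::binary>>
end
```

A full header block is then just the concatenation of encoded headers, and its byte size is what the prelude records as headers_length.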

Public modules

Module | Purpose
AWSEventStream | Facade: encode/1, decode/1, decode/2
AWSEventStream.Encoder | Wire frame serialisation
AWSEventStream.Decoder | Incremental frame deserialisation with CRC validation
AWSEventStream.Message | %Message{headers, payload} struct + header/2
AWSEventStream.Header | %Header{name, type, value} struct + wire codec
AWSEventStream.JSON | Optional JSON layer: classify + unwrap Bedrock frames

Status / Non-goals

Status: 0.1.0. The codec is complete and tested: 26 tests cover all header types, the encoder, the decoder, a round-trip property test, chunk-boundary splitting, JSON classification, and golden vectors captured from a live Bedrock stream.

Non-goals for this library:

Future work: