S2 Elixir SDK

Elixir client for the S2 durable stream API. It provides a Protobuf data plane (append, read, check tail) over Mint HTTP/2, and a JSON control plane (basins, streams, access tokens, metrics) over Req.

Installation

def deps do
  [
    {:s2_client, "~> 1.0"}
  ]
end

Example: Chat App

A chat app where each room gets its own stream. Messages are Ecto embedded schemas, durably ordered, and listeners tail the stream in real time.

alias MyApp.Chat
alias MyApp.Chat.Message

# Create rooms (each becomes its own S2 stream)
Chat.create_room("general")
Chat.create_room("random")

# Append typed messages
Chat.append("general", Message.new(user: "alice", text: "hey everyone!"))
Chat.append("general", Message.new(user: "bob", text: "hi alice!"))
Chat.append("random", Message.new(user: "alice", text: "anyone here?"))

# Listen to a room — tails the stream, calling your function for each message
{:ok, listener} = Chat.listen("general", fn %Message{} = msg ->
  IO.puts("[#{msg.ts}] #{msg.user}: #{msg.text}")
end)
# [2026-02-25T14:30:00Z] alice: hey everyone!
# [2026-02-25T14:30:01Z] bob: hi alice!
# ... stays open, prints new messages as they arrive

# Stop listening when done
Chat.stop_listener(listener)

# Listen for only new messages (skip history)
Chat.listen("general", fn msg -> IO.inspect(msg) end, from: :tail)

Here's the implementation. S2.Store manages connections, serialization, and session lifecycle — like Ecto.Repo for streams. It handles chunking, framing, and deduplication automatically (see Patterns), so you just work with your own types.

# config/config.exs
config :my_app, MyApp.S2,
  base_url: "https://aws.s2.dev",
  token: System.get_env("S2_TOKEN"),
  max_retries: :infinity, # reconnection attempts before giving up (default: :infinity)
  base_delay: 500,      # base delay in ms for exponential backoff (default: 500)
  max_queue_size: 1000   # pending appends per stream before {:error, :overloaded} (default: 1000)
# lib/my_app/s2.ex
defmodule MyApp.S2 do
  use S2.Store,
    otp_app: :my_app,
    basin: "my-app"
end
# lib/my_app/chat/message.ex
defmodule MyApp.Chat.Message do
  use Ecto.Schema
  import Ecto.Changeset

  @derive Jason.Encoder
  @primary_key false
  embedded_schema do
    field :user, :string
    field :text, :string
    field :ts, :utc_datetime
  end

  def new(attrs) do
    %__MODULE__{ts: DateTime.utc_now()}
    |> changeset(Map.new(attrs))
    |> apply_action!(:new)
  end

  def changeset(message \\ %__MODULE__{}, attrs) do
    message
    |> cast(attrs, [:user, :text, :ts])
    |> validate_required([:user, :text])
  end

  def serializer do
    %{
      serialize: &Jason.encode!/1,
      deserialize: fn json ->
        attrs = Jason.decode!(json)
        %__MODULE__{} |> cast(attrs, [:user, :text, :ts]) |> apply_action!(:load)
      end
    }
  end
end
# lib/my_app/application.ex
def start(_type, _args) do
  children = [
    MyApp.S2
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end
# lib/my_app/chat.ex
defmodule MyApp.Chat do
  use MyApp.S2, serializer: MyApp.Chat.Message.serializer()

  def create_room(room), do: create_stream("chat/#{room}")
end

Control Plane

All control plane functions take an `opts` keyword list containing `server: client` (and `basin: "name"` where required).

First, create a client:

config = S2.Config.new(base_url: "https://aws.s2.dev", token: "my-token")
client = S2.Client.new(config)

Basins

{:ok, basins}  = S2.Basins.list_basins(server: client)
{:ok, basin}   = S2.Basins.create_basin(%S2.CreateBasinRequest{basin: "my-basin"}, server: client)
{:ok, config}  = S2.Basins.get_basin_config("my-basin", server: client)
:ok            = S2.Basins.delete_basin("my-basin", server: client)

Streams

{:ok, streams} = S2.Streams.list_streams(server: client, basin: "my-basin")
{:ok, stream}  = S2.Streams.create_stream(%S2.CreateStreamRequest{stream: "my-stream"}, server: client, basin: "my-basin")
{:ok, config}  = S2.Streams.get_stream_config("my-stream", server: client, basin: "my-basin")
:ok            = S2.Streams.delete_stream("my-stream", server: client, basin: "my-basin")

Access Tokens

Issue scoped, expiring tokens for clients. Useful for giving a browser read-only access to a specific stream (e.g. for real-time updates over SSE or a WebSocket bridge).

# Issue a read-only token scoped to a single basin
{:ok, resp} = S2.AccessTokens.issue_access_token(
  %S2.AccessTokenInfo{
    expires_at: DateTime.add(DateTime.utc_now(), 3600, :second),
    scope: %S2.AccessTokenScope{
      basins: %{"my-basin" => %{}},
      op_groups: %S2.PermittedOperationGroups{
        stream: %S2.ReadWritePermissions{read: true, write: false}
      }
    }
  },
  server: client
)

# resp.access_token is the bearer token string — send it to the client
# The client can then connect to the S2 data plane directly to read streams

{:ok, tokens} = S2.AccessTokens.list_access_tokens(server: client)
:ok           = S2.AccessTokens.revoke_access_token("token-id", server: client)

You can also scope tokens to specific operations via the ops field (e.g. ["read", "check-tail"]) or to specific streams via the streams field.

Metrics

{:ok, metrics} = S2.Metrics.account_metrics(server: client)
{:ok, metrics} = S2.Metrics.basin_metrics("my-basin", server: client)
{:ok, metrics} = S2.Metrics.stream_metrics("my-basin", "my-stream", server: client)

Data Plane

Data plane operations use S2S-framed protobuf over a persistent Mint HTTP/2 connection. All calls return an updated conn for connection reuse.

Single Request

{:ok, conn} = S2.S2S.Connection.open("https://aws.s2.dev")

# Append records
input = %S2.V1.AppendInput{records: [%S2.V1.AppendRecord{body: "hello"}]}
{:ok, ack, conn} = S2.S2S.Append.call(conn, "my-basin", "my-stream", input)

# Read records
{:ok, batch, conn} = S2.S2S.Read.call(conn, "my-basin", "my-stream", seq_num: 0)

# Check tail position
{:ok, position, conn} = S2.S2S.CheckTail.call(conn, "my-basin", "my-stream")

Streaming Append

{:ok, session} = S2.S2S.AppendSession.open(conn, "my-basin", "my-stream")

input = %S2.V1.AppendInput{records: [%S2.V1.AppendRecord{body: "event-1"}]}
{:ok, ack, session} = S2.S2S.AppendSession.append(session, input)

{:ok, session} = S2.S2S.AppendSession.close(session)

Streaming Read

{:ok, session} = S2.S2S.ReadSession.open(conn, "my-basin", "my-stream", seq_num: 0)

{:ok, batch, session} = S2.S2S.ReadSession.next_batch(session)
# batch.records contains the records

{:ok, session} = S2.S2S.ReadSession.close(session)

Read options: :seq_num, :count, :wait, :until, :clamp, :tail_offset.

Process Affinity

Streaming sessions are not safe to share across processes. The underlying Mint connection delivers TCP messages to the owning process's mailbox. Create and use sessions within the same process.

Patterns

S2.Store automatically handles all of this for you. You only need this section if you're using the data plane directly.

Under the hood, every append and listen call runs through a pipeline that handles chunking, framing, deduplication, and serialization — mirroring the TypeScript SDK patterns.

| Step | Write side                              | Read side                                 |
|------|-----------------------------------------|-------------------------------------------|
| 1    | Serialize term to binary                | Filter duplicate records                  |
| 2    | Chunk binary into sub-1 MiB pieces      | Reassemble chunks into complete message   |
| 3    | Frame chunks with reassembly headers    | Deserialize binary back to term           |
| 4    | Stamp with writer ID + dedupe sequence  | —                                         |

If you're working with the data plane directly, you can use the patterns modules yourself:

alias S2.Patterns.Serialization

serializer = %{serialize: &Jason.encode!/1, deserialize: &Jason.decode!/1}

# Writing — serialize, chunk, frame, and stamp for dedup
writer = Serialization.writer()
{input, writer} = Serialization.prepare(writer, %{"event" => "signup"}, serializer)
{:ok, ack, conn} = S2.S2S.Append.call(conn, "my-basin", "my-stream", input)

# Reading — dedup, reassemble, and deserialize
reader = Serialization.reader()
{:ok, batch, conn} = S2.S2S.Read.call(conn, "my-basin", "my-stream", seq_num: 0)
{messages, reader} = Serialization.decode(reader, batch.records, serializer)

Architecture

How S2.Store works

When you call MyApp.S2.append("chat/general", message), here's what happens:

MyApp.S2 (Supervisor)
├── Registry          — maps stream names to worker pids
├── DynamicSupervisor — owns stream workers
│   ├── StreamWorker("chat/general")  — own connection + open AppendSession
│   ├── StreamWorker("chat/random")   — own connection + open AppendSession
│   └── ...started lazily on first append
├── ControlPlane      — shared JSON client for create/delete stream
└── listener Tasks    — each spawned with own connection + ReadSession

Protocol layers

| Layer                                            | Transport     | Encoding             | Library |
|--------------------------------------------------|---------------|----------------------|---------|
| S2.Store                                         | Managed       | Managed              | —       |
| Control plane (basins, streams, tokens, metrics) | HTTP/1.1 or 2 | JSON                 | Req     |
| Data plane (append, read, check tail)            | HTTP/2        | S2S-framed Protobuf  | Mint    |

S2.Store is the recommended way to use the SDK. The control and data plane modules below it are available if you need lower-level access.

Guarantees

From S2 (the server)

From this SDK

Testing

98.8% test coverage (with Toxiproxy network fault tests). The remaining uncovered lines are exhaustive pattern match arms that can't be triggered — see test/test_helper.exs for details.

Coverage threshold is set to 95% — CI fails if coverage drops below that.

License

MIT