Aether

Elixir library for the AT Protocol (ATProto)

Aether provides clean, idiomatic Elixir abstractions for building applications on the AT Protocol, the decentralized social networking protocol that powers Bluesky.

Features

Installation

Add aether to your list of dependencies in mix.exs:

def deps do
  [
    {:aether, "~> 0.1.0"}
  ]
end

Quick Start

Making XRPC Calls (Client)

Call ATProto services from your application:

# Resolve a handle to a DID
{:ok, did} = Aether.Handle.resolve_handle("https://bsky.social", "fullstack.ing")

# Get a record
{:ok, record} = Aether.Repo.get_record("https://bsky.social",  "at://#{did}/app.bsky.feed.post/3jwdwj2ctlk26")

# Create a post (requires authentication)
{:ok, result} = Aether.Repo.create_record(
  "https://bsky.social",
  "did:plc:abc123",
  "app.bsky.feed.post",
  %{
    "text" => "Hello from Elixir!",
    "createdAt" => DateTime.utc_now() |> DateTime.to_iso8601()
  },
  jwt: my_jwt_token
)

Exposing XRPC Endpoints (Router)

Mount ATProto endpoints in your Phoenix application:

# In your Phoenix router (lib/my_app_web/router.ex)
defmodule MyAppWeb.Router do
  use Phoenix.Router
  import Aether.XRPC.Router

  pipeline :api do
    plug :accepts, ["json"]
  end

  scope "/api" do
    pipe_through :api
    xrpc_routes "/xrpc", handler: MyApp.XRPCHandler
  end
end

Then implement your XRPC handler:

# lib/my_app/xrpc_handler.ex
defmodule MyApp.XRPCHandler do
  @behaviour Aether.XRPC.Handler

  @impl true
  def handle_xrpc(nsid, params, opts) do
    # Verify authentication
    with {:ok, user} <- authenticate(opts[:jwt]) do
      handle_authenticated_request(nsid, params, user)
    end
  end

  defp handle_authenticated_request("com.atproto.repo.getRecord", params, _user) do
    # Fetch record from your database
    case MyApp.Records.get(params["repo"], params["collection"], params["rkey"]) do
      {:ok, record} ->
        {:ok, %{
          uri: "at://#{params["repo"]}/#{params["collection"]}/#{params["rkey"]}",
          cid: record.cid,
          value: record.data
        }}

      {:error, :not_found} ->
        {:error, :not_found}
    end
  end

  defp handle_authenticated_request("com.atproto.repo.createRecord", params, user) do
    # Validate user owns the repo
    if params["repo"] == user.did do
      # Validate with lexicon
      case validate_record(params["collection"], params["record"]) do
        :ok ->
          # Store in your database
          {:ok, record} = MyApp.Records.create(params)
          {:ok, %{uri: record.uri, cid: record.cid}}

        {:error, reason} ->
          {:error, {:invalid_request, "Validation failed: #{inspect(reason)}"}}
      end
    else
      {:error, :forbidden}
    end
  end

  defp handle_authenticated_request(_nsid, _params, _user) do
    {:error, :method_not_implemented}
  end

  defp authenticate(nil), do: {:error, :auth_required}
  defp authenticate(jwt) do
    # Use Joken or your auth library
    case MyApp.Auth.verify_jwt(jwt) do
      {:ok, claims} -> {:ok, %{did: claims["sub"]}}
      {:error, _} -> {:error, :auth_required}
    end
  end

  defp validate_record(collection, record) do
    # Load lexicon and validate
    case Aether.Lexicon.load_schema_map(get_schema(collection)) do
      {:ok, lexicon} -> Aether.Lexicon.validate(lexicon, record)
      {:error, _} -> :ok  # Skip validation if schema not found
    end
  end

  defp get_schema("app.bsky.feed.post") do
    %{
      "type" => "object",
      "required" => ["text", "createdAt"],
      "properties" => %{
        "text" => %{"type" => "string", "maxLength" => 300},
        "createdAt" => %{"type" => "string"}
      }
    }
  end
  defp get_schema(_), do: %{"type" => "unknown"}
end

Core Concepts

Identities

# DIDs (Decentralized Identifiers)
{:ok, did} = Aether.DID.parse_did("did:plc:figync3rbx3pfr2w2duxlq4a")
true = Aether.DID.valid_did?("did:plc:figync3rbx3pfr2w2duxlq4a")

# Handles
{:ok, did} = Aether.Handle.resolve_handle("https://bsky.social", "fullstack.ing")
true = Aether.Handle.valid_handle?("fullstack.ing")

# Handle verification (bidirectional)
{:ok, :verified} = Aether.Handle.verify_handle("alice.example.com", "did:web:alice.example.com")

# Check handle availability
{:ok, false} = Aether.Handle.available?("alice.bsky.social")

# Verify DNS configuration
{:ok, :verified} = Aether.Handle.verify_dns("alice.example.com", "did:web:alice.example.com")

# Verify HTTPS configuration
{:ok, :verified} = Aether.Handle.verify_https("alice.example.com", "did:web:alice.example.com")

DID Documents

# Resolve a DID to its document
{:ok, doc} = Aether.DID.Document.resolve("did:plc:z72i7hdynmk6n5znkwn5hbxx")

# Extract PDS endpoint
{:ok, pds_url} = Aether.DID.Document.get_pds_endpoint(doc)
#=> "https://bsky.social"

# Get the user&#39;s handle
handle = Aether.DID.Document.get_handle(doc)
#=> "bsky.app"

# Get any service by type
{:ok, service} = Aether.DID.Document.get_service(doc, "AtprotoPersonalDataServer")
service.serviceEndpoint #=> "https://bsky.social"

# Get signing key
{:ok, key} = Aether.DID.Document.get_signing_key(doc)
key["publicKeyMultibase"] #=> "zQ3sh..."

# Resolve did:web
{:ok, doc} = Aether.DID.Document.resolve("did:web:example.com")

Serving DID Documents

For did:web support in your PDS, serve DID documents at /.well-known/did.json:

# In your Phoenix endpoint or router
defmodule MyPDSWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :my_pds

  # Serve DID documents for users on your PDS
  plug Aether.DID.DocumentPlug,
    resolver: &MyPDS.Identity.get_did_document/1

  plug MyPDSWeb.Router
end

# Implement resolver function
defmodule MyPDS.Identity do
  alias Aether.DID.Document

  def get_did_document(hostname) do
    case MyPDS.Repo.get_by(User, hostname: hostname) do
      %User{} = user ->
        doc = Document.create(
          user.did,
          handle: user.handle,
          pds_endpoint: "https://#{user.hostname}",
          signing_key: user.signing_key
        )
        {:ok, doc}

      nil ->
        {:error, :not_found}
    end
  end
end

# Create DID documents for new users
doc = Aether.DID.Document.create_web(
  "alice.example.com",
  signing_key: "zQ3shZc2QzApp2oymGvQbzP8eKheVshBHbU4ZYjeXqwSKEn2N",
  service_endpoint: "https://alice.example.com",
  also_known_as: ["at://alice.example.com"]
)

# Add custom service endpoints
doc = doc
|> Aether.DID.Document.add_service(
  id: "#atproto_labeler",
  type: "AtprotoLabeler",
  endpoint: "https://labeler.example.com"
)

# Rotate signing keys
doc = Aether.DID.Document.update_signing_key(doc, new_key)

See examples/did_document_serving.ex for comprehensive examples including multi-tenant setups, caching, and production deployment.

AT URIs

# Parse AT URIs
{:ok, uri} = Aether.AtUri.parse_at_uri(
  "at://did:plc:abc123/app.bsky.feed.post/3jwdwj2ctlk26"
)

uri.authority  #=> "did:plc:abc123"
uri.collection #=> "app.bsky.feed.post"
uri.rkey       #=> "3jwdwj2ctlk26"

# Create AT URIs
uri = %Aether.AtUri{
  authority: "did:plc:abc123",
  collection: "app.bsky.feed.post",
  rkey: "3jwdwj2ctlk26"
}
Aether.AtUri.at_uri_to_string(uri)
#=> "at://did:plc:abc123/app.bsky.feed.post/3jwdwj2ctlk26"

NSIDs (Namespace Identifiers)

# Parse NSIDs
{:ok, nsid} = Aether.NSID.parse_nsid("com.atproto.repo.createRecord")
nsid.authority #=> "com.atproto.repo"
nsid.name      #=> "createRecord"

# Validate NSIDs
true = Aether.NSID.valid_nsid?("app.bsky.feed.post")

TIDs (Timestamp Identifiers)

Generate sortable, unique identifiers for record keys and revisions:

# Generate a new TID
tid = Aether.TID.new()
#=> "3jzfcijpj2z2a"

# Validate TID format
true = Aether.TID.valid_tid?(tid)

# Parse timestamp from TID
{:ok, timestamp_us} = Aether.TID.parse_timestamp(tid)

# TIDs from specific timestamps
tid = Aether.TID.from_timestamp(1_700_000_000_000_000)

# Compare TIDs chronologically
:lt = Aether.TID.compare(tid1, tid2)

# Use as record keys (automatically sorted)
{:ok, repo, _} = Aether.Repository.put_record(
  repo,
  "app.bsky.feed.post",
  Aether.TID.new(),
  %{"text" => "Hello!"}
)

# TIDs are lexicographically sortable
tids = [tid3, tid1, tid2]
sorted_tids = Enum.sort(tids)  # Chronological order!

See examples/tid.ex for comprehensive examples including pagination, commit revisions, and production patterns.

Repository & Block Store (Server-Side)

For building a PDS, manage records locally with MST storage and content-addressed block storage:

# Create a block store for persistent storage
{:ok, store} = Aether.BlockStore.start_link()

# Create a repository with block store
{:ok, repo} = Aether.Repository.create("did:plc:user123", block_store: store)

# Add a record (automatically stored in block store)
{:ok, repo, record_cid} = Aether.Repository.put_record(
  repo,
  "app.bsky.feed.post",
  "abc123",
  %{
    "text" => "Hello, ATProto!",
    "createdAt" => "2024-01-01T00:00:00Z"
  }
)

# Retrieve record data (from block store)
{:ok, post_data} = Aether.Repository.get_record(repo, "app.bsky.feed.post", "abc123")

# List records in a collection
records = Aether.Repository.list_records(repo, "app.bsky.feed.post")
#=> [{"abc123", cid}]

# Delete a record
{:ok, repo} = Aether.Repository.delete_record(repo, "app.bsky.feed.post", "abc123")

# Generate a commit
operations = [{:create, "app.bsky.feed.post/abc123", record_cid}]
{:ok, commit} = Aether.Repository.commit(repo, operations)

# Block store operations
true = Aether.BlockStore.has?(store, record_cid)
stats = Aether.BlockStore.stat(store)
IO.puts("Blocks: #{stats.block_count}, Size: #{stats.total_size}")

# Multiple repos can share one block store (like a real PDS)
{:ok, repo2} = Aether.Repository.create("did:plc:bob", block_store: store)

See examples/repository_and_mst.ex and examples/block_store.ex for comprehensive examples including GenServer integration, multi-tenant PDS, batch operations, and production considerations.

CAR Files (Content Archives)

Export and import repositories using CAR files for backup, migration, and sync:

# Export repository to CAR file
{:ok, car} = Aether.Repository.export_car(repo)
{:ok, car_binary} = Aether.CAR.encode(car)
File.write!("backup.car", car_binary)

# Import repository from CAR file
car_binary = File.read!("backup.car")
{:ok, car} = Aether.CAR.decode(car_binary)
{:ok, repo} = Aether.Repository.import_car("did:plc:user123", car, block_store: store)

# Work with CAR blocks directly
{:ok, block} = Aether.CAR.get_block(car, cid)
blocks = Aether.CAR.list_blocks(car)

# Manual CAR creation
blocks = [
  %Aether.CAR.Block{cid: cid1, data: data1},
  %Aether.CAR.Block{cid: cid2, data: data2}
]

car = %Aether.CAR{
  version: 1,
  roots: [commit_cid],
  blocks: blocks
}

See examples/car_files.ex for comprehensive examples including backups, migrations, incremental backups, and validation.

Blob Storage (Images, Videos, Files)

Store and serve binary content like images and videos:

# Start blob storage
{:ok, storage} = Aether.Blob.MemoryStorage.start_link()

# Upload an image
image_data = File.read!("photo.jpg")
{:ok, blob} = Aether.Blob.upload(image_data, "image/jpeg", Aether.Blob.MemoryStorage, storage: storage)

# Blob reference for records
blob.ref       #=> CID of the blob
blob.mime_type #=> "image/jpeg"
blob.size      #=> 52428

# Download a blob
{:ok, blob, data} = Aether.Blob.download(cid, Aether.Blob.MemoryStorage, storage: storage)

# Create post with image
post_data = %{
  "text" => "Check this out!",
  "embed" => %{
    "$type" => "app.bsky.embed.images",
    "images" => [
      %{
        "alt" => "Photo description",
        "image" => Aether.Blob.to_map(blob)
      }
    ]
  }
}

# Store in repository
{:ok, repo, _cid} = Aether.Repository.put_record(
  repo,
  "app.bsky.feed.post",
  Aether.TID.new(),
  post_data
)

# Implement custom storage backend
defmodule MyApp.S3BlobStorage do
  @behaviour Aether.Blob.Storage

  @impl true
  def put_blob(_storage, blob, data, _opts) do
    # Upload to S3
    ExAws.S3.put_object("my-bucket", blob_key(blob.ref), data)
    |> ExAws.request!()

    {:ok, blob.ref}
  end

  @impl true
  def get_blob(_storage, cid, _opts) do
    # Download from S3
    case ExAws.S3.get_object("my-bucket", blob_key(cid)) |> ExAws.request() do
      {:ok, %{body: data}} -> {:ok, data}
      {:error, _} -> {:error, :not_found}
    end
  end

  defp blob_key(cid), do: "blobs/#{Aether.CID.cid_to_string(cid)}"
end

See examples/blob_storage.ex for comprehensive examples including file uploads, post images, custom storage backends, size limits, and garbage collection.

Records

# Create a record struct
record = %Aether.Record{
  type: "app.bsky.feed.post",
  data: %{
    "text" => "Hello, ATProto!",
    "createdAt" => "2024-01-15T12:00:00Z"
  }
}

# Convert to/from maps
map = Aether.Record.to_map(record)
#=> %{"$type" => "app.bsky.feed.post", "text" => "Hello, ATProto!", ...}

{:ok, record} = Aether.Record.from_map(map)

Lexicon (Schema Validation)

# Define a schema
schema = %{
  "type" => "object",
  "required" => ["text", "createdAt"],
  "properties" => %{
    "text" => %{"type" => "string", "maxLength" => 300},
    "createdAt" => %{"type" => "string"}
  }
}

{:ok, lexicon} = Aether.Lexicon.load_schema_map(schema)

# Validate data
post = %{
  "text" => "Hello!",
  "createdAt" => "2024-01-15T12:00:00Z"
}

{:ok, ^post} = Aether.Lexicon.validate(lexicon, post)

# Invalid data
{:error, errors} = Aether.Lexicon.validate(lexicon, %{"text" => "x" * 400})
#=> {:error, [%{path: ["text"], message: "string length 400 exceeds maximum 300"}]}

MST (Merkle Search Tree)

# Create a new MST
mst = %Aether.MST{}

# Add records (key-value pairs)
{:ok, post_cid} = Aether.CID.parse_cid("bafyreie5cvv4h45feadgeuwhbcutmh6t2ceseocckahdoe6uat64zmz454")
{:ok, mst} = Aether.MST.add(mst, "app.bsky.feed.post/abc123", post_cid)
{:ok, mst} = Aether.MST.add(mst, "app.bsky.feed.post/xyz789", post_cid)

# Get a record
{:ok, cid} = Aether.MST.get(mst, "app.bsky.feed.post/abc123")

# List all entries in sorted order
entries = Aether.MST.list(mst)
#=> [{"app.bsky.feed.post/abc123", cid}, {"app.bsky.feed.post/xyz789", cid}]

# Update a record
{:ok, new_cid} = Aether.CID.parse_cid("bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily")
{:ok, mst} = Aether.MST.add(mst, "app.bsky.feed.post/abc123", new_cid)

# Delete a record
{:ok, mst} = Aether.MST.delete(mst, "app.bsky.feed.post/abc123")

# Use with storage backend
{:ok, store} = Aether.MST.MemoryStore.start_link()
{:ok, node_cid} = Aether.MST.MemoryStore.put_node(store, mst)
{:ok, retrieved_mst} = Aether.MST.MemoryStore.get_node(store, node_cid)

Commits and TIDs

# Generate a TID (Timestamp Identifier)
tid = Aether.TID.new()
#=> "3jzfcijpj2z2a"

# TIDs are sortable and unique
Aether.TID.valid_tid?(tid)
#=> true

# Create a commit
{:ok, mst_cid} = Aether.CID.parse_cid("bafyreie5cvv4h45feadgeuwhbcutmh6t2ceseocckahdoe6uat64zmz454")
commit = Aether.Commit.create("did:plc:abc123", mst_cid)

# Sign a commit with your signing function
signing_fn = fn bytes ->
  # Use your crypto library (e.g., :crypto, ex_crypto)
  :crypto.sign(:eddsa, :sha512, bytes, private_key)
end

{:ok, signed_commit} = Aether.Commit.sign(commit, signing_fn)

# Verify a commit
verify_fn = fn bytes, sig ->
  :crypto.verify(:eddsa, :sha512, bytes, sig, public_key)
end

{:ok, true} = Aether.Commit.verify(signed_commit, verify_fn)

# Create a commit chain
{:ok, commit1_cid} = calculate_commit_cid(signed_commit)
commit2 = Aether.Commit.create_next("did:plc:abc123", new_mst_cid, commit1_cid)

CAR Files (Content Addressable aRchives)

# Create blocks for export
{:ok, commit_cid} = Aether.CID.parse_cid("bafyreie5cvv4h45feadgeuwhbcutmh6t2ceseocckahdoe6uat64zmz454")
{:ok, mst_cid} = Aether.CID.parse_cid("bafyreibvjvcv745gig4mvqs4hctx4zfkono4rjejm2ta6gtyzkqxfjeily")

blocks = [
  %Aether.CAR.Block{cid: commit_cid, data: commit_data},
  %Aether.CAR.Block{cid: mst_cid, data: mst_data}
]

# Create CAR with commit as root
car = %Aether.CAR{
  roots: [commit_cid],
  blocks: blocks
}

# Encode to binary
{:ok, binary} = Aether.CAR.encode(car)
File.write!("repo.car", binary)

# Decode from binary
binary = File.read!("repo.car")
{:ok, car} = Aether.CAR.decode(binary)

# Get blocks by CID
{:ok, block} = Aether.CAR.get_block(car, commit_cid)

# List all blocks
blocks = Aether.CAR.list_blocks(car)

Events & Federation (Firehose)

Build a self-hosted PDS or consume events from the network:

Broadcasting Events (PDS Server)

# In your application.ex supervision tree
def start(_type, _args) do
  children = [
    MyPDSWeb.Endpoint,
    {Phoenix.PubSub, name: MyPDS.PubSub},

    # Add the event broadcaster
    {Aether.Events.Broadcaster,
     pubsub: MyPDS.PubSub,
     topic: "firehose",
     name: MyPDS.EventBroadcaster}
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end

# In your XRPC handler after creating a record
def handle_xrpc("com.atproto.repo.createRecord", params, opts) do
  # ... create record, update MST, create commit ...

  # Broadcast the commit event to all subscribers
  Aether.Events.Broadcaster.broadcast_commit(
    params["repo"],
    commit,
    [%{action: :create, path: "#{collection}/#{rkey}", cid: record_cid}],
    car_blocks
  )

  {:ok, %{uri: uri, cid: cid}}
end

# Serve WebSocket firehose endpoint
defmodule MyPDSWeb.FirehoseChannel do
  use Phoenix.Channel

  def join("firehose", _params, socket) do
    Aether.Events.Broadcaster.subscribe()
    {:ok, socket}
  end

  def handle_info({:firehose_event, frame}, socket) do
    push(socket, "event", %{data: Base.encode64(frame)})
    {:noreply, socket}
  end
end

Consuming Events (AppView / Feed Generator)

# In your application.ex supervision tree
def start(_type, _args) do
  children = [
    MyAppViewWeb.Endpoint,
    MyAppView.Repo,

    # Subscribe to a PDS firehose
    {Aether.Events.Consumer,
     pds_url: "wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos",
     name: MyAppView.FirehoseConsumer},

    # Your event processor
    {MyAppView.EventProcessor,
     subscribe_to: [{MyAppView.FirehoseConsumer, max_demand: 100}]}
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end

# Process events with GenStage
defmodule MyAppView.EventProcessor do
  use GenStage

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(opts) do
    subscribe_to = Keyword.fetch!(opts, :subscribe_to)
    {:consumer, %{}, subscribe_to: subscribe_to}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      handle_event(event)
    end

    {:noreply, [], state}
  end

  # Handle commit events (new/updated/deleted records)
  defp handle_event({:commit, event}) do
    # Decode CAR blocks to get record data
    {:ok, {_roots, blocks}} = Aether.Events.decode_commit_blocks(event.blocks)

    for op <- event.ops do
      case op.action do
        :create -> index_record(event.repo, op.path, op.cid, blocks)
        :update -> update_record(event.repo, op.path, op.cid, blocks)
        :delete -> delete_record(event.repo, op.path)
      end
    end
  end

  # Handle other event types
  defp handle_event({:handle, event}) do
    update_handle(event.did, event.handle)
  end

  defp handle_event({:identity, event}) do
    refresh_did_document(event.did)
  end

  defp handle_event({:account, event}) do
    update_account_status(event.did, event.active)
  end

  defp handle_event({:tombstone, event}) do
    delete_all_records(event.did)
  end

  # Your indexing logic...
end

See guides/events_and_federation.md for complete examples.

Architecture

Aether is designed as a library, providing both pure functions and supervised processes that your application controls:

Your Application Owns:

Aether Provides:

This design lets you integrate ATProto into your application architecture however you prefer, while providing production-ready processes for federation.

Authentication Example

Aether doesn't handle authentication - your application does. Here's a typical pattern using Joken:

# In your deps
{:joken, "~> 2.6"}

# Create a session (login)
defmodule MyApp.Auth do
  use Joken.Config

  def create_session(identifier, password) do
    # Verify credentials against your database
    case MyApp.Accounts.authenticate(identifier, password) do
      {:ok, user} ->
        # Generate JWT
        claims = %{
          "sub" => user.did,
          "exp" => DateTime.utc_now() |> DateTime.add(24 * 3600) |> DateTime.to_unix()
        }

        token = generate_and_sign!(claims, signer())

        # Call PDS to create session if needed
        {:ok, %{access_jwt: token, did: user.did}}

      {:error, _} ->
        {:error, :invalid_credentials}
    end
  end

  def verify_jwt(token) do
    verify_and_validate(token, signer())
  end

  defp signer do
    Joken.Signer.create("HS256", "your-secret-key")
  end
end

# In your XRPC handler
def handle_xrpc(nsid, params, opts) do
  case MyApp.Auth.verify_jwt(opts[:jwt]) do
    {:ok, claims} ->
      handle_with_auth(nsid, params, claims)

    {:error, _} ->
      {:error, :auth_required}
  end
end

Testing

Aether includes comprehensive tests. Run them with:

# All tests
mix test

# Exclude integration tests (require network)
mix test --exclude integration

Documentation

Generate docs with:

mix docs

Then open doc/index.html in your browser.

Contributing

Contributions welcome! Please follow these guidelines:

  1. One module at a time - complete implementation, tests, and docs
  2. Follow Elixir best practices - pattern matching, with statements
  3. Test-driven development - write tests first
  4. 100% test coverage for new code
  5. Format with mix format

License

[Add your license here]

Resources

Acknowledgments

Built with ❤️ for the ATProto and Elixir communities.