EventoDBEx

Elixir client for EventoDB - a simple, fast message store.

Installation

Add eventodb_ex to your list of dependencies in mix.exs:

def deps do
  [
    {:eventodb_ex, "~> 0.1.0"}
  ]
end

Quick Start

# Create a client
client = EventodbEx.Client.new("http://localhost:8080", token: "ns_...")

# Write a message
{:ok, result, client} = EventodbEx.stream_write(
  client,
  "account-123",
  %{type: "Deposited", data: %{amount: 100}}
)

IO.puts("Written at position: #{result.position}")

# Read messages from stream
{:ok, messages, client} = EventodbEx.stream_get(client, "account-123")

Enum.each(messages, fn [id, type, position, global_position, data, metadata, time] ->
  IO.puts("Message #{id}: #{type} at position #{position}")
end)

Features

Usage

Creating a Client

# With token
client = EventodbEx.Client.new("http://localhost:8080", token: "ns_...")

# Without token (in test mode the client auto-captures the token)
client = EventodbEx.Client.new("http://localhost:8080")

Writing Messages

# Simple write
{:ok, result, client} = EventodbEx.stream_write(
  client,
  "account-123",
  %{
    type: "Deposited",
    data: %{amount: 100}
  }
)

# With metadata
{:ok, result, client} = EventodbEx.stream_write(
  client,
  "account-123",
  %{
    type: "Deposited",
    data: %{amount: 100},
    metadata: %{
      correlationStreamName: "workflow-456",
      causationMessageId: "msg-123"
    }
  }
)

# With optimistic locking
{:ok, result, client} = EventodbEx.stream_write(
  client,
  "account-123",
  %{type: "Deposited", data: %{amount: 100}},
  %{expected_version: 5}
)

Reading Messages

# Read all messages
{:ok, messages, client} = EventodbEx.stream_get(client, "account-123")

# Read from position
{:ok, messages, client} = EventodbEx.stream_get(
  client,
  "account-123",
  %{position: 10}
)

# Read with batch size
{:ok, messages, client} = EventodbEx.stream_get(
  client,
  "account-123",
  %{batch_size: 100}
)

# Pattern match on message structure
{:ok, messages, client} = EventodbEx.stream_get(client, "account-123")

Enum.each(messages, fn [id, type, pos, gpos, data, metadata, time] ->
  # Process message
end)

Last Message

# Get last message
{:ok, message, client} = EventodbEx.stream_last(client, "account-123")

# Get last message of specific type
{:ok, message, client} = EventodbEx.stream_last(
  client,
  "account-123",
  %{type: "Deposited"}
)

Stream Version

{:ok, version, client} = EventodbEx.stream_version(client, "account-123")
# version is 0-based, so version 5 means 6 messages (positions 0-5)

Category Operations

# Read all messages in a category
{:ok, messages, client} = EventodbEx.category_get(client, "account")

# With consumer group (for scaling)
{:ok, messages, client} = EventodbEx.category_get(
  client,
  "account",
  %{
    consumer_group: %{
      member: 0,  # This consumer's index
      size: 4     # Total number of consumers
    }
  }
)

# With correlation filter
{:ok, messages, client} = EventodbEx.category_get(
  client,
  "account",
  %{correlation: "workflow"}
)

# Category messages have 8 elements (includes streamName)
{:ok, [msg], client} = EventodbEx.category_get(client, "account")
[id, stream_name, type, pos, gpos, data, metadata, time] = msg

Namespace Management

# Create namespace
{:ok, result, client} = EventodbEx.namespace_create(
  client,
  "my-app",
  %{description: "My application namespace"}
)

# Use the token
client = EventodbEx.Client.set_token(client, result.token)

# List namespaces
{:ok, namespaces, client} = EventodbEx.namespace_list(client)

# Get namespace info
{:ok, info, client} = EventodbEx.namespace_info(client, "my-app")

# Delete namespace (⚠️ irreversible!)
{:ok, result, client} = EventodbEx.namespace_delete(client, "my-app")

System Operations

# Get server version
{:ok, version, client} = EventodbEx.system_version(client)

# Get health status
{:ok, health, client} = EventodbEx.system_health(client)

Error Handling

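The SDK uses Elixir's tagged-tuple convention: calls return {:ok, result, client} on success (keep using the returned client for subsequent calls) and {:error, error} on failure: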

case EventodbEx.stream_write(client, stream, message, %{expected_version: 5}) do
  {:ok, result, client} ->
    # Success
    IO.puts("Written at position #{result.position}")
    
  {:error, %EventodbEx.Error{code: "STREAM_VERSION_CONFLICT"}} ->
    # Handle conflict
    IO.puts("Version conflict - retry")
    
  {:error, error} ->
    # Other error
    IO.puts("Error: #{error.message}")
end

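Common error codes include STREAM_VERSION_CONFLICT, returned when a write's expected_version does not match the stream's current version.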

Testing

Tests run against a live EventoDB server. Each test creates its own namespace for isolation.

# Start EventoDB server
docker-compose up -d

# Run tests
mix test

# With custom server URL
EVENTODB_URL=http://localhost:8080 mix test

# Run specific test file
mix test test/write_test.exs

# Run with coverage
mix test --cover

Message Format

Stream Messages

Stream messages are 7-element lists:

[
  id,              # String - Message UUID
  type,            # String - Event type
  position,        # Integer - Stream position (0-based)
  global_position, # Integer - Global sequence number
  data,            # Map - Event payload
  metadata,        # Map or nil - Message metadata
  time             # String - ISO 8601 timestamp (UTC)
]

Category Messages

Category messages are 8-element lists (includes stream name):

[
  id,              # String - Message UUID
  stream_name,     # String - Full stream name
  type,            # String - Event type
  position,        # Integer - Stream position (0-based)
  global_position, # Integer - Global sequence number
  data,            # Map - Event payload
  metadata,        # Map or nil - Message metadata
  time             # String - ISO 8601 timestamp (UTC)
]

Advanced Patterns

Consumer Groups

Distribute category processing across multiple consumers:

# Consumer 1
{:ok, messages, client} = EventodbEx.category_get(
  client,
  "account",
  %{consumer_group: %{member: 0, size: 4}}
)

# Consumer 2
{:ok, messages, client} = EventodbEx.category_get(
  client,
  "account",
  %{consumer_group: %{member: 1, size: 4}}
)

# Each consumer gets a deterministic subset of streams

Optimistic Locking

Prevent concurrent write conflicts:

# Read current version
{:ok, version, client} = EventodbEx.stream_version(client, "account-123")

# Write with expected version
case EventodbEx.stream_write(
  client,
  "account-123",
  message,
  %{expected_version: version}
) do
  {:ok, result, client} -> {:ok, result, client}
  {:error, %{code: "STREAM_VERSION_CONFLICT"}} -> retry()
end

Correlation

Track related messages across streams:

# Write with correlation
{:ok, _, client} = EventodbEx.stream_write(
  client,
  "account-123",
  %{
    type: "Deposited",
    data: %{amount: 100},
    metadata: %{correlationStreamName: "workflow-456"}
  }
)

# Query by correlation
{:ok, messages, client} = EventodbEx.category_get(
  client,
  "account",
  %{correlation: "workflow"}
)

License

MIT - see LICENSE

Links