EventoDBEx
Elixir client for EventoDB - a simple, fast message store.
Installation
Add eventodb_ex to your list of dependencies in mix.exs:
def deps do
[
{:eventodb_ex, "~> 0.1.0"}
]
endQuick Start
# Create a client
client = EventodbEx.Client.new("http://localhost:8080", token: "ns_...")
# Write a message
{:ok, result, client} = EventodbEx.stream_write(
client,
"account-123",
%{type: "Deposited", data: %{amount: 100}}
)
IO.puts("Written at position: #{result.position}")
# Read messages from stream
{:ok, messages, client} = EventodbEx.stream_get(client, "account-123")
Enum.each(messages, fn [id, type, position, global_position, data, metadata, time] ->
IO.puts("Message #{id}: #{type} at position #{position}")
end)Features
- ✅ Stream operations (write, read, last, version)
- ✅ Category operations with consumer groups and correlation
- ✅ Namespace management
- ✅ System health and version
- ✅ Optimistic locking with expected version
-
✅ Minimal dependencies (just
reqandjason) - ✅ Idiomatic Elixir patterns with pattern matching
- ✅ Comprehensive test suite
Usage
Creating a Client
# With token
client = EventodbEx.Client.new("http://localhost:8080", token: "ns_...")
# Without token (will auto-capture in test mode)
client = EventodbEx.Client.new("http://localhost:8080")Writing Messages
# Simple write
{:ok, result, client} = EventodbEx.stream_write(
client,
"account-123",
%{
type: "Deposited",
data: %{amount: 100}
}
)
# With metadata
{:ok, result, client} = EventodbEx.stream_write(
client,
"account-123",
%{
type: "Deposited",
data: %{amount: 100},
metadata: %{
correlationStreamName: "workflow-456",
causationMessageId: "msg-123"
}
}
)
# With optimistic locking
{:ok, result, client} = EventodbEx.stream_write(
client,
"account-123",
%{type: "Deposited", data: %{amount: 100}},
%{expected_version: 5}
)Reading Messages
# Read all messages
{:ok, messages, client} = EventodbEx.stream_get(client, "account-123")
# Read from position
{:ok, messages, client} = EventodbEx.stream_get(
client,
"account-123",
%{position: 10}
)
# Read with batch size
{:ok, messages, client} = EventodbEx.stream_get(
client,
"account-123",
%{batch_size: 100}
)
# Pattern match on message structure
{:ok, messages, client} = EventodbEx.stream_get(client, "account-123")
Enum.each(messages, fn [id, type, pos, gpos, data, metadata, time] ->
# Process message
end)Last Message
# Get last message
{:ok, message, client} = EventodbEx.stream_last(client, "account-123")
# Get last message of specific type
{:ok, message, client} = EventodbEx.stream_last(
client,
"account-123",
%{type: "Deposited"}
)Stream Version
{:ok, version, client} = EventodbEx.stream_version(client, "account-123")
# version is 0-based, so version 5 means 6 messages (positions 0-5)Category Operations
# Read all messages in a category
{:ok, messages, client} = EventodbEx.category_get(client, "account")
# With consumer group (for scaling)
{:ok, messages, client} = EventodbEx.category_get(
client,
"account",
%{
consumer_group: %{
member: 0, # This consumer's index
size: 4 # Total number of consumers
}
}
)
# With correlation filter
{:ok, messages, client} = EventodbEx.category_get(
client,
"account",
%{correlation: "workflow"}
)
# Category messages have 8 elements (includes streamName)
{:ok, [msg], client} = EventodbEx.category_get(client, "account")
[id, stream_name, type, pos, gpos, data, metadata, time] = msgNamespace Management
# Create namespace
{:ok, result, client} = EventodbEx.namespace_create(
client,
"my-app",
%{description: "My application namespace"}
)
# Use the token
client = EventodbEx.Client.set_token(client, result.token)
# List namespaces
{:ok, namespaces, client} = EventodbEx.namespace_list(client)
# Get namespace info
{:ok, info, client} = EventodbEx.namespace_info(client, "my-app")
# Delete namespace (⚠️ irreversible!)
{:ok, result, client} = EventodbEx.namespace_delete(client, "my-app")System Operations
# Get server version
{:ok, version, client} = EventodbEx.system_version(client)
# Get health status
{:ok, health, client} = EventodbEx.system_health(client)Error Handling
The SDK uses Elixir's standard {:ok, result} and {:error, error} tuples:
case EventodbEx.stream_write(client, stream, message, %{expected_version: 5}) do
{:ok, result, client} ->
# Success
IO.puts("Written at position #{result.position}")
{:error, %EventodbEx.Error{code: "STREAM_VERSION_CONFLICT"}} ->
# Handle conflict
IO.puts("Version conflict - retry")
{:error, error} ->
# Other error
IO.puts("Error: #{error.message}")
endCommon error codes:
AUTH_REQUIRED- No authentication tokenAUTH_INVALID- Invalid tokenSTREAM_VERSION_CONFLICT- Optimistic locking conflictNAMESPACE_EXISTS- Namespace already existsNAMESPACE_NOT_FOUND- Namespace doesn't existNETWORK_ERROR- Connection or network issue
Testing
Tests run against a live EventoDB server. Each test creates its own namespace for isolation.
# Start EventoDB server
docker-compose up -d
# Run tests
mix test
# With custom server URL
EVENTODB_URL=http://localhost:8080 mix test
# Run specific test file
mix test test/write_test.exs
# Run with coverage
mix test --coverMessage Format
Stream Messages
Stream messages are 7-element lists:
[
id, # String - Message UUID
type, # String - Event type
position, # Integer - Stream position (0-based)
global_position, # Integer - Global sequence number
data, # Map - Event payload
metadata, # Map or nil - Message metadata
time # String - ISO 8601 timestamp (UTC)
]Category Messages
Category messages are 8-element lists (includes stream name):
[
id, # String - Message UUID
stream_name, # String - Full stream name
type, # String - Event type
position, # Integer - Stream position (0-based)
global_position, # Integer - Global sequence number
data, # Map - Event payload
metadata, # Map or nil - Message metadata
time # String - ISO 8601 timestamp (UTC)
]Advanced Patterns
Consumer Groups
Distribute category processing across multiple consumers:
# Consumer 1
{:ok, messages, client} = EventodbEx.category_get(
client,
"account",
%{consumer_group: %{member: 0, size: 4}}
)
# Consumer 2
{:ok, messages, client} = EventodbEx.category_get(
client,
"account",
%{consumer_group: %{member: 1, size: 4}}
)
# Each consumer gets a deterministic subset of streamsOptimistic Locking
Prevent concurrent write conflicts:
# Read current version
{:ok, version, client} = EventodbEx.stream_version(client, "account-123")
# Write with expected version
case EventodbEx.stream_write(
client,
"account-123",
message,
%{expected_version: version}
) do
{:ok, result, client} -> {:ok, result, client}
{:error, %{code: "STREAM_VERSION_CONFLICT"}} -> retry()
endCorrelation
Track related messages across streams:
# Write with correlation
{:ok, _, client} = EventodbEx.stream_write(
client,
"account-123",
%{
type: "Deposited",
data: %{amount: 100},
metadata: %{correlationStreamName: "workflow-456"}
}
)
# Query by correlation
{:ok, messages, client} = EventodbEx.category_get(
client,
"account",
%{correlation: "workflow"}
)License
MIT - see LICENSE