EventodbKit

Production-ready Elixir SDK for EventoDB with built-in resilience patterns.

EventodbKit sits on top of the lightweight EventodbEx SDK and provides:

Installation

Add eventodb_kit to your list of dependencies in mix.exs:

def deps do
  [
    {:eventodb_kit, "~> 0.1.0"}
  ]
end

Database Setup

EventodbKit requires database tables for outbox, position tracking, and idempotency.

Step 1: Add Migration

Create a migration in your application:

mix ecto.gen.migration add_eventodb_kit_tables

Step 2: Use EventodbKit.Migration

defmodule MyApp.Repo.Migrations.AddEventodbKitTables do
  use Ecto.Migration

  def up do
    EventodbKit.Migration.up(version: 1)
  end

  def down do
    EventodbKit.Migration.down(version: 1)
  end
end

Step 3: Run Migration

mix ecto.migrate

Quick Start

Type-Safe Events with Code Generation

For the best developer experience, use with generated event schemas:

# 1. Define your event registry (using generated schemas)
defmodule MyApp.Events do
  use EventodbKit.EventDispatcher
  
  register "UserCreated", MyApp.Events.UserCreated
  register "OrderPlaced", MyApp.Events.OrderPlaced
end

# 2. Use in consumers with automatic validation
defmodule MyApp.MyConsumer do
  use EventodbKit.Consumer
  
  def handle_message(message, state) do
    MyApp.Events.dispatch(message["type"], message["data"], &handle_event/2)
  end
  
  defp handle_event(MyApp.Events.UserCreated, event) do
    # event is validated struct with all fields
    IO.puts("User: #{event.user_id}")
    :ok
  end
end

See the repository's CODEGEN_API.md for complete code generation integration guide.

Usage

Create Client

kit = EventodbKit.Client.new(
  "http://localhost:8080",
  token: "ns_abc123",
  repo: MyApp.Repo
)

Write Operations (Outbox Pattern)

Writes go to local database first, then are sent in the background:

# Write to outbox
{:ok, outbox_id, kit} = EventodbKit.stream_write(
  kit,
  "account-123",
  %{type: "Deposited", data: %{amount: 100}}
)

# Write with idempotency key (prevents duplicates)
{:ok, outbox_id, kit} = EventodbKit.stream_write(
  kit,
  "payment-456",
  %{
    type: "PaymentRequested",
    data: %{
      amount: 1000,
      idempotency_key: "payment_user123_invoice456"
    }
  }
)

# Transactional write with business logic
MyApp.Repo.transaction(fn ->
  lead = MyApp.Repo.insert!(%Lead{email: "test@example.com"})
  
  {:ok, _outbox_id, _kit} = EventodbKit.stream_write(
    kit,
    "partnership-#{lead.id}",
    %{type: "LeadCreated", data: %{lead_id: lead.id}}
  )
  
  lead
end)

Read Operations

Read operations delegate directly to EventodbEx:

# Stream operations
{:ok, messages, kit} = EventodbKit.stream_get(kit, "account-123")
{:ok, message, kit} = EventodbKit.stream_last(kit, "account-123")
{:ok, version, kit} = EventodbKit.stream_version(kit, "account-123")

# Category operations
{:ok, messages, kit} = EventodbKit.category_get(kit, "account", %{
  position: 0,
  batch_size: 100
})

Outbox Sender (Background Worker)

Add to your application supervisor:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      MyApp.Repo,
      
      # Outbox sender - sends unsent messages in background
      {EventodbKit.OutboxSender, [
        namespace: "analytics",
        base_url: "http://localhost:8080",
        token: System.get_env("ANALYTICS_TOKEN"),
        repo: MyApp.Repo,
        poll_interval: 1_000,  # Poll every second
        batch_size: 100
      ]}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

Consumer Pattern

Create a consumer to process events from a category:

defmodule MyApp.PartnershipConsumer do
  use EventodbKit.Consumer
  require Logger
  
  def start_link(opts) do
    EventodbKit.Consumer.start_link(__MODULE__, opts)
  end
  
  @impl EventodbKit.Consumer
  def init(opts) do
    {:ok, %{
      namespace: Keyword.fetch!(opts, :namespace),
      category: "partnership",
      consumer_id: "singleton",
      base_url: "http://localhost:8080",
      token: Keyword.fetch!(opts, :token),
      repo: MyApp.Repo,
      poll_interval: 1_000,
      batch_size: 100
    }}
  end
  
  @impl EventodbKit.Consumer
  def handle_message(message, state) do
    case message["type"] do
      "PartnershipApplicationSubmitted" ->
        %MyApp.Lead{
          email: message["data"]["email"],
          school_name: message["data"]["school_name"]
        }
        |> MyApp.Repo.insert!()
        :ok
        
      _ ->
        Logger.warn("Unknown event type: #{message["type"]}")
        :ok
    end
  end
end

# Add to supervisor
{MyApp.PartnershipConsumer, [
  namespace: "analytics",
  token: System.get_env("ANALYTICS_TOKEN")
]}

Consumer Groups

For higher throughput, use consumer groups to partition processing:

defmodule MyApp.PartnershipConsumerGroup do
  use EventodbKit.Consumer
  
  @impl EventodbKit.Consumer
  def init(opts) do
    {:ok, %{
      namespace: Keyword.fetch!(opts, :namespace),
      category: "partnership",
      consumer_id: "member-#{Keyword.fetch!(opts, :group_member)}",
      group_member: Keyword.fetch!(opts, :group_member),
      group_size: Keyword.fetch!(opts, :group_size),
      base_url: "http://localhost:8080",
      token: Keyword.fetch!(opts, :token),
      repo: MyApp.Repo,
      poll_interval: 1_000,
      batch_size: 100
    }}
  end
  
  @impl EventodbKit.Consumer
  def handle_message(message, _state) do
    # Process message
    :ok
  end
end

# In supervisor - start 3 members
children = [
  {MyApp.PartnershipConsumerGroup, [
    namespace: "analytics",
    token: token,
    group_member: 0,
    group_size: 3
  ]},
  {MyApp.PartnershipConsumerGroup, [
    namespace: "analytics",
    token: token,
    group_member: 1,
    group_size: 3
  ]},
  {MyApp.PartnershipConsumerGroup, [
    namespace: "analytics",
    token: token,
    group_member: 2,
    group_size: 3
  ]}
]

Features

Outbox Pattern

Consumer Position Tracking

Idempotency

Resilience

Database Tables

EventodbKit creates three tables:

evento_outbox

Stores events before they're sent to EventoDB:

evento_consumer_positions

Tracks consumer positions:

evento_processed_events

Tracks processed events for idempotency:

Configuration

General Options

Option Default Description
:log_sqlfalse Enable SQL query logging for debugging
# config/config.exs
config :eventodb_kit, :log_sql, false  # default, quiet mode

# config/dev.exs (for debugging)
config :eventodb_kit, :log_sql, true

Outbox Sender Options

Consumer Options

Testing

# Run tests
cd clients/eventodb_kit
mix test

# Or use the test runner script
bin/run_elixir_kit_specs.sh

License

MIT - see LICENSE

Links