erl-esdb-gater

Hex.pmHexdocs.pm

Gateway for distributed access to erl-esdb event stores.

Gateway Architecture

Overview

erl-esdb-gater is an Erlang gateway service providing:

Installation

Community Edition (hex.pm)

Add to your rebar.config:

{deps, [
    {erl_esdb_gater, "0.6.0"}
]}.

Pure Erlang implementation - works everywhere, no native dependencies.

Enterprise Edition (optional NIF acceleration)

For NIF-accelerated performance (5-10x faster crypto operations), add the erl_esdb_nifs package:

{deps, [
    {erl_esdb_gater, "0.6.0"},
    {erl_esdb_nifs, {git, "git@github.com:macula-io/erl-esdb-nifs.git", {tag, "0.4.0"}}}
]}.

Requires Rust toolchain: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

The NIF provides accelerated Base58 encoding/decoding for DID operations and resource pattern matching. When unavailable, pure Erlang fallbacks are used automatically.

Quick Start

%% Start the application (typically started by erl-esdb)
application:ensure_all_started(erl_esdb_gater).

%% Append events to a stream
Events = [#{type => <<"user_created">>, data => #{name => <<"Alice">>}}],
{ok, Version} = esdb_gater_api:append_events(my_store, <<"users-123">>, Events).

%% Read events from a stream
{ok, EventList} = esdb_gater_api:stream_forward(my_store, <<"users-123">>, 0, 100).

%% Subscribe to PubSub channel
ok = esdb_channel_server:subscribe(esdb_channel_events, <<"user.*">>, self()).

%% Receive channel messages
receive
    {channel_message, esdb_channel_events, _Topic, Event} ->
        handle_event(Event)
end.

API Reference

Stream Operations

%% Append events to a stream
esdb_gater_api:append_events(StoreId, StreamId, Events) ->
    {ok, NewVersion} | {error, term()}.
esdb_gater_api:append_events(StoreId, StreamId, ExpectedVersion, Events) ->
    {ok, NewVersion} | {error, term()}.

%% Read events from a stream
esdb_gater_api:get_events(StoreId, StreamId, StartVersion, Count, Direction) ->
    {ok, [Event]} | {error, term()}.
esdb_gater_api:stream_forward(StoreId, StreamId, StartVersion, Count) ->
    {ok, [Event]} | {error, term()}.
esdb_gater_api:stream_backward(StoreId, StreamId, StartVersion, Count) ->
    {ok, [Event]} | {error, term()}.

%% Stream metadata
esdb_gater_api:get_version(StoreId, StreamId) -> {ok, Version} | {error, term()}.
esdb_gater_api:stream_exists(StoreId, StreamId) -> boolean().
esdb_gater_api:get_streams(StoreId) -> {ok, [StreamId]} | {error, term()}.

Subscription Operations

%% Create a subscription
esdb_gater_api:save_subscription(StoreId, Type, Selector, Name, StartFrom, Subscriber) ->
    ok | {error, term()}.

%% Remove a subscription
esdb_gater_api:remove_subscription(StoreId, Type, Selector, Name) ->
    ok | {error, term()}.

%% Acknowledge event processing
esdb_gater_api:ack_event(StoreId, StreamId, SubscriptionName, EventNumber) ->
    ok | {error, term()}.

%% List subscriptions
esdb_gater_api:get_subscriptions(StoreId) -> {ok, [Subscription]} | {error, term()}.

Snapshot Operations

%% Record a snapshot
esdb_gater_api:record_snapshot(StoreId, SourceUuid, StreamUuid, Version, Record) ->
    ok | {error, term()}.

%% Read a snapshot
esdb_gater_api:read_snapshot(StoreId, SourceUuid, StreamUuid, Version) ->
    {ok, Snapshot} | {error, term()}.

%% Delete a snapshot
esdb_gater_api:delete_snapshot(StoreId, SourceUuid, StreamUuid, Version) ->
    ok | {error, term()}.

%% List snapshots
esdb_gater_api:list_snapshots(StoreId, SourceUuid, StreamUuid) ->
    {ok, [Snapshot]} | {error, term()}.

Health

esdb_gater_api:health() -> healthy | {degraded, Reason} | {unhealthy, Reason}.
esdb_gater_api:quick_health_check(StoreId) -> ok | {error, term()}.

Temporal Queries

Query events by timestamp for point-in-time reconstruction. See Temporal Queries Guide.

%% Read events up to a timestamp
esdb_gater_api:read_until(StoreId, StreamId, Timestamp) ->
    {ok, [Event]} | {error, term()}.
esdb_gater_api:read_until(StoreId, StreamId, Timestamp, Opts) ->
    {ok, [Event]} | {error, term()}.

%% Read events in a time range
esdb_gater_api:read_range(StoreId, StreamId, FromTs, ToTs) ->
    {ok, [Event]} | {error, term()}.

%% Get stream version at a specific timestamp
esdb_gater_api:version_at(StoreId, StreamId, Timestamp) ->
    {ok, Version} | {error, term()}.

Scavenging

Remove old events beyond retention, optionally archive first. See Scavenging Guide.

%% Scavenge a stream (delete old events)
esdb_gater_api:scavenge(StoreId, StreamId, Opts) ->
    {ok, Result} | {error, term()}.

%% Scavenge streams matching a pattern
esdb_gater_api:scavenge_matching(StoreId, Pattern, Opts) ->
    {ok, [Result]} | {error, term()}.

%% Preview what would be deleted (dry run)
esdb_gater_api:scavenge_dry_run(StoreId, StreamId, Opts) ->
    {ok, Preview} | {error, term()}.

Causation Tracking

Track event lineage for debugging and auditing. See Causation Guide.

Causation Graph

%% Get events caused by an event
esdb_gater_api:get_effects(StoreId, EventId) ->
    {ok, [Event]} | {error, term()}.

%% Get the event that caused this one
esdb_gater_api:get_cause(StoreId, EventId) ->
    {ok, Event} | {error, not_found}.

%% Get full causation chain (root to this event)
esdb_gater_api:get_causation_chain(StoreId, EventId) ->
    {ok, [Event]} | {error, term()}.

%% Get all events with the same correlation ID
esdb_gater_api:get_correlated(StoreId, CorrelationId) ->
    {ok, [Event]} | {error, term()}.

%% Build causation graph for visualization
esdb_gater_api:build_causation_graph(StoreId, EventId) ->
    {ok, Graph} | {error, term()}.

Schema Operations

Schema registry with automatic upcasting. See Schema Evolution Guide.

Schema Upcasting

%% Register a schema
esdb_gater_api:register_schema(StoreId, EventType, Schema) -> ok.

%% Get schema for an event type
esdb_gater_api:get_schema(StoreId, EventType) ->
    {ok, Schema} | {error, not_found}.

%% List all schemas
esdb_gater_api:list_schemas(StoreId) -> {ok, [SchemaInfo]}.

%% Upcast events to current schema version
esdb_gater_api:upcast_events(StoreId, Events) ->
    {ok, UpcastedEvents} | {error, term()}.

%% Unregister a schema
esdb_gater_api:unregister_schema(StoreId, EventType) -> ok.

Memory Pressure

Adaptive behavior based on system memory. See Memory Pressure Guide.

%% Get current memory pressure level
esdb_gater_api:get_memory_level(StoreId) ->
    {ok, normal | elevated | critical}.

%% Get detailed memory statistics
esdb_gater_api:get_memory_stats(StoreId) ->
    {ok, #{used := bytes(), total := bytes(), level := atom()}}.

Stream Links

Create derived streams from source streams. See Stream Links Guide.

Stream Links

%% Create a new link (filter + transform)
esdb_gater_api:create_link(StoreId, LinkSpec) -> ok.

%% Delete a link
esdb_gater_api:delete_link(StoreId, LinkName) -> ok.

%% Get link by name
esdb_gater_api:get_link(StoreId, LinkName) ->
    {ok, LinkInfo} | {error, not_found}.

%% List all links
esdb_gater_api:list_links(StoreId) -> {ok, [LinkInfo]}.

%% Start/stop a link
esdb_gater_api:start_link(StoreId, LinkName) -> ok.
esdb_gater_api:stop_link(StoreId, LinkName) -> ok.

%% Get detailed link info
esdb_gater_api:link_info(StoreId, LinkName) ->
    {ok, #{status := atom(), events_processed := integer()}}.

Channels

%% Subscribe to a topic
esdb_channel_server:subscribe(ChannelName, Topic, Pid) -> ok.

%% Subscribe with capability token (for authorization)
esdb_channel_server:subscribe(ChannelName, Topic, Pid, CapabilityToken) ->
    ok | {error, {unauthorized, Reason}}.

%% Unsubscribe from a topic
esdb_channel_server:unsubscribe(ChannelName, Topic, Pid) -> ok.

%% Publish a message
esdb_channel_server:publish(ChannelName, Topic, Message) ->
    ok | {error, rate_limited | signature_required | invalid_signature}.

%% Publish with capability token (for authorization)
esdb_channel_server:publish(ChannelName, Topic, Message, CapabilityToken) ->
    ok | {error, {unauthorized, Reason}}.

Security

%% Sign a message with default secret
esdb_pubsub_security:sign(Message) -> SignedMessage.

%% Sign with custom secret
esdb_pubsub_security:sign(Message, Secret) -> SignedMessage.

%% Verify a signed message
esdb_pubsub_security:verify(SignedMessage) -> ok | {error, Reason}.

%% Set the default secret
esdb_pubsub_security:set_secret(Secret) -> ok.

Retry Configuration

%% Create custom retry config
Config = esdb_gater_retry:new_config(
    100,     %% base_delay_ms
    5000,    %% max_delay_ms
    5        %% max_attempts
),

%% Execute with custom retry
esdb_gater_api:execute(my_store, Fun, Config).

Channels

PubSub Channels

The gateway provides 10 dedicated PubSub channels:

Channel Priority Rate Limit HMAC Purpose
esdb_channel_alerts critical unlimited required Critical system alerts
esdb_channel_security critical unlimited required Security events
esdb_channel_events high unlimited optional Business events
esdb_channel_health high 100/sec optional Health checks
esdb_channel_system normal unlimited optional System notifications
esdb_channel_metrics normal 10000/sec optional Performance metrics
esdb_channel_audit normal unlimited optional Audit trail
esdb_channel_lifecycle normal unlimited optional Lifecycle events
esdb_channel_logging low 1000/sec optional Log messages
esdb_channel_diagnostics low 100/sec optional Diagnostic info

Channel Priorities

Architecture

Supervision Tree

Supervision Tree

Worker Registry Flow

Worker Registry Flow

Channel Message Flow

Channel Message Flow

Configuration

%% sys.config
[{erl_esdb_gater, [
    %% Cluster configuration
    {cluster, [
        {port, 45893},
        {multicast_addr, {239, 255, 0, 2}}
    ]},

    %% Retry defaults
    {retry, [
        {base_delay_ms, 100},
        {max_delay_ms, 30000},
        {max_attempts, 10}
    ]},

    %% Channel configuration
    {channels, [
        {esdb_channel_events, [
            {priority, high}
        ]},
        {esdb_channel_metrics, [
            {max_rate, 10000}
        ]}
    ]},

    %% Security
    {security, [
        {hmac_secret, <<"your_secret_here">>},
        {message_ttl_seconds, 300}
    ]},

    %% Telemetry
    {telemetry_handlers, [logger]}
]}].

Telemetry Events

Event Measurements Metadata
[esdb_gater, worker, registered] system_time store_id, node, pid
[esdb_gater, worker, unregistered] system_time store_id, pid
[esdb_gater, worker, lookup] duration store_id
[esdb_gater, request, start] system_time store_id, request_type
[esdb_gater, request, stop] duration store_id, request_type, result
[esdb_gater, request, error] duration store_id, request_type, reason
[esdb_gater, retry, attempt] delay_ms, attempt store_id, reason
[esdb_gater, retry, exhausted] total_attempts store_id, reason
[esdb_gater, cluster, node, up] system_time node, member_count
[esdb_gater, cluster, node, down] system_time node, member_count
[esdb_gater, channel, broadcast] recipient_count channel, topic

Attaching Handlers

%% Attach default logger handler
ok = esdb_gater_telemetry:attach_default_handler().

%% Attach custom handler
Handler = fun(Event, Measurements, Meta, Config) ->
    %% Your custom handling
    ok
end,
ok = esdb_gater_telemetry:attach(my_handler, Handler, #{}).

%% Detach handler
ok = esdb_gater_telemetry:detach(my_handler).

Building

rebar3 compile         # Compile
rebar3 eunit           # Unit tests (44 tests)
rebar3 ct              # Integration tests (8 tests)
rebar3 dialyzer        # Type checking

Testing

Test counts:

rebar3 eunit                                    # All unit tests
rebar3 ct --suite=esdb_channel_SUITE            # Channel tests

Run e2e tests from erl-esdb:

cd /path/to/erl-esdb
rebar3 ct --suite=test/e2e/erl_esdb_gater_e2e_SUITE

Integration with erl-esdb

erl-esdb-gater is designed to work with erl-esdb to provide load-balanced, distributed access to event stores.

Automatic Worker Registration

When both packages are deployed on the same nodes:

  1. erl-esdb gateway workers automatically register with erl-esdb-gater
  2. No manual registration is required
  3. Worker cleanup is automatic when nodes leave or workers crash

Automatic Worker Registration

Accessing the Event Store

Use the gateway API to access erl-esdb with automatic load balancing and retry:

%% Stream operations
{ok, Version} = esdb_gater_api:append_events(my_store, StreamId, Events).
{ok, Version} = esdb_gater_api:append_events(my_store, StreamId, ExpectedVersion, Events).
{ok, Events} = esdb_gater_api:stream_forward(my_store, StreamId, 0, 100).
{ok, Events} = esdb_gater_api:stream_backward(my_store, StreamId, 100, 50).
{ok, Version} = esdb_gater_api:get_version(my_store, StreamId).
true = esdb_gater_api:stream_exists(my_store, StreamId).

%% Subscription operations
ok = esdb_gater_api:save_subscription(my_store, stream, StreamId, <<"my_sub">>, 0, self()).
ok = esdb_gater_api:remove_subscription(my_store, stream, StreamId, <<"my_sub">>).
ok = esdb_gater_api:ack_event(my_store, StreamId, <<"my_sub">>, EventNumber).
{ok, Subs} = esdb_gater_api:get_subscriptions(my_store).

%% Snapshot operations
ok = esdb_gater_api:record_snapshot(my_store, SourceUuid, StreamUuid, Version, Record).
{ok, Snap} = esdb_gater_api:read_snapshot(my_store, SourceUuid, StreamUuid, Version).
ok = esdb_gater_api:delete_snapshot(my_store, SourceUuid, StreamUuid, Version).
{ok, Snaps} = esdb_gater_api:list_snapshots(my_store, SourceUuid, StreamUuid).

%% Health check
healthy = esdb_gater_api:health().
ok = esdb_gater_api:quick_health_check(my_store).

Deployment

erl-esdb includes erl-esdb-gater as a dependency. Starting erl-esdb automatically starts the gateway:

%% Start erl-esdb (includes gater)
application:ensure_all_started(erl_esdb).

%% Gateway workers auto-register with the pg-based registry
%% Use the gater API for all operations
{ok, Version} = esdb_gater_api:append_events(my_store, StreamId, Events).

In a multi-node cluster, each node runs erl-esdb with its gateway worker. The pg-based registry provides:

Shared Types

erl-esdb-gater provides shared type definitions used across the ecosystem. Include them in your modules:

-include_lib("erl_esdb_gater/include/esdb_gater_types.hrl").

Records

Record Purpose
#event{} Event with type, data (Erlang term), and metadata
#snapshot{} Aggregate snapshot at a specific version
#subscription{} Subscription state and configuration
#append_result{} Result of an append operation

Version Constants

Constant Value Purpose
?NO_STREAM -1 Stream must not exist (first write)
?ANY_VERSION -2 No version check, always append
?STREAM_EXISTS -4 Stream must exist

See the Shared Types Guide for detailed usage.

Related Projects

Ecosystem

Project Description
erl-esdb Core event store built on Khepri/Ra
erl-evoq CQRS/Event Sourcing framework
erl-evoq-esdb Adapter connecting erl-evoq to erl-esdb

License

Apache-2.0