ZenWebsocket
A robust WebSocket client library for Elixir, built on Gun transport for production-grade reliability. Designed for financial APIs with automatic reconnection, comprehensive error handling, and real-world testing.
Features
- Gun Transport - Battle-tested HTTP/2 and WebSocket client
- Automatic Reconnection - Exponential backoff with state preservation
- Financial-Grade Reliability - Built for high-frequency trading systems
- Platform Adapters - Ready-to-use Deribit integration, extensible for others
- Real API Testing - No mocks, tested against live systems
- Simple API - Only 5 public functions to learn
- Comprehensive Error Handling - Categorized errors with recovery strategies
- Rate Limiting - Configurable token bucket algorithm
- JSON-RPC 2.0 - Full protocol support with correlation tracking
- Pool Load Balancing - Health-based routing with automatic failover
- Session Recording - JSONL message recording for debugging and replay
- Test Utilities - Consumer-facing test helpers with mock server
Installation
Add zen_websocket to your dependencies in mix.exs:
def deps do
[
{:zen_websocket, "~> 0.1"}
]
end
Quick Start
Basic Connection
# Connect to a WebSocket endpoint (use your actual endpoint)
{:ok, client} = ZenWebsocket.Client.connect("wss://api.example.com/ws", [
timeout: 5000,
heartbeat_interval: 30000
])
# Send a message
{:ok, _} = ZenWebsocket.Client.send_message(client, %{action: "ping"})
# Receive messages in your process
receive do
{:websocket_message, message} ->
# Process the incoming message
handle_message(message)
after
5_000 -> {:error, :timeout}
end
# Close the connection
:ok = ZenWebsocket.Client.close(client)
Subscription Management
ZenWebsocket tracks channel subscriptions and automatically restores them after reconnection:
# Subscribe to channels
{:ok, client} = ZenWebsocket.Client.connect("wss://api.example.com/ws")
:ok = ZenWebsocket.Client.subscribe(client, ["ticker.BTC", "trades.BTC"])
# Subscriptions are automatically tracked when confirmations arrive
# On reconnect, tracked subscriptions are restored automatically
What the library handles:
- Tracking confirmed subscriptions via SubscriptionManager
- Automatic restoration after reconnection (when restore_subscriptions: true, the default)
- Building restore messages in the correct format
What your client needs to handle:
- Processing subscription data messages (sent to your handler callback)
- Unsubscription logic (call your API's unsubscribe method; the library then removes the channel from tracking)
- Authentication before subscribing to private channels
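The split of responsibilities above can be illustrated with a small, self-contained sketch. It is not the library's SubscriptionManager: the module name SubscriptionTrackerSketch and the "public/subscribe" message shape are assumptions chosen only for illustration.

```elixir
defmodule SubscriptionTrackerSketch do
  # Hypothetical illustration only; not ZenWebsocket.SubscriptionManager.

  # Start with no confirmed subscriptions.
  def new, do: MapSet.new()

  # Record a channel once the server has confirmed the subscription.
  def confirm(tracked, channel), do: MapSet.put(tracked, channel)

  # Forget a channel after the client has unsubscribed.
  def remove(tracked, channel), do: MapSet.delete(tracked, channel)

  # Build one restore message covering every tracked channel, or nil when
  # there is nothing to restore. The message shape is an assumption.
  def restore_message(tracked) do
    case tracked |> MapSet.to_list() |> Enum.sort() do
      [] -> nil
      channels -> %{method: "public/subscribe", params: %{channels: channels}}
    end
  end
end
```

Only confirmed channels enter the set, so a reconnect replays exactly what the server had accepted. Channels that were requested but never confirmed are not replayed.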
Configuration:
# Disable automatic subscription restoration
{:ok, client} = ZenWebsocket.Client.connect("wss://api.example.com/ws",
restore_subscriptions: false
)
For more detailed examples, see our working examples with fully tested implementations:
- Basic Usage - Connection management and messaging
- Error Handling - Robust error recovery patterns
- JSON-RPC Client - JSON-RPC 2.0 protocol usage
- Subscription Management - Channel subscription patterns
See the Examples Guide for complete code samples and usage patterns.
Session Recording
Record WebSocket sessions for debugging and replay:
# Enable recording when connecting
{:ok, client} = ZenWebsocket.Client.connect("wss://api.example.com/ws",
record_to: "/tmp/session.jsonl"
)
# Use the connection normally - all messages are recorded
ZenWebsocket.Client.send_message(client, %{action: "subscribe", channel: "trades"})
# Close to flush remaining buffer
ZenWebsocket.Client.close(client)
# Replay the recorded session
ZenWebsocket.Recorder.replay("/tmp/session.jsonl", fn entry ->
IO.inspect(entry, label: "#{entry.dir} at #{entry.ts}")
end)
# Get session metadata
{:ok, meta} = ZenWebsocket.Recorder.metadata("/tmp/session.jsonl")
# => %{count: 42, inbound: 30, outbound: 12, duration_ms: 5000, ...}
Connection Pool Load Balancing
Route messages to the healthiest connection in a pool:
# Start the supervisor in your application
{:ok, _} = ZenWebsocket.ClientSupervisor.start_link([])
# Create multiple supervised connections
{:ok, _client1} = ZenWebsocket.ClientSupervisor.start_client("wss://api.example.com/ws")
{:ok, _client2} = ZenWebsocket.ClientSupervisor.start_client("wss://api.example.com/ws")
{:ok, _client3} = ZenWebsocket.ClientSupervisor.start_client("wss://api.example.com/ws")
# Send messages with automatic load balancing
# Routes to healthiest connection based on: pending requests, latency, errors
:ok = ZenWebsocket.ClientSupervisor.send_balanced(message)
# Automatic failover on connection failure (max 3 attempts by default)
:ok = ZenWebsocket.ClientSupervisor.send_balanced(message, max_attempts: 5)
# Check pool health
health = ZenWebsocket.PoolRouter.pool_health(ZenWebsocket.ClientSupervisor.list_clients())
# => [%{pid: #PID<0.123.0>, health: 95}, %{pid: #PID<0.124.0>, health: 87}, ...]
Deribit Integration
# Configure Deribit credentials
config = %{
client_id: System.get_env("DERIBIT_CLIENT_ID"),
client_secret: System.get_env("DERIBIT_CLIENT_SECRET"),
test_mode: true
}
# Start the supervised adapter
{:ok, adapter} = ZenWebsocket.Examples.DeribitGenServerAdapter.start_link(config)
# Subscribe to market data
{:ok, _} = ZenWebsocket.Examples.DeribitGenServerAdapter.subscribe(
adapter,
["book.BTC-PERPETUAL.raw", "trades.BTC-PERPETUAL.raw"]
)
# Send a custom JSON-RPC request (e.g., place an order)
{:ok, response} = ZenWebsocket.Examples.DeribitGenServerAdapter.send_request(
adapter,
"private/buy",
%{
instrument_name: "BTC-PERPETUAL",
amount: 10,
type: "limit",
price: 50000
}
)
Architecture
ZenWebsocket follows a modular architecture with clear separation of concerns:
ZenWebsocket.Client # Main client interface
ZenWebsocket.Config # Configuration management
ZenWebsocket.Frame # WebSocket frame handling
ZenWebsocket.Reconnection # Automatic reconnection logic
ZenWebsocket.MessageHandler # Message parsing and routing
ZenWebsocket.ErrorHandler # Error categorization
ZenWebsocket.RateLimiter # API rate limiting
ZenWebsocket.JsonRpc # JSON-RPC 2.0 protocol
ZenWebsocket.HeartbeatManager # Heartbeat lifecycle management
ZenWebsocket.SubscriptionManager # Subscription tracking and restoration
ZenWebsocket.RequestCorrelator # Request/response correlation tracking
ZenWebsocket.Recorder # Session recording (pure functions)
ZenWebsocket.RecorderServer # Async file I/O for recording
ZenWebsocket.PoolRouter # Health-based connection pool routing
ZenWebsocket.Testing # Consumer-facing test utilities
Platform Integration
The library includes a complete Deribit adapter as a reference implementation. To integrate with other platforms:
- Create an adapter module following the Deribit pattern
- Implement platform-specific authentication
- Handle platform message formats
- Add comprehensive tests against the real API
See lib/zen_websocket/examples/deribit_adapter.ex for a complete example.
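As a rough outline of the authentication and message-format steps above, the sketch below builds a Deribit-style JSON-RPC auth request and classifies already-decoded messages. The module and function names (AdapterSketch, auth_request/3, classify/1) are hypothetical and not part of the ZenWebsocket API. The message shapes follow Deribit's public JSON-RPC conventions as an assumption, so check them against the platform's documentation.

```elixir
defmodule AdapterSketch do
  # Hypothetical adapter outline; the function names are illustrative and
  # not part of the ZenWebsocket API.

  # Platform-specific authentication: build a JSON-RPC 2.0 auth request.
  def auth_request(id, client_id, client_secret) do
    %{
      "jsonrpc" => "2.0",
      "id" => id,
      "method" => "public/auth",
      "params" => %{
        "grant_type" => "client_credentials",
        "client_id" => client_id,
        "client_secret" => client_secret
      }
    }
  end

  # Platform message formats: classify a message that is already decoded
  # from JSON into a map with string keys.
  def classify(%{"method" => "subscription", "params" => %{"channel" => ch, "data" => data}}),
    do: {:notification, ch, data}

  def classify(%{"id" => id, "error" => error}), do: {:error, id, error}
  def classify(%{"id" => id, "result" => result}), do: {:ok, id, result}
  def classify(other), do: {:unknown, other}
end
```

Keeping these functions pure (maps in, tagged tuples out) makes them easy to unit test before wiring them to a live connection.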
Documentation
Comprehensive guides are available in the docs/guides/ directory:
| Guide | Description |
|---|---|
| Building Adapters | Create platform adapters with heartbeat, auth, and reconnection patterns |
| Performance Tuning | Configure timeouts, rate limiting, and memory for your use case |
| Troubleshooting Reconnection | Debug connection issues and reconnection logic |
| AGENTS.md | Guide for AI coding agents contributing to this project |
See the full HexDocs documentation for API reference and module documentation.
Configuration Options
| Option | Description | Default |
|---|---|---|
| url | WebSocket endpoint URL | required |
| headers | Custom headers for connection | [] |
| timeout | Connection timeout in milliseconds | 5000 |
| retry_count | Maximum reconnection attempts | 3 |
| retry_delay | Initial retry delay in milliseconds | 1000 |
| heartbeat_interval | Ping interval in milliseconds | 30000 |
| reconnect_on_error | Enable automatic reconnection | true |
| restore_subscriptions | Restore subscriptions after reconnect | true |
| record_to | Path to JSONL file for session recording | nil |
| debug | Enable verbose debug logging | false |
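To show how retry_count and retry_delay might interact under exponential backoff, here is a minimal sketch. It assumes the delay doubles on each attempt and is capped at a maximum. The doubling factor, the 30-second cap, and the BackoffSketch module are all assumptions for illustration, and the library's actual schedule may differ.

```elixir
defmodule BackoffSketch do
  # Hypothetical illustration; not ZenWebsocket.Reconnection.

  # Delay in milliseconds before a 0-based attempt:
  # base_ms * 2^attempt, never exceeding max_ms (the cap is an assumption).
  def delay(attempt, base_ms \\ 1_000, max_ms \\ 30_000) when attempt >= 0 do
    min(base_ms * Integer.pow(2, attempt), max_ms)
  end

  # Full list of delays for retry_count attempts.
  def schedule(retry_count, base_ms \\ 1_000, max_ms \\ 30_000) do
    for attempt <- 0..(retry_count - 1)//1, do: delay(attempt, base_ms, max_ms)
  end
end

# With the documented defaults (retry_count: 3, retry_delay: 1000):
BackoffSketch.schedule(3)
# => [1000, 2000, 4000]
```

Under these assumptions, raising retry_count lengthens the total time spent reconnecting far more than raising retry_delay does, until the cap is reached.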
Debug Logging
Debug logging is disabled by default to keep library output quiet. Enable it for troubleshooting connection issues:
# Enable debug logging for troubleshooting
{:ok, client} = ZenWebsocket.Client.connect("wss://example.com", debug: true)
When enabled, you'll see detailed logs for connection establishment, WebSocket upgrades, frame handling, heartbeats, and reconnection attempts.
Using Debug in Custom Adapters:
If you're building a custom adapter or extension, use ZenWebsocket.Debug.log/2 with the Config struct:
alias ZenWebsocket.Debug
# Always pass the Config struct (not the full state map)
Debug.log(config, "Custom adapter initialized")
Debug.log(state.config, "Processing message: #{inspect(msg)}")
The function is a no-op when debug: false (the default), so you can leave debug statements in production code at negligible cost.
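The no-op behaviour can be pictured with a pattern-matching sketch like the one below. It is not the library's ZenWebsocket.Debug implementation: DebugSketch is a hypothetical module, and it writes to standard error instead of using Logger so the example stays self-contained.

```elixir
defmodule DebugSketch do
  # Hypothetical illustration; not ZenWebsocket.Debug.

  # Emit the message only when the config enables debugging.
  def log(%{debug: true}, message), do: IO.puts(:stderr, "[debug] " <> message)

  # Any other config: do nothing and return :ok.
  def log(_config, _message), do: :ok
end

DebugSketch.log(%{debug: false}, "not printed")
DebugSketch.log(%{debug: true}, "printed to stderr")
```

With a plain function like this, the message argument is still built before the call, even when logging is disabled. Avoid expensive inspect calls on large terms in hot paths if that matters for your adapter.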
Testing Your Application
ZenWebsocket provides test utilities for testing your own WebSocket clients:
defmodule MyApp.WebSocketTest do
use ExUnit.Case
alias ZenWebsocket.Testing
setup do
{:ok, server} = Testing.start_mock_server()
on_exit(fn -> Testing.stop_server(server) end)
{:ok, server: server}
end
test "client sends expected message", %{server: server} do
# Connect your client to the mock server
{:ok, client} = ZenWebsocket.Client.connect(server.url)
# Send a message
ZenWebsocket.Client.send_message(client, ~s({"type": "ping"}))
# Assert the server received it (supports string, regex, map, or function matchers)
assert Testing.assert_message_sent(server, %{"type" => "ping"}, 1000)
ZenWebsocket.Client.close(client)
end
test "client handles server messages", %{server: server} do
{:ok, client} = ZenWebsocket.Client.connect(server.url)
# Inject a message from the server
Testing.inject_message(server, ~s({"type": "notification", "data": "hello"}))
# Your client should receive it
assert_receive {:websocket_message, msg}, 1000
assert String.contains?(msg, "notification")
ZenWebsocket.Client.close(client)
end
test "client handles disconnection", %{server: server} do
{:ok, client} = ZenWebsocket.Client.connect(server.url, reconnect_on_error: false)
# Simulate server disconnect
Testing.simulate_disconnect(server, :going_away)
# Verify client detected disconnect
Process.sleep(100)
refute Process.alive?(client.server_pid)
end
end
Testing Philosophy
The library's own test suite uses real API testing exclusively. No mocks or stubs - every test runs against actual WebSocket endpoints or local test servers. This ensures the library handles real-world conditions including network latency, connection drops, and API quirks.
# Run all tests
mix test
# Run with coverage
mix test --cover
# Run quality checks
mix check
Contributing
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Write tests using real APIs (no mocks)
- Ensure all quality checks pass (mix check)
- Commit your changes
- Push to the branch
- Open a Pull Request
Development Commands
mix compile # Compile the project
mix test # Run test suite
mix lint # Run Credo analysis
mix typecheck # Run Dialyzer
mix docs # Generate documentation
mix check # Run all quality checks
License
This project is licensed under the MIT License.
Acknowledgments
Built for the Elixir community by ZenHive.