GrpcConnectionPool
A high-performance gRPC connection pooling library for Elixir with zero-GenServer-call hot path, pluggable selection strategies, and automatic health monitoring.
Overview
GrpcConnectionPool was extracted from a production Pub/Sub gRPC client and optimized for maximum throughput. The hot path (get_channel) does zero GenServer calls — channels are stored in ETS for O(1) indexed access with lock-free atomics-based round-robin.
Key Features
- Zero GenServer.call hot path: 2M+ ops/sec single-process throughput
- Pluggable strategies: round-robin (atomics), random, power-of-two-choices, or custom
- O(1) channel selection: constant time regardless of pool size
- :persistent_term config: zero-copy reads for configuration
- Health monitoring: automatic connection health checks and recovery
- Connection warming: periodic pings to prevent idle connection timeouts
- Exponential backoff: configurable retry with jitter
- Multiple pools: support for multiple named pools serving different services
- Dynamic scaling: scale_up/2, scale_down/2, resize/2 at runtime
- await_ready/2: block until the pool has connections, useful for startup
- Rich telemetry: comprehensive events for observability
Architecture
Pool.Supervisor (one_for_one)
+-- PoolState (GenServer -- owns ETS, persistent_term setup)
+-- Registry (health tracking)
+-- DynamicSupervisor --> Workers
+-- WorkerStarter (Task, temporary)
+-- TelemetryReporter (GenServer, periodic status)
Hot Path
Pool.get_channel()
-> ETS.lookup(:channel_count) # O(1)
-> persistent_term.get(strategy) # O(1), zero-copy
-> Strategy.select (atomics.add_get) # O(1), lock-free
-> ETS.lookup({:channel, index}) # O(1)
-> return channel # no GenServer!
Design Decisions
Custom pool over NimblePool — “no checkout” model where channels are returned directly. No checkout/checkin overhead.
ETS + atomics over GenServer.call — Workers store channels in ETS on connect/disconnect. The hot path reads ETS directly with zero message passing.
Pluggable strategies — Different workloads need different selection. Round-robin is default, but random avoids correlated hot-spotting and power-of-two handles uneven loads.
Separation of Concerns:
- Config: Configuration management with strategy support
- Worker: Connection lifecycle, stores channels in ETS
- Pool: Zero-GenServer hot path, strategy dispatch
- PoolState: ETS ownership, persistent_term setup
- Strategy: Behaviour for pluggable selection
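The ETS-plus-atomics hot path described above can be illustrated with a small standalone sketch. It uses only OTP primitives; the module name, table layout, and persistent_term key are assumptions made for illustration and are not taken from the library's source.

```elixir
defmodule HotPathSketch do
  # Illustrative only: table layout, keys, and function names are assumptions.

  # Stores channels under {:channel, index} keys in a public ETS table and keeps
  # the table handle and round-robin counter in :persistent_term.
  def setup(pool_name, channels) do
    table = :ets.new(:hot_path_sketch, [:set, :public, read_concurrency: true])

    channels
    |> Enum.with_index()
    |> Enum.each(fn {channel, index} -> :ets.insert(table, {{:channel, index}, channel}) end)

    :ets.insert(table, {:channel_count, length(channels)})

    counter = :atomics.new(1, signed: false)
    :persistent_term.put({__MODULE__, pool_name}, {table, counter})
    :ok
  end

  # Selects a channel without sending a message to any process.
  def get_channel(pool_name) do
    {table, counter} = :persistent_term.get({__MODULE__, pool_name})

    case :ets.lookup(table, :channel_count) do
      [{:channel_count, count}] when count > 0 ->
        # add_get/3 atomically increments the counter and returns the new value.
        index = rem(:atomics.add_get(counter, 1, 1), count)
        [{_key, channel}] = :ets.lookup(table, {:channel, index})
        {:ok, channel}

      _ ->
        {:error, :not_connected}
    end
  end
end
```

Every step in get_channel/1 is a read or an atomic increment, so concurrent callers never queue behind a single process mailbox.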
Installation
Add grpc_connection_pool to your dependencies in mix.exs:
def deps do
[
{:grpc_connection_pool, "~> 0.3.0"},
{:grpc, "~> 0.11.5"} # Required peer dependency
]
end
Quick Start
1. Basic Usage
# Create configuration
{:ok, config} = GrpcConnectionPool.Config.production(
host: "api.example.com",
port: 443,
pool_size: 5
)
# Start pool
{:ok, _pid} = GrpcConnectionPool.start_link(config)
# Execute gRPC operations
operation = fn channel ->
request = %MyService.ListRequest{}
MyService.Stub.list(channel, request)
end
# execute/1 wraps the stub's own return value, hence the nested tuple
{:ok, {:ok, response}} = GrpcConnectionPool.execute(operation)
2. Local Development
{:ok, config} = GrpcConnectionPool.Config.local(
host: "localhost",
port: 9090,
pool_size: 3
)
{:ok, _pid} = GrpcConnectionPool.start_link(config)
Configuration
Configuration Options
The library supports flexible configuration through GrpcConnectionPool.Config:
{:ok, config} = GrpcConnectionPool.Config.new([
endpoint: [
type: :production, # :production, :local, or custom atom
host: "api.example.com", # Required: gRPC server hostname
port: 443, # Required: gRPC server port
ssl: [], # SSL options ([] for default SSL)
credentials: nil, # Custom GRPC.Credential (overrides ssl)
interceptors: nil, # List of gRPC client interceptors (modules)
retry_config: [ # Optional: retry configuration
max_attempts: 3,
base_delay: 1000,
max_delay: 5000
]
],
pool: [
size: 5, # Number of connections in pool
name: MyApp.GrpcPool, # Pool name (must be unique)
strategy: :round_robin # :round_robin | :random | :power_of_two | CustomModule
],
connection: [
keepalive: 30_000, # HTTP/2 keepalive interval
ping_interval: 25_000, # Ping interval to keep connections warm
health_check: true, # Enable connection health monitoring
suppress_connection_errors: false # Suppress gun_down/gun_error logs (useful for GCP endpoints)
]
])
Configuration from Environment
In config/config.exs:
# Single service configuration
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
ssl: []
],
pool: [
size: 10,
name: MyApp.GrpcPool
],
connection: [
ping_interval: 30_000
]
# Multiple service configuration
config :my_app, :service_a,
endpoint: [type: :production, host: "service-a.example.com"],
pool: [size: 5, name: MyApp.ServiceA.Pool]
config :my_app, :service_b,
endpoint: [type: :production, host: "service-b.example.com"],
pool: [size: 8, name: MyApp.ServiceB.Pool]
Complete Configuration Examples
Basic Production Configuration
# For a production gRPC service with SSL
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
ssl: [] # Use default SSL settings
],
pool: [
size: 10,
name: MyApp.GrpcPool
],
connection: [
keepalive: 30_000, # Send keepalive every 30 seconds
ping_interval: 25_000, # Ping every 25 seconds to keep warm
health_check: true
]
Local Development Configuration
# For local development with a gRPC server (no SSL)
config :my_app, GrpcConnectionPool,
endpoint: [
type: :local,
host: "localhost",
port: 9090
],
pool: [
size: 3, # Smaller pool for development
name: MyApp.DevPool
]
Test Environment with Emulator
# For testing with Google Pub/Sub emulator or similar
config :my_app, GrpcConnectionPool,
endpoint: [
type: :test,
host: "localhost",
port: 8085,
retry_config: [
max_attempts: 3,
base_delay: 1000,
max_delay: 5000
]
],
pool: [
size: 2, # Small pool for tests
name: MyApp.TestPool
],
connection: [
ping_interval: nil # Disable pinging in tests
]
Multiple Service Configuration
# Configuration for multiple gRPC services
config :my_app, :service_a,
endpoint: [
type: :production,
host: "service-a.example.com",
port: 443
],
pool: [
size: 5,
name: MyApp.ServiceAPool
]
config :my_app, :service_b,
endpoint: [
type: :production,
host: "service-b.example.com",
port: 443
],
pool: [
size: 8,
name: MyApp.ServiceBPool
]
Advanced SSL Configuration
# Custom SSL configuration with client certificates
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "secure-api.example.com",
port: 443,
credentials: GRPC.Credential.new(ssl: [
verify: :verify_peer,
cacertfile: "/path/to/ca.pem",
certfile: "/path/to/client.pem",
keyfile: "/path/to/client-key.pem"
])
]
Environment-Specific Configuration
# Different configurations per environment using Mix.env()
case Mix.env() do
:prod ->
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "api.example.com",
port: 443
],
pool: [size: 20] # Large pool for production
:dev ->
config :my_app, GrpcConnectionPool,
endpoint: [
type: :local,
host: "localhost",
port: 9090
],
pool: [size: 3] # Small pool for development
:test ->
config :my_app, GrpcConnectionPool,
endpoint: [
type: :test,
host: "localhost",
port: 8085
],
pool: [size: 1], # Minimal pool for tests
connection: [
ping_interval: nil # No pinging needed in tests
]
end
Load from environment:
# Single pool
{:ok, config} = GrpcConnectionPool.Config.from_env(:my_app)
# Multiple pools
{:ok, service_a_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_a)
{:ok, service_b_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_b)
Runtime Configuration
For configuration determined at runtime (e.g., from environment variables):
In config/runtime.exs:
import Config
if config_env() == :prod do
# Get configuration from environment variables
grpc_host = System.get_env("GRPC_HOST") || "api.example.com"
grpc_port = System.get_env("GRPC_PORT", "443") |> String.to_integer()
pool_size = System.get_env("GRPC_POOL_SIZE", "10") |> String.to_integer()
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: grpc_host,
port: grpc_port
],
pool: [size: pool_size]
end
Adding to Supervisor Tree
Single Pool
defmodule MyApp.Application do
use Application
def start(_type, _args) do
# Load configuration
{:ok, config} = GrpcConnectionPool.Config.from_env(:my_app)
children = [
# Your other services...
{GrpcConnectionPool, config}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Multiple Pools for Different Services
defmodule MyApp.Application do
use Application
def start(_type, _args) do
# Load configurations for different services
{:ok, service_a_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_a)
{:ok, service_b_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_b)
children = [
# Your other services...
{GrpcConnectionPool, service_a_config},
{GrpcConnectionPool, service_b_config}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Manual Pool Configuration
def start(_type, _args) do
# Create configurations programmatically
{:ok, user_service_config} = GrpcConnectionPool.Config.production(
host: "users.example.com",
pool_name: MyApp.UserService.Pool,
pool_size: 5
)
{:ok, payment_service_config} = GrpcConnectionPool.Config.production(
host: "payments.example.com",
pool_name: MyApp.PaymentService.Pool,
pool_size: 3
)
children = [
{GrpcConnectionPool, user_service_config},
{GrpcConnectionPool, payment_service_config}
]
Supervisor.start_link(children, strategy: :one_for_one)
end
Complete Application Module Examples
Single Pool Application
# lib/my_app/application.ex
defmodule MyApp.Application do
use Application
def start(_type, _args) do
# Single pool from config
{:ok, config} = GrpcConnectionPool.Config.from_env(:my_app)
children = [
# Your other processes...
{GrpcConnectionPool, config}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Multiple Pool Application
# lib/my_app/application.ex
defmodule MyApp.Application do
use Application
def start(_type, _args) do
# Load configurations for different services
{:ok, service_a_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_a)
{:ok, service_b_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_b)
children = [
# Your other services...
{GrpcConnectionPool, service_a_config},
{GrpcConnectionPool, service_b_config}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
Usage Examples
Basic Operations
# Simple operation with default pool
operation = fn channel ->
request = %MyService.GetUserRequest{user_id: "123"}
MyService.Stub.get_user(channel, request)
end
case GrpcConnectionPool.execute(operation) do
{:ok, {:ok, user}} -> IO.puts("Found user: #{user.name}")
{:ok, {:error, error}} -> IO.puts("gRPC error: #{inspect(error)}")
{:error, reason} -> IO.puts("Pool error: #{inspect(reason)}")
end
Using Named Pools
# Execute on specific pool
user_operation = fn channel ->
request = %UserService.GetRequest{id: "123"}
UserService.Stub.get(channel, request)
end
payment_operation = fn channel ->
request = %PaymentService.ChargeRequest{amount: 1000}
PaymentService.Stub.charge(channel, request)
end
# Use different pools for different services
# execute/2 wraps the stub's own {:ok, reply} result in an outer {:ok, _}
{:ok, {:ok, user}} = GrpcConnectionPool.execute(user_operation, pool: MyApp.UserService.Pool)
{:ok, {:ok, charge}} = GrpcConnectionPool.execute(payment_operation, pool: MyApp.PaymentService.Pool)
Error Handling
operation = fn channel ->
request = %MyService.CreateRequest{data: "important data"}
MyService.Stub.create(channel, request)
end
case GrpcConnectionPool.execute(operation) do
{:ok, {:ok, result}} ->
# Success case
handle_success(result)
{:ok, {:error, %GRPC.RPCError{status: status, message: message}}} ->
# gRPC-level error (service returned error)
handle_grpc_error(status, message)
{:error, :not_connected} ->
# Pool has no healthy connections
handle_connection_error()
{:error, {:exit, {:noproc, _}}} ->
# Pool doesn't exist
handle_pool_missing_error()
{:error, reason} ->
# Other pool-level errors
handle_pool_error(reason)
end
Custom SSL Configuration
# For services requiring client certificates
ssl_config = [
verify: :verify_peer,
cacertfile: "/path/to/ca.pem",
certfile: "/path/to/client.pem",
keyfile: "/path/to/client-key.pem"
]
{:ok, config} = GrpcConnectionPool.Config.new([
endpoint: [
type: :production,
host: "secure-api.example.com",
port: 443,
ssl: ssl_config
],
pool: [size: 5]
])
Using Custom gRPC Credentials
# For advanced authentication scenarios
credentials = GRPC.Credential.new(ssl: [
verify: :verify_peer,
cacerts: :public_key.cacerts_get()
])
{:ok, config} = GrpcConnectionPool.Config.new([
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
credentials: credentials # Overrides ssl config
]
])
Using gRPC Interceptors
# Define interceptors for logging, authentication, metrics, etc.
{:ok, config} = GrpcConnectionPool.Config.new([
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
interceptors: [
MyApp.LoggingInterceptor, # Log all requests/responses
MyApp.AuthInterceptor, # Add authentication headers
MyApp.MetricsInterceptor # Track metrics
]
],
pool: [size: 10]
])
# Interceptors are applied to all connections in the pool
{:ok, _pid} = GrpcConnectionPool.start_link(config)
Interceptors are modules that implement the gRPC interceptor behaviour and are applied to all gRPC calls made through channels from the pool. They can be used for:
- Request/response logging
- Authentication header injection
- Metrics collection
- Request/response transformation
- Error handling and retry logic
Environment-Specific Usage
Development Environment
# config/dev.exs
config :my_app, GrpcConnectionPool,
endpoint: [
type: :local,
host: "localhost",
port: 9090 # Local gRPC server
],
pool: [size: 2], # Smaller pool for development
connection: [
ping_interval: nil # Disable pinging for local development
]
Test Environment with Emulators
# config/test.exs
config :my_app, GrpcConnectionPool,
endpoint: [
type: :test,
host: "localhost",
port: 8085, # Emulator port
retry_config: [
max_attempts: 3,
base_delay: 500, # Faster retries for tests
max_delay: 2000
]
],
pool: [size: 1], # Minimal pool for tests
connection: [
ping_interval: nil, # No pinging needed in tests
health_check: false # Disable health checks in tests
]
Production Environment
# config/prod.exs
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
ssl: [] # Use default SSL
],
pool: [
size: 20, # Large pool for production load
checkout_timeout: 10_000
],
connection: [
keepalive: 30_000,
ping_interval: 25_000,
health_check: true
]
Advanced Features
Pool Monitoring
# Check pool status
status = GrpcConnectionPool.status()
IO.inspect(status) # %{pool_name: GrpcConnectionPool.Pool, status: :running}
# Check specific pool
status = GrpcConnectionPool.status(MyApp.UserService.Pool)
Graceful Shutdown
# Stop specific pool
:ok = GrpcConnectionPool.stop(MyApp.UserService.Pool)
# The pool will be automatically restarted by the supervisor if needed
Telemetry Events
The library emits telemetry events for observability. Attach handlers to monitor pool and connection behavior.
Pool Events
| Event | Measurements | Metadata |
|---|---|---|
| [:grpc_connection_pool, :pool, :init] | pool_size | pool_name, endpoint |
| [:grpc_connection_pool, :pool, :get_channel] | duration | pool_name, available_channels |
| [:grpc_connection_pool, :pool, :scale_up] | duration, requested, succeeded, failed, new_size | pool_name |
| [:grpc_connection_pool, :pool, :scale_down] | duration, requested, terminated, new_size | pool_name |
| [:grpc_connection_pool, :pool, :status] | expected_size, current_size | pool_name |
Channel Events
| Event | Measurements | Metadata |
|---|---|---|
| [:grpc_connection_pool, :channel, :connected] | duration | pool_name |
| [:grpc_connection_pool, :channel, :connection_failed] | duration | pool_name, error |
| [:grpc_connection_pool, :channel, :disconnected] | duration | pool_name, reason |
| [:grpc_connection_pool, :channel, :ping] | duration | pool_name, result (:ok or :error) |
| [:grpc_connection_pool, :channel, :gun_down] | - | pool_name, reason, protocol |
| [:grpc_connection_pool, :channel, :gun_error] | - | pool_name, reason |
| [:grpc_connection_pool, :channel, :reconnect_scheduled] | delay_ms, attempt | pool_name, reason |
Example: Attaching Telemetry Handlers
# In your application startup
:telemetry.attach_many(
"grpc-pool-handler",
[
[:grpc_connection_pool, :pool, :init],
[:grpc_connection_pool, :pool, :get_channel],
[:grpc_connection_pool, :channel, :connected],
[:grpc_connection_pool, :channel, :disconnected],
[:grpc_connection_pool, :channel, :reconnect_scheduled]
],
&MyApp.Telemetry.handle_event/4,
nil
)
# Handler module
defmodule MyApp.Telemetry do
require Logger
def handle_event([:grpc_connection_pool, :pool, :init], measurements, metadata, _config) do
Logger.info("Pool #{metadata.pool_name} initialized with #{measurements.pool_size} connections to #{metadata.endpoint}")
end
def handle_event([:grpc_connection_pool, :channel, :connected], measurements, metadata, _config) do
duration_ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
Logger.debug("Channel connected for #{metadata.pool_name} in #{duration_ms}ms")
end
def handle_event([:grpc_connection_pool, :channel, :reconnect_scheduled], measurements, metadata, _config) do
Logger.warning("Reconnect scheduled for #{metadata.pool_name}: attempt #{measurements.attempt}, delay #{measurements.delay_ms}ms, reason: #{inspect(metadata.reason)}")
end
def handle_event(_event, _measurements, _metadata, _config), do: :ok
end
Connection Health
The library automatically monitors connection health and replaces dead connections. You can also check worker status:
# This is mainly for debugging - not needed in normal usage
worker_pid = :poolex.checkout(MyApp.UserService.Pool)
status = GrpcConnectionPool.Worker.status(worker_pid) # :connected | :disconnected
:poolex.checkin(MyApp.UserService.Pool, worker_pid)
Testing
Unit Tests
mix test --exclude emulator
Integration Tests with Emulator
Start the Google Pub/Sub emulator (or any gRPC service emulator):
# Using Docker Compose (if available)
docker-compose up -d
# Or manually
docker run --rm -p 8085:8085 google/cloud-sdk:emulators-pubsub \
/google-cloud-sdk/bin/gcloud beta emulators pubsub start \
--host-port=0.0.0.0:8085 --project=test-project
Run integration tests:
mix test --only emulator
Run all tests:
mix test
Testing in Your Application
# In your test helper
defmodule MyApp.TestHelper do
def start_test_pool do
{:ok, config} = GrpcConnectionPool.Config.local(
host: "localhost",
port: 8085, # Your test service port
pool_size: 1
)
{:ok, _pid} = GrpcConnectionPool.start_link(config, name: TestPool)
end
def stop_test_pool do
GrpcConnectionPool.stop(TestPool)
end
end
# In your tests
defmodule MyApp.GrpcTest do
use ExUnit.Case
setup do
MyApp.TestHelper.start_test_pool()
on_exit(&MyApp.TestHelper.stop_test_pool/0)
end
test "grpc operation works" do
operation = fn _channel ->
# Your gRPC test operation goes here; :ok is a placeholder return value
:ok
end
assert {:ok, :ok} = GrpcConnectionPool.execute(operation, pool: TestPool)
end
end
Connection Strategies
The pool supports pluggable channel selection strategies via the GrpcConnectionPool.Strategy behaviour.
Built-in Strategies
| Strategy | Config | Description | Best For |
|---|---|---|---|
| Round Robin | :round_robin | Lock-free atomics counter, cycles through channels sequentially | General use (default) |
| Random | :random | Random channel selection via :rand.uniform | Avoiding correlated hot-spotting |
| Power of Two | :power_of_two | Pick 2 random channels, choose least-recently-used | Uneven workloads |
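As a rough illustration of the power-of-two idea in the table (pick two random channels, prefer the least recently used), here is a standalone sketch. Tracking last-use ticks in an :atomics array is an assumption made for this example, not a description of the library's internals, and the module name is hypothetical.

```elixir
defmodule PowerOfTwoSketch do
  # Illustrative only: the tick bookkeeping below is an assumption.

  # One slot per channel for its last-use tick, plus a final slot for a global tick counter.
  def init(channel_count), do: :atomics.new(channel_count + 1, signed: false)

  def select(ticks, channel_count) when channel_count > 0 do
    # Two independent random candidates; they may coincide, which is harmless.
    a = :rand.uniform(channel_count) - 1
    b = :rand.uniform(channel_count) - 1

    # A lower tick means the channel was used longer ago (0 means never used).
    # :atomics indexes are 1-based, hence the + 1.
    index = if :atomics.get(ticks, a + 1) <= :atomics.get(ticks, b + 1), do: a, else: b

    now = :atomics.add_get(ticks, channel_count + 1, 1)
    :atomics.put(ticks, index + 1, now)
    {:ok, index}
  end
end
```

The extra reads and writes per call are consistent with this strategy being slower than round-robin while spreading uneven workloads better.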
Configuring a Strategy
{:ok, config} = GrpcConnectionPool.Config.new(
endpoint: [host: "api.example.com", port: 443],
pool: [size: 10, strategy: :random]
)
Custom Strategies
Implement the GrpcConnectionPool.Strategy behaviour:
defmodule MyApp.WeightedStrategy do
@behaviour GrpcConnectionPool.Strategy
@impl true
def init(_pool_name, _pool_size) do
# Return any state — stored in :persistent_term
%{weights: [0.5, 0.3, 0.2]}
end
@impl true
def select(state, channel_count, _ets_table) do
# Must return {:ok, index} where 0 <= index < channel_count
{:ok, weighted_random(state.weights, channel_count)}
end
defp weighted_random(_weights, count), do: :rand.uniform(count) - 1
end
Then use it:
{:ok, config} = GrpcConnectionPool.Config.new(
endpoint: [host: "api.example.com", port: 443],
pool: [strategy: MyApp.WeightedStrategy]
)
Strategy Performance
Benchmarked with pool_size=10:
| Strategy | Throughput | Avg Latency | Memory/call |
|---|---|---|---|
| round_robin | 1.91M ips | 522 ns | 0.72 KB |
| random | 1.87M ips | 534 ns | 1.05 KB |
| power_of_two | 1.08M ips | 925 ns | 2.05 KB |
Performance Considerations
Pool Sizing
- Small services: 3-5 connections per pool
- Medium load: 5-10 connections per pool
- High load: 10-20+ connections per pool
- Multiple services: Separate pools with appropriate sizing
Connection Settings
config :my_app, GrpcConnectionPool,
connection: [
keepalive: 30_000, # Keep connections alive (important for cloud services)
ping_interval: 25_000, # Ping before cloud timeouts (usually 60s)
health_check: true # Enable automatic health monitoring
]
Monitoring
Monitor pool performance using:
- Connection establishment logs
- Pool checkout timeouts
- gRPC operation latencies
- Connection health check failures
Troubleshooting
Common Issues
- :noproc errors: Pool not started or wrong pool name
- Connection timeouts: Network issues or service unavailable
- SSL errors: Incorrect SSL configuration or certificates
- Pool checkout timeouts: Pool too small or slow operations
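The result shapes from the Error Handling example map onto these issues fairly directly. A small classifier could be sketched as follows; the module name and the category atoms are made up for illustration, and only the tuple shapes come from the examples in this README.

```elixir
defmodule ResultSketch do
  # Illustrative only: category atoms are hypothetical; tuple shapes follow the
  # Error Handling example.

  # The operation ran and the stub returned a reply.
  def classify({:ok, {:ok, reply}}), do: {:ok, reply}

  # The operation ran but the service returned a gRPC-level error.
  def classify({:ok, {:error, error}}), do: {:grpc_error, error}

  # The pool has no healthy connections.
  def classify({:error, :not_connected}), do: {:pool_error, :no_healthy_connections}

  # The pool process does not exist (not started, or wrong pool name).
  def classify({:error, {:exit, {:noproc, _}}}), do: {:pool_error, :pool_not_started}

  # Any other pool-level failure.
  def classify({:error, reason}), do: {:pool_error, reason}
end
```

Separating gRPC-level errors from pool-level errors early makes it easier to tell a misbehaving service from a pool that was never started.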
Debug Logging
Enable debug logging to troubleshoot issues:
# config/config.exs
config :logger, level: :debug
# In your code
require Logger
Logger.info("Pool status: #{inspect(GrpcConnectionPool.status())}")
Health Check Failures
If you see frequent reconnections:
- Check network connectivity
- Verify service availability
- Adjust ping_interval and keepalive settings
- Check for firewall or load balancer timeouts
Connection Error Messages with GCP Services
GCP gRPC endpoints have hardcoded timeouts that periodically close connections, causing gun_down error messages. This is normal behavior and the connection pool will automatically replace the workers. To suppress these expected error messages:
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "pubsub.googleapis.com", # Or other GCP gRPC endpoints
port: 443
],
connection: [
suppress_connection_errors: true # Suppress expected gun_down errors from GCP
]
The worker processes will still reconnect with backoff and the pool will automatically recover.
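To make "reconnect with backoff" concrete, here is a minimal sketch of exponential backoff with jitter. The default values mirror the base_delay and max_delay keys from the retry_config example earlier in this README; the doubling formula, the full-jitter choice, and the module name are assumptions, not the library's actual implementation.

```elixir
defmodule BackoffSketch do
  # Illustrative only: the formula and jitter style are assumptions.
  # `attempt` starts at 1; all delays are in milliseconds.

  # Upper bound for a given attempt: base_delay doubles each attempt, capped at max_delay.
  def ceiling(attempt, base_delay, max_delay) when attempt >= 1 do
    min(base_delay * Integer.pow(2, attempt - 1), max_delay)
  end

  # Picks a random delay in 1..ceiling so reconnecting workers do not retry in lockstep.
  def delay(attempt, base_delay \\ 1_000, max_delay \\ 5_000) do
    :rand.uniform(ceiling(attempt, base_delay, max_delay))
  end
end
```

With these defaults the upper bound grows 1000, 2000, 4000 and then stays at 5000 ms, while the actual delay is drawn at random below that bound.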
Contributing
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Make your changes
- Add tests for your changes
- Run the test suite (mix test)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
Development Setup
git clone https://github.com/nyo16/grpc_connection_pool.git
cd grpc_connection_pool
mix deps.get
mix test
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments
- Inspired by production requirements from Google Cloud Pub/Sub integration
- Thanks to the Elixir gRPC community for the solid foundation
Documentation: https://hexdocs.pm/grpc_connection_pool
Source Code: https://github.com/nyo16/grpc_connection_pool
Issues: https://github.com/nyo16/grpc_connection_pool/issues