GrpcConnectionPool

Hex.pmDocumentationLicense

A high-performance gRPC connection pooling library for Elixir with zero-GenServer-call hot path, pluggable selection strategies, and automatic health monitoring.

Overview

GrpcConnectionPool was extracted from a production Pub/Sub gRPC client and optimized for maximum throughput. The hot path (get_channel) does zero GenServer calls — channels are stored in ETS for O(1) indexed access with lock-free atomics-based round-robin.

Key Features

Architecture

Pool.Supervisor (one_for_one)
+-- PoolState (GenServer -- owns ETS, persistent_term setup)
+-- Registry (health tracking)
+-- DynamicSupervisor --> Workers
+-- WorkerStarter (Task, temporary)
+-- TelemetryReporter (GenServer, periodic status)

Hot Path

Pool.get_channel()
-> ETS.lookup(:channel_count) # O(1)
-> persistent_term.get(strategy) # O(1), zero-copy
-> Strategy.select (atomics.add_get) # O(1), lock-free
-> ETS.lookup({:channel, index}) # O(1)
-> return channel # no GenServer!

Design Decisions

  1. Custom pool over NimblePool — "no checkout" model where channels are returned directly. No checkout/checkin overhead.

  2. ETS + atomics over GenServer.call — Workers store channels in ETS on connect/disconnect. The hot path reads ETS directly with zero message passing.

  3. Pluggable strategies — Different workloads need different selection. Round-robin is default, but random avoids correlated hot-spotting and power-of-two handles uneven loads.

  4. Separation of Concerns:

    • Config: Configuration management with strategy support
    • Worker: Connection lifecycle, stores channels in ETS
    • Pool: Zero-GenServer hot path, strategy dispatch
    • PoolState: ETS ownership, persistent_term setup
    • Strategy: Behaviour for pluggable selection

Installation

Add grpc_connection_pool to your dependencies in mix.exs:

def deps do
[
{:grpc_connection_pool, "~> 0.3.0"},
{:grpc, "~> 0.11.5"} # Required peer dependency
]
end

Quick Start

1. Basic Usage

# Create configuration
{:ok, config} = GrpcConnectionPool.Config.production(
host: "api.example.com",
port: 443,
pool_size: 5
)
# Start pool
{:ok, _pid} = GrpcConnectionPool.start_link(config)
# Execute gRPC operations
operation = fn channel ->
request = %MyService.ListRequest{}
MyService.Stub.list(channel, request)
end
{:ok, response} = GrpcConnectionPool.execute(operation)

2. Local Development

{:ok, config} = GrpcConnectionPool.Config.local(
host: "localhost",
port: 9090,
pool_size: 3
)
{:ok, _pid} = GrpcConnectionPool.start_link(config)

Configuration

Configuration Options

The library supports flexible configuration through GrpcConnectionPool.Config:

{:ok, config} = GrpcConnectionPool.Config.new([
endpoint: [
type: :production, # :production, :local, or custom atom
host: "api.example.com", # Required: gRPC server hostname
port: 443, # Required: gRPC server port
ssl: [], # SSL options ([] for default SSL)
credentials: nil, # Custom GRPC.Credential (overrides ssl)
interceptors: nil, # List of gRPC client interceptors (modules)
retry_config: [ # Optional: retry configuration
max_attempts: 3,
base_delay: 1000,
max_delay: 5000
]
],
pool: [
size: 5, # Number of connections in pool
name: MyApp.GrpcPool, # Pool name (must be unique)
strategy: :round_robin # :round_robin | :random | :power_of_two | CustomModule
],
connection: [
keepalive: 30_000, # HTTP/2 keepalive interval
ping_interval: 25_000, # Ping interval to keep connections warm
health_check: true, # Enable connection health monitoring
suppress_connection_errors: false # Suppress gun_down/gun_error logs (useful for GCP endpoints)
]
])

Configuration from Environment

In config/config.exs:

# Single service configuration
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
ssl: []
],
pool: [
size: 10,
name: MyApp.GrpcPool
],
connection: [
ping_interval: 30_000
]
# Multiple service configuration
config :my_app, :service_a,
endpoint: [type: :production, host: "service-a.example.com"],
pool: [size: 5, name: MyApp.ServiceA.Pool]
config :my_app, :service_b,
endpoint: [type: :production, host: "service-b.example.com"],
pool: [size: 8, name: MyApp.ServiceB.Pool]

Complete Configuration Examples

Basic Production Configuration

# For a production gRPC service with SSL
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
ssl: [] # Use default SSL settings
],
pool: [
size: 10,
name: MyApp.GrpcPool
],
connection: [
keepalive: 30_000, # Send keepalive every 30 seconds
ping_interval: 25_000, # Ping every 25 seconds to keep warm
health_check: true
]

Local Development Configuration

# For local development with a gRPC server (no SSL)
config :my_app, GrpcConnectionPool,
endpoint: [
type: :local,
host: "localhost",
port: 9090
],
pool: [
size: 3, # Smaller pool for development
name: MyApp.DevPool
]

Test Environment with Emulator

# For testing with Google Pub/Sub emulator or similar
config :my_app, GrpcConnectionPool,
endpoint: [
type: :test,
host: "localhost",
port: 8085,
retry_config: [
max_attempts: 3,
base_delay: 1000,
max_delay: 5000
]
],
pool: [
size: 2, # Small pool for tests
name: MyApp.TestPool
],
connection: [
ping_interval: nil # Disable pinging in tests
]

Multiple Service Configuration

# Configuration for multiple gRPC services
config :my_app, :service_a,
endpoint: [
type: :production,
host: "service-a.example.com",
port: 443
],
pool: [
size: 5,
name: MyApp.ServiceAPool
]
config :my_app, :service_b,
endpoint: [
type: :production,
host: "service-b.example.com",
port: 443
],
pool: [
size: 8,
name: MyApp.ServiceBPool
]

Advanced SSL Configuration

# Custom SSL configuration with client certificates
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "secure-api.example.com",
port: 443,
credentials: GRPC.Credential.new(ssl: [
verify: :verify_peer,
cacertfile: "/path/to/ca.pem",
certfile: "/path/to/client.pem",
keyfile: "/path/to/client-key.pem"
])
]

Environment-Specific Configuration

# Different configurations per environment using Mix.env()
case Mix.env() do
:prod ->
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "api.example.com",
port: 443
],
pool: [size: 20] # Large pool for production
:dev ->
config :my_app, GrpcConnectionPool,
endpoint: [
type: :local,
host: "localhost",
port: 9090
],
pool: [size: 3] # Small pool for development
:test ->
config :my_app, GrpcConnectionPool,
endpoint: [
type: :test,
host: "localhost",
port: 8085
],
pool: [size: 1], # Minimal pool for tests
connection: [
ping_interval: nil # No pinging needed in tests
]
end

Load from environment:

# Single pool
{:ok, config} = GrpcConnectionPool.Config.from_env(:my_app)
# Multiple pools
{:ok, service_a_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_a)
{:ok, service_b_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_b)

Runtime Configuration

For configuration determined at runtime (e.g., from environment variables):

In config/runtime.exs:

import Config
if config_env() == :prod do
# Get configuration from environment variables
grpc_host = System.get_env("GRPC_HOST") || "api.example.com"
grpc_port = System.get_env("GRPC_PORT", "443") |> String.to_integer()
pool_size = System.get_env("GRPC_POOL_SIZE", "10") |> String.to_integer()
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: grpc_host,
port: grpc_port
],
pool: [size: pool_size]
end

Adding to Supervisor Tree

Single Pool

defmodule MyApp.Application do
use Application
def start(_type, _args) do
# Load configuration
{:ok, config} = GrpcConnectionPool.Config.from_env(:my_app)
children = [
# Your other services...
{GrpcConnectionPool, config}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end

Multiple Pools for Different Services

defmodule MyApp.Application do
use Application
def start(_type, _args) do
# Load configurations for different services
{:ok, service_a_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_a)
{:ok, service_b_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_b)
children = [
# Your other services...
{GrpcConnectionPool, service_a_config},
{GrpcConnectionPool, service_b_config}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end

Manual Pool Configuration

def start(_type, _args) do
# Create configurations programmatically
{:ok, user_service_config} = GrpcConnectionPool.Config.production(
host: "users.example.com",
pool_name: MyApp.UserService.Pool,
pool_size: 5
)
{:ok, payment_service_config} = GrpcConnectionPool.Config.production(
host: "payments.example.com",
pool_name: MyApp.PaymentService.Pool,
pool_size: 3
)
children = [
{GrpcConnectionPool, user_service_config},
{GrpcConnectionPool, payment_service_config}
]
Supervisor.start_link(children, strategy: :one_for_one)
end

Complete Application Module Examples

Single Pool Application

# lib/my_app/application.ex
defmodule MyApp.Application do
use Application
def start(_type, _args) do
# Single pool from config
{:ok, config} = GrpcConnectionPool.Config.from_env(:my_app)
children = [
# Your other processes...
{GrpcConnectionPool, config}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end

Multiple Pool Application

# lib/my_app/application.ex
defmodule MyApp.Application do
use Application
def start(_type, _args) do
# Load configurations for different services
{:ok, service_a_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_a)
{:ok, service_b_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_b)
children = [
# Your other services...
{GrpcConnectionPool, service_a_config},
{GrpcConnectionPool, service_b_config}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end

Usage Examples

Basic Operations

# Simple operation with default pool
operation = fn channel ->
request = %MyService.GetUserRequest{user_id: "123"}
MyService.Stub.get_user(channel, request)
end
case GrpcConnectionPool.execute(operation) do
{:ok, {:ok, user}} -> IO.puts("Found user: #{user.name}")
{:ok, {:error, error}} -> IO.puts("gRPC error: #{inspect(error)}")
{:error, reason} -> IO.puts("Pool error: #{inspect(reason)}")
end

Using Named Pools

# Execute on specific pool
user_operation = fn channel ->
request = %UserService.GetRequest{id: "123"}
UserService.Stub.get(channel, request)
end
payment_operation = fn channel ->
request = %PaymentService.ChargeRequest{amount: 1000}
PaymentService.Stub.charge(channel, request)
end
# Use different pools for different services
{:ok, user} = GrpcConnectionPool.execute(user_operation, pool: MyApp.UserService.Pool)
{:ok, charge} = GrpcConnectionPool.execute(payment_operation, pool: MyApp.PaymentService.Pool)

Error Handling

operation = fn channel ->
request = %MyService.CreateRequest{data: "important data"}
MyService.Stub.create(channel, request)
end
case GrpcConnectionPool.execute(operation) do
{:ok, {:ok, result}} ->
# Success case
handle_success(result)
{:ok, {:error, %GRPC.RPCError{status: status, message: message}}} ->
# gRPC-level error (service returned error)
handle_grpc_error(status, message)
{:error, :not_connected} ->
# Pool has no healthy connections
handle_connection_error()
{:error, {:exit, {:noproc, _}}} ->
# Pool doesn't exist
handle_pool_missing_error()
{:error, reason} ->
# Other pool-level errors
handle_pool_error(reason)
end

Custom SSL Configuration

# For services requiring client certificates
ssl_config = [
verify: :verify_peer,
cacertfile: "/path/to/ca.pem",
certfile: "/path/to/client.pem",
keyfile: "/path/to/client-key.pem"
]
{:ok, config} = GrpcConnectionPool.Config.new([
endpoint: [
type: :production,
host: "secure-api.example.com",
port: 443,
ssl: ssl_config
],
pool: [size: 5]
])

Using Custom gRPC Credentials

# For advanced authentication scenarios
credentials = GRPC.Credential.new(ssl: [
verify: :verify_peer,
cacerts: :public_key.cacerts_get()
])
{:ok, config} = GrpcConnectionPool.Config.new([
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
credentials: credentials # Overrides ssl config
]
])

Using gRPC Interceptors

# Define interceptors for logging, authentication, metrics, etc.
{:ok, config} = GrpcConnectionPool.Config.new([
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
interceptors: [
MyApp.LoggingInterceptor, # Log all requests/responses
MyApp.AuthInterceptor, # Add authentication headers
MyApp.MetricsInterceptor # Track metrics
]
],
pool: [size: 10]
])
# Interceptors are applied to all connections in the pool
{:ok, _pid} = GrpcConnectionPool.start_link(config)

Interceptors are modules that implement the gRPC interceptor behavior and are applied to all gRPC calls made through channels from the pool. They can be used for:

Environment-Specific Usage

Development Environment

# config/dev.exs
config :my_app, GrpcConnectionPool,
endpoint: [
type: :local,
host: "localhost",
port: 9090 # Local gRPC server
],
pool: [size: 2], # Smaller pool for development
connection: [
ping_interval: nil # Disable pinging for local development
]

Test Environment with Emulators

# config/test.exs
config :my_app, GrpcConnectionPool,
endpoint: [
type: :test,
host: "localhost",
port: 8085, # Emulator port
retry_config: [
max_attempts: 3,
base_delay: 500, # Faster retries for tests
max_delay: 2000
]
],
pool: [size: 1], # Minimal pool for tests
connection: [
ping_interval: nil, # No pinging needed in tests
health_check: false # Disable health checks in tests
]

Production Environment

# config/prod.exs
config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "api.example.com",
port: 443,
ssl: [] # Use default SSL
],
pool: [
size: 20, # Large pool for production load
checkout_timeout: 10_000
],
connection: [
keepalive: 30_000,
ping_interval: 25_000,
health_check: true
]

Advanced Features

Pool Monitoring

# Check pool status
status = GrpcConnectionPool.status()
IO.inspect(status) # %{pool_name: GrpcConnectionPool.Pool, status: :running}
# Check specific pool
status = GrpcConnectionPool.status(MyApp.UserService.Pool)

Graceful Shutdown

# Stop specific pool
:ok = GrpcConnectionPool.stop(MyApp.UserService.Pool)
# The pool will be automatically restarted by the supervisor if needed

Telemetry Events

The library emits telemetry events for observability. Attach handlers to monitor pool and connection behavior.

Pool Events

EventMeasurementsMetadata
[:grpc_connection_pool, :pool, :init]pool_sizepool_name, endpoint
[:grpc_connection_pool, :pool, :get_channel]durationpool_name, available_channels
[:grpc_connection_pool, :pool, :scale_up]duration, requested, succeeded, failed, new_sizepool_name
[:grpc_connection_pool, :pool, :scale_down]duration, requested, terminated, new_sizepool_name
[:grpc_connection_pool, :pool, :status]expected_size, current_sizepool_name

Channel Events

EventMeasurementsMetadata
[:grpc_connection_pool, :channel, :connected]durationpool_name
[:grpc_connection_pool, :channel, :connection_failed]durationpool_name, error
[:grpc_connection_pool, :channel, :disconnected]durationpool_name, reason
[:grpc_connection_pool, :channel, :ping]durationpool_name, result (:ok or :error)
[:grpc_connection_pool, :channel, :gun_down]-pool_name, reason, protocol
[:grpc_connection_pool, :channel, :gun_error]-pool_name, reason
[:grpc_connection_pool, :channel, :reconnect_scheduled]delay_ms, attemptpool_name, reason

Example: Attaching Telemetry Handlers

# In your application startup
:telemetry.attach_many(
"grpc-pool-handler",
[
[:grpc_connection_pool, :pool, :init],
[:grpc_connection_pool, :pool, :get_channel],
[:grpc_connection_pool, :channel, :connected],
[:grpc_connection_pool, :channel, :disconnected],
[:grpc_connection_pool, :channel, :reconnect_scheduled]
],
&MyApp.Telemetry.handle_event/4,
nil
)
# Handler module
defmodule MyApp.Telemetry do
require Logger
def handle_event([:grpc_connection_pool, :pool, :init], measurements, metadata, _config) do
Logger.info("Pool #{metadata.pool_name} initialized with #{measurements.pool_size} connections to #{metadata.endpoint}")
end
def handle_event([:grpc_connection_pool, :channel, :connected], measurements, metadata, _config) do
duration_ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
Logger.debug("Channel connected for #{metadata.pool_name} in #{duration_ms}ms")
end
def handle_event([:grpc_connection_pool, :channel, :reconnect_scheduled], measurements, metadata, _config) do
Logger.warning("Reconnect scheduled for #{metadata.pool_name}: attempt #{measurements.attempt}, delay #{measurements.delay_ms}ms, reason: #{inspect(metadata.reason)}")
end
def handle_event(_event, _measurements, _metadata, _config), do: :ok
end

Connection Health

The library automatically monitors connection health and replaces dead connections. You can also check worker status:

# This is mainly for debugging - not needed in normal usage
worker_pid = :poolex.checkout(MyApp.UserService.Pool)
status = GrpcConnectionPool.Worker.status(worker_pid) # :connected | :disconnected
:poolex.checkin(MyApp.UserService.Pool, worker_pid)

Testing

Unit Tests

mix test --exclude emulator

Integration Tests with Emulator

Start the Google Pub/Sub emulator (or any gRPC service emulator):

# Using Docker Compose (if available)
docker-compose up -d
# Or manually
docker run --rm -p 8085:8085 google/cloud-sdk:emulators-pubsub \
/google-cloud-sdk/bin/gcloud beta emulators pubsub start \
--host-port=0.0.0.0:8085 --project=test-project

Run integration tests:

mix test --only emulator

Run all tests:

mix test

Testing in Your Application

# In your test helper
defmodule MyApp.TestHelper do
def start_test_pool do
{:ok, config} = GrpcConnectionPool.Config.local(
host: "localhost",
port: 8085, # Your test service port
pool_size: 1
)
{:ok, _pid} = GrpcConnectionPool.start_link(config, name: TestPool)
end
def stop_test_pool do
GrpcConnectionPool.stop(TestPool)
end
end
# In your tests
defmodule MyApp.GrpcTest do
use ExUnit.Case
setup do
MyApp.TestHelper.start_test_pool()
on_exit(&MyApp.TestHelper.stop_test_pool/0)
end
test "grpc operation works" do
operation = fn channel ->
# Your gRPC test operation
end
assert {:ok, result} = GrpcConnectionPool.execute(operation, pool: TestPool)
end
end

Connection Strategies

The pool supports pluggable channel selection strategies via the GrpcConnectionPool.Strategy behaviour.

Built-in Strategies

StrategyConfigDescriptionBest For
Round Robin:round_robinLock-free atomics counter, cycles through channels sequentiallyGeneral use (default)
Random:randomRandom channel selection via :rand.uniformAvoiding correlated hot-spotting
Power of Two:power_of_twoPick 2 random channels, choose least-recently-usedUneven workloads

Configuring a Strategy

{:ok, config} = GrpcConnectionPool.Config.new(
endpoint: [host: "api.example.com", port: 443],
pool: [size: 10, strategy: :random]
)

Custom Strategies

Implement the GrpcConnectionPool.Strategy behaviour:

defmodule MyApp.WeightedStrategy do
@behaviour GrpcConnectionPool.Strategy
@impl true
def init(_pool_name, _pool_size) do
# Return any state — stored in :persistent_term
%{weights: [0.5, 0.3, 0.2]}
end
@impl true
def select(state, channel_count, _ets_table) do
# Must return {:ok, index} where 0 <= index < channel_count
{:ok, weighted_random(state.weights, channel_count)}
end
defp weighted_random(_weights, count), do: :rand.uniform(count) - 1
end

Then use it:

config = GrpcConnectionPool.Config.new(
pool: [strategy: MyApp.WeightedStrategy]
)

Strategy Performance

Benchmarked with pool_size=10:

StrategyThroughputAvg LatencyMemory/call
round_robin1.91M ips522 ns0.72 KB
random1.87M ips534 ns1.05 KB
power_of_two1.08M ips925 ns2.05 KB

Performance Considerations

Pool Sizing

Connection Settings

config :my_app, GrpcConnectionPool,
connection: [
keepalive: 30_000, # Keep connections alive (important for cloud services)
ping_interval: 25_000, # Ping before cloud timeouts (usually 60s)
health_check: true # Enable automatic health monitoring
]

Monitoring

Monitor pool performance using:

Troubleshooting

Common Issues

  1. :noproc errors: Pool not started or wrong pool name
  2. Connection timeouts: Network issues or service unavailable
  3. SSL errors: Incorrect SSL configuration or certificates
  4. Pool checkout timeouts: Pool too small or slow operations

Debug Logging

Enable debug logging to troubleshoot issues:

# config/config.exs
config :logger, level: :info
# In your code
require Logger
Logger.info("Pool status: #{inspect(GrpcConnectionPool.status())}")

Health Check Failures

If you see frequent reconnections:

  1. Check network connectivity
  2. Verify service availability
  3. Adjust ping_interval and keepalive settings
  4. Check for firewall or load balancer timeouts

Connection Error Messages with GCP Services

GCP gRPC endpoints have hardcoded timeouts that periodically close connections, causing gun_down error messages. This is normal behavior and the connection pool will automatically replace the workers. To suppress these expected error messages:

config :my_app, GrpcConnectionPool,
endpoint: [
type: :production,
host: "pubsub.googleapis.com", # Or other GCP gRPC endpoints
port: 443
],
connection: [
suppress_connection_errors: true # Suppress expected gun_down errors from GCP
]

The worker processes will still reconnect with backoff and the pool will automatically recover.

Contributing

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Add tests for your changes
  5. Run the test suite (mix test)
  6. Commit your changes (git commit -m 'Add amazing feature')
  7. Push to the branch (git push origin feature/amazing-feature)
  8. Open a Pull Request

Development Setup

git clone https://github.com/nyo16/grpc_connection_pool.git
cd grpc_connection_pool
mix deps.get
mix test

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments


Documentation: https://hexdocs.pm/grpc_connection_pool
Source Code: https://github.com/nyo16/grpc_connection_pool
Issues: https://github.com/nyo16/grpc_connection_pool/issues