GrpcConnectionPool

A high-performance gRPC connection pooling library for Elixir with zero-GenServer-call hot path, pluggable selection strategies, and automatic health monitoring.

Overview

GrpcConnectionPool was extracted from a production Pub/Sub gRPC client and optimized for maximum throughput. The hot path (get_channel) makes no GenServer calls: channels are stored in ETS for O(1) indexed access, and round-robin selection uses a lock-free atomics counter.

Key Features

Architecture

Pool.Supervisor (one_for_one)
+-- PoolState (GenServer -- owns ETS, persistent_term setup)
+-- Registry (health tracking)
+-- DynamicSupervisor --> Workers
+-- WorkerStarter (Task, temporary)
+-- TelemetryReporter (GenServer, periodic status)

Hot Path

Pool.get_channel()
  -> ETS.lookup(:channel_count)            # O(1)
  -> persistent_term.get(strategy)         # O(1), zero-copy
  -> Strategy.select (atomics.add_get)     # O(1), lock-free
  -> ETS.lookup({:channel, index})         # O(1)
  -> return channel                        # no GenServer!
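
The lookup sequence above can be sketched with plain OTP primitives. This is a minimal illustration, not the library's implementation: the module name HotPathSketch, the table layout, and the persistent_term key are assumptions made for the example, and the {:error, :not_connected} return is borrowed from the Error Handling section.

```elixir
defmodule HotPathSketch do
  # Hypothetical module: mirrors the hot-path steps, not the library's code.

  # Stores channels in a public ETS table and a shared counter in persistent_term.
  def setup(table, channels) do
    :ets.new(table, [:named_table, :set, :public, read_concurrency: true])
    :ets.insert(table, {:channel_count, length(channels)})

    channels
    |> Enum.with_index()
    |> Enum.each(fn {channel, index} ->
      :ets.insert(table, {{:channel, index}, channel})
    end)

    # One unsigned 64-bit counter shared by every caller.
    counter = :atomics.new(1, signed: false)
    :persistent_term.put({__MODULE__, table}, counter)
    :ok
  end

  # Round-robin selection with no message passing: two ETS reads,
  # one persistent_term read, and one atomic increment.
  def get_channel(table) do
    with [{:channel_count, count}] when count > 0 <- :ets.lookup(table, :channel_count),
         counter = :persistent_term.get({__MODULE__, table}),
         index = rem(:atomics.add_get(counter, 1, 1), count),
         [{_key, channel}] <- :ets.lookup(table, {:channel, index}) do
      {:ok, channel}
    else
      _ -> {:error, :not_connected}
    end
  end
end
```

Because :atomics.add_get/3 returns the incremented value, the first call in this sketch selects index 1 rather than index 0; the rotation is still even across all channels.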

Design Decisions

  1. Custom pool over NimblePool — “no checkout” model where channels are returned directly. No checkout/checkin overhead.

  2. ETS + atomics over GenServer.call — Workers store channels in ETS on connect/disconnect. The hot path reads ETS directly with zero message passing.

  3. Pluggable strategies — Different workloads need different selection. Round-robin is default, but random avoids correlated hot-spotting and power-of-two handles uneven loads.

  4. Separation of Concerns:

    • Config: Configuration management with strategy support
    • Worker: Connection lifecycle, stores channels in ETS
    • Pool: Zero-GenServer hot path, strategy dispatch
    • PoolState: ETS ownership, persistent_term setup
    • Strategy: Behaviour for pluggable selection

Installation

Add grpc_connection_pool to your dependencies in mix.exs:

def deps do
  [
    {:grpc_connection_pool, "~> 0.3.0"},
    {:grpc, "~> 0.11.5"}  # Required peer dependency
  ]
end

Quick Start

1. Basic Usage

# Create configuration
{:ok, config} = GrpcConnectionPool.Config.production(
  host: "api.example.com",
  port: 443,
  pool_size: 5
)

# Start pool
{:ok, _pid} = GrpcConnectionPool.start_link(config)

# Execute gRPC operations
operation = fn channel ->
  request = %MyService.ListRequest{}
  MyService.Stub.list(channel, request)
end

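# execute/1 wraps the operation's own return value, so a successful
# stub call comes back as {:ok, {:ok, response}}
{:ok, {:ok, response}} = GrpcConnectionPool.execute(operation)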

2. Local Development

{:ok, config} = GrpcConnectionPool.Config.local(
  host: "localhost",
  port: 9090,
  pool_size: 3
)

{:ok, _pid} = GrpcConnectionPool.start_link(config)

Configuration

Configuration Options

The library supports flexible configuration through GrpcConnectionPool.Config:

{:ok, config} = GrpcConnectionPool.Config.new([
  endpoint: [
    type: :production,           # :production, :local, or custom atom
    host: "api.example.com",     # Required: gRPC server hostname
    port: 443,                   # Required: gRPC server port  
    ssl: [],                     # SSL options ([] for default SSL)
    credentials: nil,            # Custom GRPC.Credential (overrides ssl)
    interceptors: nil,           # List of gRPC client interceptors (modules)
    retry_config: [              # Optional: retry configuration
      max_attempts: 3,
      base_delay: 1000,
      max_delay: 5000
    ]
  ],
  pool: [
    size: 5,                     # Number of connections in pool
    name: MyApp.GrpcPool,        # Pool name (must be unique)
    strategy: :round_robin       # :round_robin | :random | :power_of_two | CustomModule
  ],
  connection: [
    keepalive: 30_000,           # HTTP/2 keepalive interval
    ping_interval: 25_000,       # Ping interval to keep connections warm
    health_check: true,          # Enable connection health monitoring
    suppress_connection_errors: false  # Suppress gun_down/gun_error logs (useful for GCP endpoints)
  ]
])
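
To make the retry_config values concrete, here is one way max_attempts, base_delay, and max_delay could interact. The doubling schedule is an assumption for illustration only; this README says workers reconnect with backoff but does not state the formula. BackoffSketch is a hypothetical module.

```elixir
defmodule BackoffSketch do
  # Hypothetical helper: capped exponential backoff.
  # Assumes the delay doubles per attempt; the library's actual schedule may differ.
  def delay(attempt, base_delay, max_delay) when attempt >= 1 do
    min(base_delay * Integer.pow(2, attempt - 1), max_delay)
  end

  # Delays for every attempt allowed by a retry_config keyword list.
  def schedule(retry_config) do
    max_attempts = Keyword.fetch!(retry_config, :max_attempts)
    base_delay = Keyword.fetch!(retry_config, :base_delay)
    max_delay = Keyword.fetch!(retry_config, :max_delay)

    for attempt <- 1..max_attempts, do: delay(attempt, base_delay, max_delay)
  end
end
```

Under that assumption, the values shown above (base_delay: 1000, max_delay: 5000) would give delays of 1000 ms, 2000 ms, and 4000 ms for three attempts, with any further attempt capped at 5000 ms.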

Configuration from Environment

In config/config.exs:

# Single service configuration
config :my_app, GrpcConnectionPool,
  endpoint: [
    type: :production,
    host: "api.example.com",
    port: 443,
    ssl: []
  ],
  pool: [
    size: 10,
    name: MyApp.GrpcPool
  ],
  connection: [
    ping_interval: 30_000
  ]

# Multiple service configuration
config :my_app, :service_a,
  endpoint: [type: :production, host: "service-a.example.com"],
  pool: [size: 5, name: MyApp.ServiceA.Pool]

config :my_app, :service_b, 
  endpoint: [type: :production, host: "service-b.example.com"],
  pool: [size: 8, name: MyApp.ServiceB.Pool]

Complete Configuration Examples

Basic Production Configuration

# For a production gRPC service with SSL
config :my_app, GrpcConnectionPool,
  endpoint: [
    type: :production,
    host: "api.example.com",
    port: 443,
    ssl: []  # Use default SSL settings
  ],
  pool: [
    size: 10,
    name: MyApp.GrpcPool
  ],
  connection: [
    keepalive: 30_000,     # Send keepalive every 30 seconds
    ping_interval: 25_000,  # Ping every 25 seconds to keep warm
    health_check: true
  ]

Local Development Configuration

# For local development with a gRPC server (no SSL)
config :my_app, GrpcConnectionPool,
  endpoint: [
    type: :local,
    host: "localhost", 
    port: 9090
  ],
  pool: [
    size: 3,  # Smaller pool for development
    name: MyApp.DevPool
  ]

Test Environment with Emulator

# For testing with Google Pub/Sub emulator or similar
config :my_app, GrpcConnectionPool,
  endpoint: [
    type: :test,
    host: "localhost",
    port: 8085,
    retry_config: [
      max_attempts: 3,
      base_delay: 1000,
      max_delay: 5000
    ]
  ],
  pool: [
    size: 2,  # Small pool for tests
    name: MyApp.TestPool
  ],
  connection: [
    ping_interval: nil  # Disable pinging in tests
  ]

Multiple Service Configuration

# Configuration for multiple gRPC services
config :my_app, :service_a,
  endpoint: [
    type: :production,
    host: "service-a.example.com",
    port: 443
  ],
  pool: [
    size: 5,
    name: MyApp.ServiceAPool
  ]

config :my_app, :service_b,  
  endpoint: [
    type: :production,
    host: "service-b.example.com", 
    port: 443
  ],
  pool: [
    size: 8,
    name: MyApp.ServiceBPool
  ]

Advanced SSL Configuration

# Custom SSL configuration with client certificates
config :my_app, GrpcConnectionPool,
  endpoint: [
    type: :production,
    host: "secure-api.example.com",
    port: 443,
    credentials: GRPC.Credential.new(ssl: [
      verify: :verify_peer,
      cacertfile: "/path/to/ca.pem",
      certfile: "/path/to/client.pem", 
      keyfile: "/path/to/client-key.pem"
    ])
  ]

Environment-Specific Configuration

# Different configurations per environment; inside config files,
# config_env() is the recommended replacement for Mix.env()
case config_env() do
  :prod ->
    config :my_app, GrpcConnectionPool,
      endpoint: [
        type: :production,
        host: "api.example.com",
        port: 443
      ],
      pool: [size: 20]  # Large pool for production
      
  :dev ->
    config :my_app, GrpcConnectionPool,
      endpoint: [
        type: :local,
        host: "localhost", 
        port: 9090
      ],
      pool: [size: 3]   # Small pool for development
      
  :test ->
    config :my_app, GrpcConnectionPool,
      endpoint: [
        type: :test,
        host: "localhost",
        port: 8085
      ],
      pool: [size: 1],  # Minimal pool for tests
      connection: [
        ping_interval: nil  # No pinging needed in tests
      ]
end

Load from environment:

# Single pool
{:ok, config} = GrpcConnectionPool.Config.from_env(:my_app)

# Multiple pools  
{:ok, service_a_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_a)
{:ok, service_b_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_b)

Runtime Configuration

For configuration determined at runtime (e.g., from environment variables):

In config/runtime.exs:

import Config

if config_env() == :prod do
  # Get configuration from environment variables
  grpc_host = System.get_env("GRPC_HOST", "api.example.com")
  grpc_port = System.get_env("GRPC_PORT", "443") |> String.to_integer()
  pool_size = System.get_env("GRPC_POOL_SIZE", "10") |> String.to_integer()
  
  config :my_app, GrpcConnectionPool,
    endpoint: [
      type: :production,
      host: grpc_host,
      port: grpc_port
    ],
    pool: [size: pool_size]
end
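
String.to_integer/1 raises if the variable holds something that is not an integer, which would abort boot. If a fallback is preferable, a small parser along these lines could be used instead. EnvSketch and positive_int/2 are hypothetical names, not part of the library.

```elixir
defmodule EnvSketch do
  # Hypothetical helper: parses a positive integer from an environment
  # variable value, falling back to a default on missing or invalid input.
  def positive_int(nil, default), do: default

  def positive_int(raw, default) when is_binary(raw) do
    case Integer.parse(String.trim(raw)) do
      # Accept only a clean integer with no trailing characters.
      {n, ""} when n > 0 -> n
      _ -> default
    end
  end
end
```

In config/runtime.exs this would be called as EnvSketch.positive_int(System.get_env("GRPC_POOL_SIZE"), 10), provided the module is compiled before the config is evaluated (for example, by defining it in a dependency or inlining the case expression).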

Adding to Supervisor Tree

Single Pool

defmodule MyApp.Application do
  use Application
  
  def start(_type, _args) do
    # Load configuration
    {:ok, config} = GrpcConnectionPool.Config.from_env(:my_app)
    
    children = [
      # Your other services...
      {GrpcConnectionPool, config}
    ]
    
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Multiple Pools for Different Services

defmodule MyApp.Application do
  use Application
  
  def start(_type, _args) do
    # Load configurations for different services
    {:ok, service_a_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_a)
    {:ok, service_b_config} = GrpcConnectionPool.Config.from_env(:my_app, :service_b)
    
    children = [
      # Your other services...
      {GrpcConnectionPool, service_a_config},
      {GrpcConnectionPool, service_b_config}
    ]
    
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Manual Pool Configuration

def start(_type, _args) do
  # Create configurations programmatically
  {:ok, user_service_config} = GrpcConnectionPool.Config.production(
    host: "users.example.com",
    pool_name: MyApp.UserService.Pool,
    pool_size: 5
  )
  
  {:ok, payment_service_config} = GrpcConnectionPool.Config.production(
    host: "payments.example.com", 
    pool_name: MyApp.PaymentService.Pool,
    pool_size: 3
  )
  
  children = [
    {GrpcConnectionPool, user_service_config},
    {GrpcConnectionPool, payment_service_config}
  ]
  
  Supervisor.start_link(children, strategy: :one_for_one)
end

Usage Examples

Basic Operations

# Simple operation with default pool
operation = fn channel ->
  request = %MyService.GetUserRequest{user_id: "123"}
  MyService.Stub.get_user(channel, request)
end

case GrpcConnectionPool.execute(operation) do
  {:ok, {:ok, user}} -> IO.puts("Found user: #{user.name}")
  {:ok, {:error, error}} -> IO.puts("gRPC error: #{inspect(error)}")
  {:error, reason} -> IO.puts("Pool error: #{inspect(reason)}")
end

Using Named Pools

# Execute on specific pool
user_operation = fn channel ->
  request = %UserService.GetRequest{id: "123"}
  UserService.Stub.get(channel, request)
end

payment_operation = fn channel ->
  request = %PaymentService.ChargeRequest{amount: 1000}
  PaymentService.Stub.charge(channel, request)
end

# Use different pools for different services
# The outer {:ok, _} comes from the pool, the inner one from the gRPC stub
{:ok, {:ok, user}} = GrpcConnectionPool.execute(user_operation, pool: MyApp.UserService.Pool)
{:ok, {:ok, charge}} = GrpcConnectionPool.execute(payment_operation, pool: MyApp.PaymentService.Pool)

Error Handling

operation = fn channel ->
  request = %MyService.CreateRequest{data: "important data"}
  MyService.Stub.create(channel, request)
end

case GrpcConnectionPool.execute(operation) do
  {:ok, {:ok, result}} -> 
    # Success case
    handle_success(result)
    
  {:ok, {:error, %GRPC.RPCError{status: status, message: message}}} ->
    # gRPC-level error (service returned error)
    handle_grpc_error(status, message)
    
  {:error, :not_connected} ->
    # Pool has no healthy connections
    handle_connection_error()
    
  {:error, {:exit, {:noproc, _}}} ->
    # Pool doesn't exist
    handle_pool_missing_error()
    
  {:error, reason} ->
    # Other pool-level errors
    handle_pool_error(reason)
end
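
Callers that do not need to distinguish every case can collapse the nested result into a single tagged tuple. The following is a sketch only: ResultSketch and the :grpc / :pool tags are names invented for this example, and the input shapes are the ones shown in the case expression above.

```elixir
defmodule ResultSketch do
  # Hypothetical helper: flattens the nested result of execute/1,2.

  # Pool succeeded and the gRPC call succeeded.
  def flatten({:ok, {:ok, value}}), do: {:ok, value}

  # Pool succeeded but the service returned an error.
  def flatten({:ok, {:error, error}}), do: {:error, {:grpc, error}}

  # The pool itself failed (no healthy connection, missing pool, ...).
  def flatten({:error, reason}), do: {:error, {:pool, reason}}
end
```

Usage would look like operation |> GrpcConnectionPool.execute() |> ResultSketch.flatten(), after which callers match on {:ok, value}, {:error, {:grpc, error}}, or {:error, {:pool, reason}}.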

Custom SSL Configuration

# For services requiring client certificates
ssl_config = [
  verify: :verify_peer,
  cacertfile: "/path/to/ca.pem",
  certfile: "/path/to/client.pem", 
  keyfile: "/path/to/client-key.pem"
]

{:ok, config} = GrpcConnectionPool.Config.new([
  endpoint: [
    type: :production,
    host: "secure-api.example.com",
    port: 443,
    ssl: ssl_config
  ],
  pool: [size: 5]
])

Using Custom gRPC Credentials

# For advanced authentication scenarios
credentials = GRPC.Credential.new(ssl: [
  verify: :verify_peer,
  cacerts: :public_key.cacerts_get()
])

{:ok, config} = GrpcConnectionPool.Config.new([
  endpoint: [
    type: :production,
    host: "api.example.com", 
    port: 443,
    credentials: credentials  # Overrides ssl config
  ]
])

Using gRPC Interceptors

# Define interceptors for logging, authentication, metrics, etc.
{:ok, config} = GrpcConnectionPool.Config.new([
  endpoint: [
    type: :production,
    host: "api.example.com",
    port: 443,
    interceptors: [
      MyApp.LoggingInterceptor,    # Log all requests/responses
      MyApp.AuthInterceptor,        # Add authentication headers
      MyApp.MetricsInterceptor      # Track metrics
    ]
  ],
  pool: [size: 10]
])

# Interceptors are applied to all connections in the pool
{:ok, _pid} = GrpcConnectionPool.start_link(config)

Interceptors are modules that implement the gRPC client interceptor behaviour and are applied to all gRPC calls made through channels from the pool. Typical uses include logging, authentication, and metrics.

Environment-Specific Usage

Development Environment

# config/dev.exs
config :my_app, GrpcConnectionPool,
  endpoint: [
    type: :local,
    host: "localhost",
    port: 9090  # Local gRPC server
  ],
  pool: [size: 2],  # Smaller pool for development
  connection: [
    ping_interval: nil  # Disable pinging for local development
  ]

Test Environment with Emulators

# config/test.exs
config :my_app, GrpcConnectionPool,
  endpoint: [
    type: :test,
    host: "localhost",
    port: 8085,  # Emulator port
    retry_config: [
      max_attempts: 3,
      base_delay: 500,   # Faster retries for tests
      max_delay: 2000
    ]
  ],
  pool: [size: 1],  # Minimal pool for tests
  connection: [
    ping_interval: nil,  # No pinging needed in tests
    health_check: false  # Disable health checks in tests
  ]

Production Environment

# config/prod.exs  
config :my_app, GrpcConnectionPool,
  endpoint: [
    type: :production,
    host: "api.example.com",
    port: 443,
    ssl: []  # Use default SSL
  ],
  pool: [
    size: 20,  # Large pool for production load
    checkout_timeout: 10_000
  ],
  connection: [
    keepalive: 30_000,
    ping_interval: 25_000,
    health_check: true
  ]

Advanced Features

Pool Monitoring

# Check pool status
status = GrpcConnectionPool.status()
IO.inspect(status)  # %{pool_name: GrpcConnectionPool.Pool, status: :running}

# Check specific pool
status = GrpcConnectionPool.status(MyApp.UserService.Pool)

Graceful Shutdown

# Stop specific pool
:ok = GrpcConnectionPool.stop(MyApp.UserService.Pool)

# The pool will be automatically restarted by the supervisor if needed

Telemetry Events

The library emits telemetry events for observability. Attach handlers to monitor pool and connection behavior.

Pool Events

| Event | Measurements | Metadata |
|---|---|---|
| [:grpc_connection_pool, :pool, :init] | pool_size | pool_name, endpoint |
| [:grpc_connection_pool, :pool, :get_channel] | duration | pool_name, available_channels |
| [:grpc_connection_pool, :pool, :scale_up] | duration, requested, succeeded, failed, new_size | pool_name |
| [:grpc_connection_pool, :pool, :scale_down] | duration, requested, terminated, new_size | pool_name |
| [:grpc_connection_pool, :pool, :status] | expected_size, current_size | pool_name |

Channel Events

| Event | Measurements | Metadata |
|---|---|---|
| [:grpc_connection_pool, :channel, :connected] | duration | pool_name |
| [:grpc_connection_pool, :channel, :connection_failed] | duration | pool_name, error |
| [:grpc_connection_pool, :channel, :disconnected] | duration | pool_name, reason |
| [:grpc_connection_pool, :channel, :ping] | duration | pool_name, result (:ok or :error) |
| [:grpc_connection_pool, :channel, :gun_down] | - | pool_name, reason, protocol |
| [:grpc_connection_pool, :channel, :gun_error] | - | pool_name, reason |
| [:grpc_connection_pool, :channel, :reconnect_scheduled] | delay_ms, attempt | pool_name, reason |

Example: Attaching Telemetry Handlers

# In your application startup
:telemetry.attach_many(
  "grpc-pool-handler",
  [
    [:grpc_connection_pool, :pool, :init],
    [:grpc_connection_pool, :pool, :get_channel],
    [:grpc_connection_pool, :channel, :connected],
    [:grpc_connection_pool, :channel, :disconnected],
    [:grpc_connection_pool, :channel, :reconnect_scheduled]
  ],
  &MyApp.Telemetry.handle_event/4,
  nil
)

# Handler module
defmodule MyApp.Telemetry do
  require Logger

  def handle_event([:grpc_connection_pool, :pool, :init], measurements, metadata, _config) do
    Logger.info("Pool #{metadata.pool_name} initialized with #{measurements.pool_size} connections to #{metadata.endpoint}")
  end

  def handle_event([:grpc_connection_pool, :channel, :connected], measurements, metadata, _config) do
    duration_ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
    Logger.debug("Channel connected for #{metadata.pool_name} in #{duration_ms}ms")
  end

  def handle_event([:grpc_connection_pool, :channel, :reconnect_scheduled], measurements, metadata, _config) do
    Logger.warning("Reconnect scheduled for #{metadata.pool_name}: attempt #{measurements.attempt}, delay #{measurements.delay_ms}ms, reason: #{inspect(metadata.reason)}")
  end

  def handle_event(_event, _measurements, _metadata, _config), do: :ok
end

Connection Health

The library automatically monitors connection health and replaces dead connections. You can also check worker status:

# This is mainly for debugging - not needed in normal usage
worker_pid = :poolex.checkout(MyApp.UserService.Pool)
status = GrpcConnectionPool.Worker.status(worker_pid)  # :connected | :disconnected
:poolex.checkin(MyApp.UserService.Pool, worker_pid)

Testing

Unit Tests

mix test --exclude emulator

Integration Tests with Emulator

Start the Google Pub/Sub emulator (or any gRPC service emulator):

# Using Docker Compose (if available)
docker-compose up -d

# Or manually
docker run --rm -p 8085:8085 google/cloud-sdk:emulators-pubsub \
  /google-cloud-sdk/bin/gcloud beta emulators pubsub start \
  --host-port=0.0.0.0:8085 --project=test-project

Run integration tests:

mix test --only emulator

Run all tests:

mix test

Testing in Your Application

# In your test helper
defmodule MyApp.TestHelper do
  def start_test_pool do
    {:ok, config} = GrpcConnectionPool.Config.local(
      host: "localhost",
      port: 8085,  # Your test service port
      pool_size: 1
    )
    
    {:ok, _pid} = GrpcConnectionPool.start_link(config, name: TestPool)
  end
  
  def stop_test_pool do
    GrpcConnectionPool.stop(TestPool)
  end
end

# In your tests
defmodule MyApp.GrpcTest do
  use ExUnit.Case
  
  setup do
    MyApp.TestHelper.start_test_pool()
    on_exit(&MyApp.TestHelper.stop_test_pool/0)
  end
  
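  test "grpc operation works" do
    operation = fn channel ->
      # Replace with your own call; MyService is the placeholder service from Quick Start
      MyService.Stub.list(channel, %MyService.ListRequest{})
    end

    assert {:ok, _result} = GrpcConnectionPool.execute(operation, pool: TestPool)
  end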
end

Connection Strategies

The pool supports pluggable channel selection strategies via the GrpcConnectionPool.Strategy behaviour.

Built-in Strategies

| Strategy | Config | Description | Best For |
|---|---|---|---|
| Round Robin | :round_robin | Lock-free atomics counter, cycles through channels sequentially | General use (default) |
| Random | :random | Random channel selection via :rand.uniform | Avoiding correlated hot-spotting |
| Power of Two | :power_of_two | Pick 2 random channels, choose least-recently-used | Uneven workloads |
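
The power-of-two idea (sample two candidates, keep the one used longest ago) can be sketched as follows. This is an illustration under stated assumptions, not the library's strategy module: PowerOfTwoSketch is hypothetical, and "least recently used" is modeled here with a logical clock held in atomics rather than wall-clock timestamps.

```elixir
defmodule PowerOfTwoSketch do
  # Hypothetical module: illustrates power-of-two-choices selection.

  # ticks holds one "last used" tick per channel; clock is a shared logical clock.
  def init(channel_count) when channel_count > 0 do
    %{
      ticks: :atomics.new(channel_count, signed: false),
      clock: :atomics.new(1, signed: false)
    }
  end

  # Samples two random indexes (possibly equal) and delegates to choose/3.
  def select(state, channel_count) do
    a = :rand.uniform(channel_count) - 1
    b = :rand.uniform(channel_count) - 1
    choose(state, a, b)
  end

  # Keeps the candidate with the older tick, then stamps it as just used.
  # Atomics indexes are 1-based, hence the + 1.
  def choose(%{ticks: ticks, clock: clock}, a, b) do
    index = if :atomics.get(ticks, a + 1) <= :atomics.get(ticks, b + 1), do: a, else: b
    :atomics.put(ticks, index + 1, :atomics.add_get(clock, 1, 1))
    {:ok, index}
  end
end
```

The extra reads and the write per call are consistent with this strategy being the slowest of the three in the benchmark further down, although the sketch makes no claim about where the library actually spends its time.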

Configuring a Strategy

{:ok, config} = GrpcConnectionPool.Config.new(
  endpoint: [host: "api.example.com", port: 443],
  pool: [size: 10, strategy: :random]
)

Custom Strategies

Implement the GrpcConnectionPool.Strategy behaviour:

defmodule MyApp.WeightedStrategy do
  @behaviour GrpcConnectionPool.Strategy

  @impl true
  def init(_pool_name, _pool_size) do
    # Return any state — stored in :persistent_term
    %{weights: [0.5, 0.3, 0.2]}
  end

  @impl true
  def select(state, channel_count, _ets_table) do
    # Must return {:ok, index} where 0 <= index < channel_count
    {:ok, weighted_random(state.weights, channel_count)}
  end

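  # Placeholder: ignores the weights and picks uniformly.
  # Replace with real weighted selection for production use.
  defp weighted_random(_weights, count), do: :rand.uniform(count) - 1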
end
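
For readers who want the weights to actually influence selection, one possible approach is a cumulative-weight scan. WeightedPickSketch is a hypothetical standalone module (it does not declare the Strategy behaviour so that it compiles on its own); its pick/3 could replace the body of weighted_random/2 in a custom strategy.

```elixir
defmodule WeightedPickSketch do
  # Hypothetical helper: picks an index in 0..channel_count-1 with
  # probability proportional to its weight. `r` is a float in [0.0, 1.0)
  # and is injectable so the function can be tested deterministically.
  def pick(weights, channel_count, r \\ :rand.uniform()) when channel_count > 0 do
    # Only the first channel_count weights are used; channels beyond the
    # end of the weight list are never selected.
    usable = Enum.take(weights, channel_count)
    target = r * Enum.sum(usable)

    usable
    |> Enum.with_index()
    |> Enum.reduce_while(0.0, fn {weight, index}, acc ->
      acc = acc + weight
      if target < acc, do: {:halt, {:found, index}}, else: {:cont, acc}
    end)
    |> case do
      {:found, index} -> index
      # Fallback for empty weights or floating-point rounding at the boundary.
      _ -> max(length(usable) - 1, 0)
    end
  end
end
```

The scan is O(n) in the number of channels, which is acceptable for small pools; for large pools a precomputed cumulative table with binary search would be the usual refinement.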

Then use it:

{:ok, config} = GrpcConnectionPool.Config.new(
  endpoint: [host: "api.example.com", port: 443],
  pool: [strategy: MyApp.WeightedStrategy]
)

Strategy Performance

Benchmarked with pool_size=10:

| Strategy | Throughput | Avg Latency | Memory/call |
|---|---|---|---|
| round_robin | 1.91M ips | 522 ns | 0.72 KB |
| random | 1.87M ips | 534 ns | 1.05 KB |
| power_of_two | 1.08M ips | 925 ns | 2.05 KB |

Performance Considerations

Pool Sizing

Connection Settings

config :my_app, GrpcConnectionPool,
  connection: [
    keepalive: 30_000,     # Keep connections alive (important for cloud services)
    ping_interval: 25_000, # Ping before cloud timeouts (usually 60s)
    health_check: true     # Enable automatic health monitoring
  ]

Monitoring

Monitor pool performance using GrpcConnectionPool.status (see Pool Monitoring) and the events listed under Telemetry Events.

Troubleshooting

Common Issues

  1. :noproc errors: Pool not started or wrong pool name
  2. Connection timeouts: Network issues or service unavailable
  3. SSL errors: Incorrect SSL configuration or certificates
  4. Pool checkout timeouts: Pool too small or slow operations

Debug Logging

Enable debug logging to troubleshoot issues:

# config/config.exs
config :logger, level: :debug

# In your code
require Logger
Logger.info("Pool status: #{inspect(GrpcConnectionPool.status())}")

Health Check Failures

If you see frequent reconnections:

  1. Check network connectivity
  2. Verify service availability
  3. Adjust ping_interval and keepalive settings
  4. Check for firewall or load balancer timeouts

Connection Error Messages with GCP Services

GCP gRPC endpoints have hardcoded timeouts that periodically close connections, which shows up as gun_down error messages. This is expected behavior, and the pool recovers automatically. To suppress these expected messages:

config :my_app, GrpcConnectionPool,
  endpoint: [
    type: :production,
    host: "pubsub.googleapis.com",  # Or other GCP gRPC endpoints
    port: 443
  ],
  connection: [
    suppress_connection_errors: true  # Suppress expected gun_down errors from GCP
  ]

The worker processes will still reconnect with backoff and the pool will automatically recover.

Contributing

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Add tests for your changes
  5. Run the test suite (mix test)
  6. Commit your changes (git commit -m 'Add amazing feature')
  7. Push to the branch (git push origin feature/amazing-feature)
  8. Open a Pull Request

Development Setup

git clone https://github.com/nyo16/grpc_connection_pool.git
cd grpc_connection_pool
mix deps.get
mix test

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments


Documentation: https://hexdocs.pm/grpc_connection_pool
Source Code: https://github.com/nyo16/grpc_connection_pool
Issues: https://github.com/nyo16/grpc_connection_pool/issues