Huginn

ClickHouse client for Elixir using gRPC with connection pooling.

Features

Installation

Add huginn to your list of dependencies in mix.exs:

def deps do
  [
    {:huginn, "~> 0.3.0"}
  ]
end

Configuration

Configure the ClickHouse connection in your config/config.exs:

config :huginn, :clickhouse,
  host: "localhost",
  port: 9100,
  database: "default",
  auth: {:password, "default", ""},
  pool_size: 5

Configuration Options

Option | Default | Description
:host | (required) | ClickHouse server hostname
:port | 9100 | gRPC port
:database | "default" | Default database
:auth | nil | {:password, user, pass} or {:jwt, token}
:pool_size | 5 | Number of connections
:ssl | false | Enable SSL/TLS
:pool_name | :clickhouse_pool | Pool name for multiple pools

Production Configuration

# config/prod.exs
config :huginn, :clickhouse,
  host: System.get_env("CLICKHOUSE_HOST"),
  port: String.to_integer(System.get_env("CLICKHOUSE_PORT", "9100")),
  database: System.get_env("CLICKHOUSE_DATABASE", "default"),
  auth: {:password, System.get_env("CLICKHOUSE_USER"), System.get_env("CLICKHOUSE_PASSWORD")},
  pool_size: 10,
  ssl: true

Usage

gRPC Methods Overview

gRPC Method | Function | Use Case
ExecuteQuery | query/2, insert/3 | Simple queries, small inserts
ExecuteQueryWithStreamInput | insert_stream/3 | Large file imports
ExecuteQueryWithStreamOutput | stream_query/2 | Large result sets
ExecuteQueryWithStreamIO | stream_io/1 | Bidirectional streaming

Simple Queries (ExecuteQuery)

# Execute a query
{:ok, result} = Huginn.query("SELECT * FROM system.tables LIMIT 10")

# Get results as maps
maps = Huginn.Clickhouse.Result.to_maps(result)

# Query with options
{:ok, result} = Huginn.query(
  "SELECT * FROM users WHERE status = 'active'",
  database: "mydb",
  format: "JSONEachRow",
  timeout: 30_000
)

# Raising version
result = Huginn.query!("SELECT 1")

Inserts (ExecuteQuery)

# Simple insert with TabSeparated data
data = "john\t25\njane\t30"
{:ok, _} = Huginn.insert(
  "INSERT INTO users (name, age) FORMAT TabSeparated",
  data
)

# JSONEachRow format
data = ~s({"name":"john","age":25}\n{"name":"jane","age":30})
{:ok, _} = Huginn.insert("INSERT INTO users FORMAT JSONEachRow", data)

Streaming Inserts (ExecuteQueryWithStreamInput)

# Stream from a file
File.stream!("large_data.csv", [], 65_536)
|> Huginn.insert_stream("INSERT INTO logs FORMAT CSV")

# Stream with progress tracking using Agent
{:ok, counter} = Agent.start_link(fn -> 0 end)

large_data
|> Stream.chunk_every(1000)
|> Stream.map(fn chunk ->
  Agent.update(counter, &(&1 + length(chunk)))
  Enum.join(chunk, "\n")
end)
|> Huginn.insert_stream("INSERT INTO events FORMAT TabSeparated")

IO.puts("Inserted #{Agent.get(counter, & &1)} rows")
Agent.stop(counter)

Streaming Results (ExecuteQueryWithStreamOutput)

# Basic streaming
Huginn.stream_query("SELECT * FROM large_table")
|> Enum.each(fn
  {:ok, result} -> process_chunk(result)
  {:error, error} -> Logger.error("Error: #{inspect(error)}")
end)

# Stream rows directly
Huginn.stream_rows("SELECT * FROM events")
|> Stream.take(1000)
|> Enum.to_list()

# Stream as maps
Huginn.stream_maps("SELECT name, age FROM users")
|> Stream.filter(fn %{"age" => age} -> String.to_integer(age) > 18 end)
|> Enum.to_list()

Using Agent to Accumulate Results

# Accumulate all rows with error counting
{:ok, agent} = Agent.start_link(fn -> %{rows: [], errors: 0} end)

Huginn.stream_query("SELECT * FROM metrics")
|> Enum.each(fn
  {:ok, result} ->
    Agent.update(agent, fn state ->
      %{state | rows: state.rows ++ result.rows}
    end)
  {:error, _} ->
    Agent.update(agent, fn state ->
      %{state | errors: state.errors + 1}
    end)
end)

final_state = Agent.get(agent, & &1)
IO.puts("Got #{length(final_state.rows)} rows, #{final_state.errors} errors")
Agent.stop(agent)

Custom Stream.resource Pattern

defmodule MyApp.ClickHouseStream do
  @moduledoc """
  Custom streaming with backpressure control.

  Pulls rows from `Huginn.stream_rows/1` through a suspendable
  `Enumerable` continuation, so each batch resumes exactly where the
  previous one stopped instead of re-enumerating (and re-executing)
  the query stream from the beginning on every pull.
  """

  @doc """
  Returns a stream that emits lists of up to `batch_size` rows
  produced by `query`.
  """
  def stream_with_backpressure(query, batch_size \\ 100) do
    Stream.resource(
      fn -> init_query(query) end,
      fn state -> next_batch(state, batch_size) end,
      fn _state -> :ok end
    )
  end

  # Builds a continuation that yields one row per invocation by
  # suspending the underlying stream's reduction after each element.
  defp init_query(query) do
    stream = Huginn.stream_rows(query)
    &Enumerable.reduce(stream, &1, fn row, _acc -> {:suspend, row} end)
  end

  # A previous pull exhausted the stream: stop the resource.
  defp next_batch(:done, _batch_size), do: {:halt, :done}

  # Accumulate up to `batch_size` rows, then emit them as one batch.
  defp next_batch(continuation, batch_size) do
    pull(continuation, batch_size, [])
  end

  # Batch is full: emit it and keep the continuation for the next pull.
  defp pull(continuation, 0, acc), do: {[Enum.reverse(acc)], continuation}

  defp pull(continuation, remaining, acc) do
    case continuation.({:cont, nil}) do
      # Got one row; keep pulling with the resumed continuation.
      {:suspended, row, next} ->
        pull(next, remaining - 1, [row | acc])

      # Stream ended mid-batch: emit the partial batch, halt next time.
      {:done, _} when acc != [] ->
        {[Enum.reverse(acc)], :done}

      # Stream ended (or halted) with nothing buffered: halt now.
      {:done, _} ->
        {:halt, :done}

      {:halted, _} ->
        {:halt, :done}
    end
  end
end

# Usage
MyApp.ClickHouseStream.stream_with_backpressure("SELECT * FROM events")
|> Enum.each(fn batch ->
  process_batch(batch)
  Process.sleep(100)  # Rate limiting
end)

Bidirectional Streaming (ExecuteQueryWithStreamIO)

# Interactive query session
{output, send} = Huginn.stream_io()

# Send queries
send.(Huginn.Clickhouse.Query.build("SELECT 1"))
send.(Huginn.Clickhouse.Query.build("SELECT 2"))

# Process results
Task.async(fn ->
  Enum.each(output, fn
    {:ok, result} -> IO.inspect(result.rows)
    {:error, err} -> IO.puts("Error: #{inspect(err)}")
  end)
end)

Query Cancellation

# Start a long-running query with custom ID
query_id = "my-long-query-#{System.unique_integer()}"

task = Task.async(fn ->
  Huginn.query("SELECT sleep(300)", query_id: query_id)
end)

# Cancel after some time
Process.sleep(5_000)
:ok = Huginn.cancel(query_id)

# Cancel queries by condition
Huginn.cancel_where("elapsed > 60")
Huginn.cancel_where("user = 'admin'")

# List running queries
{:ok, result} = Huginn.running_queries()
Huginn.Clickhouse.Result.to_maps(result)
|> Enum.each(fn q -> IO.puts("#{q["query_id"]}: #{q["query"]}") end)

Health Check

case Huginn.ping() do
  :ok -> IO.puts("Connected!")
  {:error, reason} -> IO.puts("Failed: #{inspect(reason)}")
end

Development

Prerequisites

Start ClickHouse

docker-compose up -d

This starts ClickHouse with gRPC enabled on port 9100.

Run Tests

mix test

Generate Documentation

mix docs

Regenerate Proto Files

If you need to regenerate the proto files after updating the .proto file:

# Install protoc-gen-elixir
mix escript.install hex protobuf

# Generate Elixir code
protoc --elixir_out=plugins=grpc:./lib/huginn/proto \
  --proto_path=./priv/protos \
  clickhouse_grpc.proto

Architecture

lib/huginn/
├── clickhouse/
│   ├── client.ex      # High-level API (all 4 gRPC methods)
│   ├── config.ex      # Configuration management
│   ├── query.ex       # QueryInfo message builders
│   ├── result.ex      # Result parsing utilities
│   └── stream.ex      # Streaming helpers
└── proto/
    └── clickhouse_grpc.pb.ex  # Generated protobuf code

License

MIT