KafkaBatcher

A library to increase the throughput of producing messages (coming one at a time) to Kafka by accumulating these messages into batches.

Installation

  1. Add KafkaBatcher to your list of dependencies in mix.exs:
    def deps do
      [
        {:kafka_batcher, "~> 1.1.0"},
        # Also add one of the Kafka client libraries by uncommenting its line
        # (the trailing comma above needs a following element to be valid):
        # {:kaffe, "~> 1.24"}
        # or
        # {:kafka_ex, "~> 0.12"}
      ]
    end
  1. Add the KafkaBatcher supervisor to your supervision tree:

    def start(_type, _args) do
      # KafkaBatcher.Supervisor runs as a child of the application supervisor.
      children = [KafkaBatcher.Supervisor]

      Supervisor.start_link(children,
        strategy: :one_for_one,
        name: MyApp.Supervisor,
        max_restarts: 3,
        max_seconds: 5
      )
    end
  2. Configure a KafkaBatcher Producer

Config example:

  # Each collector module is configured with the Kafka topic it is bound to.
  # The module names match the collectors defined in the "Collector examples" section.
  config :kafka_batcher, MyApp.Collector1, topic_name: "topic1"
  config :kafka_batcher, MyApp.Collector2, topic_name: "topic2"
  config :kafka_batcher, MyApp.Collector3, topic_name: "topic3"
  config :kafka_batcher, MyApp.Collector4, topic_name: "topic4"
  config :kafka_batcher, MyApp.Collector5, topic_name: "topic5"

  # The same collector modules are listed under :collectors.
  config :kafka_batcher,
    collectors: [
      MyApp.Collector1,
      MyApp.Collector2,
      MyApp.Collector3,
      MyApp.Collector4,
      MyApp.Collector5
    ]

  config :kafka_batcher, :json_library, Jason
          
  config :kafka_batcher, :kafka,
    endpoints: "localhost:9092",
    # Uncomment if your brokers require SASL authentication and/or SSL:
    # sasl: %{mechanism: :scram_sha_512, login: "login", password: "password"},
    # ssl: true,
    telemetry: true,
    allow_topic_auto_creation: false,
    kafka_topic_aliases: %{
      "real_topic_name1" => "incoming-events",
      "real_topic_name2" => "special-topic"
    }
  
  # If you use KafkaEx, disable its default worker to avoid crashes.
  config :kafka_ex, :disable_default_worker, true

Available parameters:

Important: The size of a single message must not exceed the max_batch_bytesize setting. If you need to work with large messages, you must increase both the max_batch_bytesize value and the Kafka topic setting max.message.bytes.

Note: you can still produce messages to any Kafka topic (even if it is not described in the kafka_batcher config) by calling Kaffe or KafkaEx directly.

Usage

Collector examples

  # Collector that collects by partition and selects the partition
  # with its own calculate_partition/4 function.
  defmodule MyApp.Collector1 do
    use KafkaBatcher.Collector,
      collect_by_partition: true,
      partition_fn: &MyApp.Collector1.calculate_partition/4,
      required_acks: -1,
      batch_size: 30,
      max_wait_time: 20_000

    # Hashes the event's "client_id" (falling back to "device_id")
    # into the range 0..partitions_count - 1.
    def calculate_partition(_topic, partitions_count, _key, value) do
      hash_source = value["client_id"] || value["device_id"]
      :erlang.phash2(hash_source, partitions_count)
    end
  end
  
  # Collector with collect_by_partition disabled and no explicit
  # partition function or partition strategy.
  defmodule MyApp.Collector2 do
    use KafkaBatcher.Collector,
      collect_by_partition: false,
      required_acks: 0,
      batch_size: 10,
      max_wait_time: 20_000
  end

  # Collector with collect_by_partition disabled that uses the
  # :random partition strategy.
  defmodule MyApp.Collector3 do
    use KafkaBatcher.Collector,
      collect_by_partition: false,
      required_acks: 0,
      batch_size: 10,
      max_wait_time: 20_000,
      partition_strategy: :random
  end

  # Collector that collects by partition, derives the partition from the
  # message key and plugs in a custom batch flusher module.
  defmodule MyApp.Collector4 do
    use KafkaBatcher.Collector,
      collect_by_partition: true,
      required_acks: 0,
      partition_fn: &MyApp.Collector4.calculate_partition/4,
      batch_size: 50,
      batch_flusher: MyApp.Collector4.BatchFlusher

    # Maps the message key onto a partition index via the remainder of
    # key / partitions_count.
    # NOTE: rem/2 only accepts integers, so this example assumes integer keys.
    def calculate_partition(_topic, partitions_count, key, _value) do
      rem(key, partitions_count)
    end

    # Module referenced by the :batch_flusher option above.
    # NOTE(review): flush?/2 presumably tells the collector whether to flush
    # the batch right away for the given event; confirm against the library docs.
    defmodule BatchFlusher do
      # Events whose "type" is "SpecialType" return true.
      def flush?(_key, %{"type" => "SpecialType"}) do
        true
      end

      # Every other event returns false.
      def flush?(_key, _value) do
        false
      end
    end
  end
  
  # Collector with collect_by_partition disabled that uses the :md5
  # partition strategy and sets min_delay and max_batch_bytesize explicitly.
  defmodule MyApp.Collector5 do
    use KafkaBatcher.Collector,
      collect_by_partition: false,
      partition_strategy: :md5,
      batch_size: 100,
      max_wait_time: 20_000,
      min_delay: 100,
      max_batch_bytesize: 1_000_000
  end

Collector usage

  defmodule MyApp.MyModule do
    ...
    # Hands the event over to MyApp.Collector1.
    def produce_to_kafka_topic1(event), do: MyApp.Collector1.add_events(event)

    # Hands the event over to MyApp.Collector2.
    def produce_to_kafka_topic2(event), do: MyApp.Collector2.add_events(event)
    ...
  end

Getting the current config of a topic

    KafkaBatcher.Config.get_collector_config("topic1")

Getting the configs of all topics

    KafkaBatcher.Config.get_configs_by_topic()

Testing

mix test

or

mix test --cover

See https://github.com/parroty/excoveralls for details.

Prometheus metrics

The library exposes the following metrics using the PromEx exporter plugin: