KafkaEx
Apache Kafka client for Elixir/Erlang.
Usage
Add KafkaEx to your mix.exs:
{:kafka_ex, "~> 0.0.1"}
And run:
mix deps.get
Configuration
In your config/config.exs, add the list of Kafka brokers as below:
config KafkaEx,
  brokers: [{HOST, PORT}]
Alternatively from iex:
iex> Application.put_env(KafkaEx, :brokers, [{"localhost", 9092}, {"localhost", 9093}])
:ok
Create KafkaEx worker
iex> KafkaEx.create_worker(:pr) # where :pr is the process name of the created worker
{:ok, #PID<0.171.0>}
Retrieve Kafka metadata
For all metadata
iex> KafkaEx.metadata
%{brokers: %{1 => {"localhost", 9092}},
  topics: %{"foo" => %{error_code: 0,
      partitions: %{0 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]}}},
    "bar" => %{error_code: 0,
      partitions: %{0 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]}}}}}
For a specific topic
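The metadata map links each partition to its leader by broker id, and the brokers map resolves that id to a host and port. As a minimal sketch, assuming exactly the map shape shown in these examples, the leader of one topic partition could be looked up as follows. `MetadataLookup` and `leader/3` are hypothetical names for illustration, not part of KafkaEx.

```elixir
defmodule MetadataLookup do
  # Hypothetical helper, not part of KafkaEx.
  # Returns {:ok, {host, port}} for the broker leading the given topic
  # partition, or :error when the topic, partition, or broker is missing.
  def leader(metadata, topic, partition) do
    with {:ok, topic_meta} <- Map.fetch(metadata.topics, topic),
         {:ok, part_meta} <- Map.fetch(topic_meta.partitions, partition),
         {:ok, broker} <- Map.fetch(metadata.brokers, part_meta.leader) do
      {:ok, broker}
    end
  end
end

# Sample data copied from the metadata output in this README.
metadata = %{
  brokers: %{1 => {"localhost", 9092}},
  topics: %{
    "foo" => %{
      error_code: 0,
      partitions: %{0 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]}}
    }
  }
}

MetadataLookup.leader(metadata, "foo", 0)
# returns {:ok, {"localhost", 9092}}
```

Because `with` has no `else` clause here, a failed lookup falls through as the plain `:error` returned by `Map.fetch/2`.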
iex> KafkaEx.metadata(topic: "foo")
%{brokers: %{1 => {"localhost", 9092}},
  topics: %{"foo" => %{error_code: 0,
      partitions: %{0 => %{error_code: 0, isrs: [1], leader: 1, replicas: [1]}}}}}
Retrieve the latest offset
iex> KafkaEx.latest_offset("foo", 0) # where 0 is the partition
{:ok, %{"foo" => %{0 => %{error_code: 0, offsets: [16]}}}}
Retrieve the earliest offset
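Both offset calls return the offset wrapped in a map keyed by topic and then by partition. As a minimal sketch, assuming exactly the response shape shown in these examples, a small helper can unwrap it. `OffsetResponse` and `first_offset/3` are hypothetical names, not part of KafkaEx; treating a non-zero `error_code` as a failure follows the Kafka convention that 0 means no error.

```elixir
defmodule OffsetResponse do
  # Hypothetical helper, not part of KafkaEx.
  # Unwraps the first offset for a topic partition from an offset response.
  def first_offset({:ok, response}, topic, partition) do
    case get_in(response, [topic, partition]) do
      %{error_code: 0, offsets: [offset | _]} -> {:ok, offset}
      %{error_code: code} when code != 0 -> {:error, {:kafka_error_code, code}}
      _ -> {:error, :not_found}
    end
  end
end

# Sample response copied from the latest_offset example in this README.
response = {:ok, %{"foo" => %{0 => %{error_code: 0, offsets: [16]}}}}

OffsetResponse.first_offset(response, "foo", 0)
# returns {:ok, 16}
```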
iex> KafkaEx.earliest_offset("foo", 0) # where 0 is the partition
{:ok, %{"foo" => %{0 => %{error_code: 0, offsets: [0]}}}}
Fetch Kafka logs
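A fetch response nests its messages in a `message_set` list under the topic and partition keys, as the example in this section shows. As a minimal sketch, assuming exactly that response shape, the message values could be collected like this. `FetchResponse` and `values/3` are hypothetical names, not part of KafkaEx.

```elixir
defmodule FetchResponse do
  # Hypothetical helper, not part of KafkaEx.
  # Returns the list of message values for a topic partition, or an empty
  # list when the topic or partition is absent from the response.
  def values({:ok, response}, topic, partition) do
    response
    |> get_in([topic, partition, :message_set])
    |> List.wrap()
    |> Enum.map(& &1.value)
  end
end

# Sample response modelled on the fetch example in this README,
# shortened to two messages.
response =
  {:ok,
   %{
     "foo" => %{
       0 => %{
         error_code: 0,
         hw_mark_offset: 133,
         message_set: [
           %{attributes: 0, crc: 4264455069, key: nil, offset: 5, value: "hey"},
           %{attributes: 0, crc: 4264455069, key: nil, offset: 6, value: "hey"}
         ]
       }
     }
   }}

FetchResponse.values(response, "foo", 0)
# returns ["hey", "hey"]
```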
iex> KafkaEx.fetch("foo", 0, 5) # where 0 is the partition and 5 is the offset we want to start fetching from
{:ok,
 %{"foo" => %{0 => %{error_code: 0, hw_mark_offset: 133,
       message_set: [%{attributes: 0, crc: 4264455069, key: nil, offset: 5,
          value: "hey"},
        %{attributes: 0, crc: 4264455069, key: nil, offset: 6, value: "hey"},
        ...]}}}}
Produce Kafka logs
iex> KafkaEx.produce("foo", 0, "hey") # where "foo" is the topic, 0 is the partition and "hey" is the message
:ok
Stream Kafka logs
iex> KafkaEx.create_worker([{"localhost", 9092}], :stream)
{:ok, #PID<0.196.0>}
iex> KafkaEx.produce("foo", 0, "hey", :stream)
:ok
iex> KafkaEx.produce("foo", 0, "hi", :stream)
:ok
iex> KafkaEx.stream("foo", 0) |> Enum.take(2)
[%{attributes: 0, crc: 4264455069, key: nil, offset: 0, value: "hey"},
 %{attributes: 0, crc: 4251893211, key: nil, offset: 1, value: "hi"}]
Test
Unit tests
mix test --no-start
Integration tests
Add the broker config to config/config.exs and run:
mix test --only integration
Static analysis
mix dialyze --unmatched-returns --error-handling --race-conditions --underspecs
Contributing
Please see CONTRIBUTING.md