Elixir Client for Apache Pulsar

Tip

Using Broadway? Check out the companion project: off_broadway_pulsar.

An Elixir client for Apache Pulsar.

Installation

Add :pulsar_elixir to your dependencies in mix.exs:

def deps do
  [
    {:pulsar, "~> 2.8.18", hex: :pulsar_elixir}
  ]
end

Quick Start

Assuming Pulsar is running on localhost:6650, the quickest way to consume messages from a Pulsar topic is the Reader interface, as shown below:

"persistent://my-tenant/my-namespace/my-topic"
|> Pulsar.Reader.stream(host: "pulsar://localhost:6650", timeout: 100)
|> Enum.map(fn msg -> String.to_integer(msg.payload) end)
|> Enum.filter(fn n -> rem(n, 2) == 0 end)
|> Enum.map(fn n -> n * 2 end)
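
To see what the pipeline above computes without a running broker, the same transformations can be applied to an in-memory list. This is only a sketch: the stand-in messages are plain maps with a payload key, which is an assumption made for illustration, and the library's real message type may carry more fields.

```elixir
# Stand-in messages (assumption): plain maps with a :payload key,
# mimicking only the field the pipeline reads.
messages = [
  %{payload: "1"},
  %{payload: "2"},
  %{payload: "3"},
  %{payload: "4"}
]

result =
  messages
  # Parse each payload as an integer (raises on non-numeric payloads).
  |> Enum.map(fn msg -> String.to_integer(msg.payload) end)
  # Keep even numbers only.
  |> Enum.filter(fn n -> rem(n, 2) == 0 end)
  # Double what is left.
  |> Enum.map(fn n -> n * 2 end)

IO.inspect(result)
# → [4, 8]
```

Note that String.to_integer/1 raises an ArgumentError on a payload that is not an integer, so the pipeline assumes every message on the topic carries a numeric payload.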

For more complex scenarios, first implement a basic consumer like the one below:

defmodule MyPulsarConsumer do
  use Pulsar.Consumer.Callback

  def handle_message(message, state) do
    IO.puts("Received: #{message.payload}")
    {:ok, state}
  end
end
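
The callback returns {:ok, state}, which suggests that state is threaded from one message to the next. The sketch below illustrates that idea with a hypothetical CountingHandler module that has the same handle_message/2 shape but omits `use Pulsar.Consumer.Callback`, so it runs without the library. It assumes the state is a map; the state the library actually passes in may look different.

```elixir
defmodule CountingHandler do
  # Hypothetical stand-in, not part of the library: same handle_message/2
  # shape as the consumer above, minus `use Pulsar.Consumer.Callback`.
  # Assumes the state is a map (the library's real state may differ).
  def handle_message(message, state) do
    # Increment the :count key, starting at 1 for the first message.
    state = Map.update(state, :count, 1, &(&1 + 1))
    IO.puts("Received message #{state.count}: #{message.payload}")
    {:ok, state}
  end
end

# Feed two stand-in messages through the handler, passing the state along.
{:ok, state_1} = CountingHandler.handle_message(%{payload: "first"}, %{})
{:ok, state_2} = CountingHandler.handle_message(%{payload: "second"}, state_1)
IO.inspect(state_2)
```

Keeping the message-handling logic in a plain function like this makes it easy to unit test without a broker.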

You can start producing and consuming messages with the following configuration:

config :pulsar,
  host: "pulsar://localhost:6650",
  consumers: [
    my_consumer: [
      topic: "persistent://my-tenant/my-namespace/my-topic",
      subscription_name: "my-subscription",
      callback_module: MyPulsarConsumer
    ]
  ],
  producers: [
    my_producer: [
      topic: "persistent://my-tenant/my-namespace/my-topic"
    ]
  ]

Sending a message using the configured producer can be done as follows:

Pulsar.send(:my_producer, "Hello, Pulsar!")
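
As a rough sketch, a small helper can publish a series of integer payloads that the Reader pipeline in the Quick Start could then consume. NumberPublisher is a hypothetical module, not part of the library. It takes the send function as an argument so the logic can be tried without a broker, and what Pulsar.send/2 returns is not assumed here.

```elixir
defmodule NumberPublisher do
  # Hypothetical helper, not part of the library.
  # `send_fun` is any one-argument function that accepts a binary payload.
  def publish(numbers, send_fun) do
    Enum.map(numbers, fn n -> send_fun.(Integer.to_string(n)) end)
  end
end

# Dry run with a stand-in send function that simply echoes the payload.
payloads = NumberPublisher.publish(1..4, fn payload -> payload end)
IO.inspect(payloads)

# With the library started and :my_producer configured, the call would be:
# NumberPublisher.publish(1..4, &Pulsar.send(:my_producer, &1))
```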

By default, brokers, consumers and producers are started within the scope of the :default client. You can also configure multiple clients, which is useful when you need to connect to multiple clusters:

clients: [
  client_1: [
    host: "pulsar://host.cluster1.com:6650"
  ],
  client_2: [
    host: "pulsar://host.cluster2.com:6650"
  ]
]

Then specify the client in the consumer or producer configuration using the client key, e.g. client: :client_1.

producers: [
  my_producer_1: [
    client: :client_1,
    topic: "persistent://my-tenant/my-namespace/my-topic"
  ]
]
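
Put together, a multi-client setup might look like the sketch below. It assumes that the clients, consumers and producers keys all sit under the same `config :pulsar` call, as in the single-client configuration shown earlier, and that consumers accept the client key in the same way producers do. The file path is only the conventional location for Mix configuration.

```elixir
# config/config.exs (conventional location; adjust to your project)
import Config

config :pulsar,
  clients: [
    client_1: [host: "pulsar://host.cluster1.com:6650"],
    client_2: [host: "pulsar://host.cluster2.com:6650"]
  ],
  consumers: [
    my_consumer: [
      # Assumption: consumers select a client the same way producers do.
      client: :client_2,
      topic: "persistent://my-tenant/my-namespace/my-topic",
      subscription_name: "my-subscription",
      callback_module: MyPulsarConsumer
    ]
  ],
  producers: [
    my_producer_1: [
      client: :client_1,
      topic: "persistent://my-tenant/my-namespace/my-topic"
    ]
  ]
```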

If your Pulsar cluster requires authentication, you can configure it in the client using the auth key:

auth: [
  type: Pulsar.Auth.OAuth2,
  opts: [
    client_id: "<YOUR-OAUTH2-CLIENT-ID>",
    client_secret: "<YOUR-OAUTH2-CLIENT-SECRET>",
    site: "<YOUR-OAUTH2-ISSUER-URL>",
    audience: "<YOUR-OAUTH2-AUDIENCE>"
  ]
]
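
To keep credentials out of source control, one option is to read them from environment variables in config/runtime.exs. The following is a minimal sketch: it assumes the auth key nests inside a named client's options next to host, and the environment variable names are hypothetical.

```elixir
# config/runtime.exs
import Config

config :pulsar,
  clients: [
    client_1: [
      host: "pulsar://host.cluster1.com:6650",
      # Assumption: auth sits alongside host in the client's options.
      auth: [
        type: Pulsar.Auth.OAuth2,
        opts: [
          # Hypothetical variable names; fetch_env!/1 raises at boot if one is unset.
          client_id: System.fetch_env!("PULSAR_OAUTH2_CLIENT_ID"),
          client_secret: System.fetch_env!("PULSAR_OAUTH2_CLIENT_SECRET"),
          site: System.fetch_env!("PULSAR_OAUTH2_ISSUER_URL"),
          audience: System.fetch_env!("PULSAR_OAUTH2_AUDIENCE")
        ]
      ]
    ]
  ]
```

System.fetch_env!/1 fails fast when a variable is missing, which surfaces a misconfigured deployment at startup rather than on the first connection attempt.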

Testing

Important

Do not forget to add the following line to your /etc/hosts file before running the tests:

127.0.0.1 broker1 broker2

To run the tests, run the following command:

mix test

To run only a subset of tests, pass the file that contains them:

mix test test/integration/consumer_test.exs

You can also run an individual test by appending the line number where it is defined:

mix test test/integration/consumer_test.exs:43

The examples directory includes a number of examples that demonstrate the use of the Pulsar client. For example:

mix run --no-start examples/bingo.exs

Features

The full feature matrix for Apache Pulsar can be found here.

Component  Feature                             Supported
Client     TLS encryption
Client     Authentication                      ⚠️
Client     Transaction
Client     Statistics
Producer   Sync send
Producer   Async send
Producer   Batching
Producer   Chunking
Producer   Compression
Producer   Schema
Producer   Partitioned topics
Producer   Access modes
Consumer   ACK
Consumer   Batch-index ACK
Consumer   NACK
Consumer   NACK back-off
Consumer   Batching
Consumer   Partitioned topics
Consumer   Chunking
Consumer   Seek
Consumer   Subscription types
Consumer   Subscription modes
Consumer   Retry letter topic
Consumer   Dead letter topic
Consumer   Compression
Consumer   Compaction
Consumer   Schema
Consumer   Configurable flow control settings
Reader
TableView