Broadway Producer for Apache Pulsar

A Broadway producer for Apache Pulsar, built on top of pulsar-elixir.

Installation

Add :off_broadway_pulsar to your dependencies in mix.exs:

def deps do
  [
    {:off_broadway_pulsar, "~> 1.3.11"}
  ]
end

Quick Start

Assuming you have Pulsar running on localhost:6650, you can create a Broadway pipeline like this:

defmodule MyApp.PulsarPipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {OffBroadway.Pulsar.Producer,
          host: "pulsar://localhost:6650",
          topics: ["persistent://public/default/my-topic"],
          subscription: "my-subscription"
        },
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 10]
      ],
      batchers: [
        default: [
          batch_size: 100,
          batch_timeout: 1000
        ]
      ]
    )
  end

  # Called once per message by a processor; the returned message goes on to the batcher.
  @impl true
  def handle_message(_processor, message, _context) do
    IO.inspect(message.data, label: "Received")
    message
  end

  # Called with each batch (up to batch_size messages, or whatever arrived before batch_timeout).
  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    IO.inspect(length(messages), label: "Batch size")
    messages
  end
end
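
The topic string in the pipeline above follows Pulsar's fully qualified topic naming scheme, `{persistent|non-persistent}://tenant/namespace/topic`. As a minimal sketch of how such a name breaks down, the snippet below splits it into its parts. `TopicName` and `parse/1` are hypothetical names introduced only for illustration; they are not part of off_broadway_pulsar or pulsar-elixir.

```elixir
defmodule TopicName do
  @moduledoc "Hypothetical helper for illustration; not part of off_broadway_pulsar."

  @domains ["persistent", "non-persistent"]

  # Splits "domain://tenant/namespace/topic" into its four parts.
  # Returns {:ok, map} on success or {:error, :invalid_topic_name} otherwise.
  def parse(name) when is_binary(name) do
    with [domain, rest] when domain in @domains <-
           String.split(name, "://", parts: 2),
         [tenant, namespace, topic]
         when tenant != "" and namespace != "" and topic != "" <-
           String.split(rest, "/", parts: 3) do
      {:ok, %{domain: domain, tenant: tenant, namespace: namespace, topic: topic}}
    else
      _ -> {:error, :invalid_topic_name}
    end
  end
end

{:ok, parts} = TopicName.parse("persistent://public/default/my-topic")
IO.inspect(parts, label: "Topic parts")
```

Here `public` is the tenant, `default` is the namespace, and `my-topic` is the topic's local name. A check like this could be run before starting the pipeline to catch malformed topic names early.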

If the Pulsar client is already started globally in your application's supervision tree, omit the :host option and optionally specify :client:

producer: [
  module: {OffBroadway.Pulsar.Producer,
    topic: "persistent://public/default/my-topic",
    subscription: "my-subscription",
    client: :default # Optional, defaults to :default
  },
  concurrency: 1
]