Broadway Producer for Apache Pulsar

CI | Coverage Status | Package Version | hexdocs.pm

A Broadway producer for Apache Pulsar, built on top of pulsar-elixir.

Installation

Add :off_broadway_pulsar to your dependencies in mix.exs:

# In mix.exs: keep your existing entries and add :off_broadway_pulsar to the list.
def deps do
  [{:off_broadway_pulsar, "~> 1.3.5"}]
end

Quick Start

Assuming you have Pulsar running on localhost:6650, you can create a Broadway pipeline like this:

defmodule MyApp.PulsarPipeline do
  @moduledoc """
  Example Broadway pipeline that consumes messages from a Pulsar topic
  using `OffBroadway.Pulsar.Producer`.
  """

  use Broadway

  @doc """
  Starts the pipeline, registered under this module's name.

  The producer connects to the broker at `pulsar://localhost:6650` and
  subscribes to a single topic; the `opts` argument is ignored.
  """
  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {OffBroadway.Pulsar.Producer,
          host: "pulsar://localhost:6650",
          topic: "persistent://public/default/my-topic",
          subscription: "my-subscription"
        },
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 10]
      ],
      batchers: [
        default: [
          batch_size: 100,
          batch_timeout: 1000
        ]
      ]
    )
  end

  # Prints the payload of each incoming message and passes the message on unchanged.
  @impl true
  def handle_message(_processor, message, _context) do
    IO.inspect(message.data, label: "Received")
    message
  end

  # Prints how many messages the batch contains and returns them unchanged.
  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    IO.inspect(length(messages), label: "Batch size")
    messages
  end
end

If you're already running the Pulsar client globally in your application's supervision tree, omit the :host option and optionally specify which client to use with :client:

producer: [
  module: {OffBroadway.Pulsar.Producer,
    topic: "persistent://public/default/my-topic",
    subscription: "my-subscription",
    client: :default  # Optional, defaults to :default
  },
  concurrency: 1
]