QuiqupAvroEx

Hex.pmCICoverage

Avro encoder/decoder with Confluent Schema Registry support using AvroEx.

This library provides a simple API for encoding and decoding Avro messages using the Confluent wire format, with built-in schema registry integration and caching.

Why QuiqupAvroEx?

We built this as a replacement for Avrora after encountering schema parsing issues with erlavro (the Erlang Avro library). AvroEx is a pure Elixir implementation that handles complex schemas more reliably.

Installation

Add quiqup_avro_ex to your list of dependencies in mix.exs:

def deps do
  [
    {:quiqup_avro_ex, "~> 0.1.0"}
  ]
end

Configuration

config :quiqup_avro_ex,
  registry_url: "http://localhost:8081",
  schemas_path: "./priv/schemas",
  cache_ttl: :timer.minutes(5)

Or use environment variables:

Usage

Decoding Messages

Decode Confluent wire format messages (magic byte + schema ID + payload):

# Decode a Kafka message
{:ok, data} = QuiqupAvroEx.decode(message)

Encoding Messages

Encode data using a schema name or ID:

# Using schema name (looks up from registry or local files)
{:ok, encoded} = QuiqupAvroEx.encode(data, schema_name: "my.namespace.MyRecord")

# Using schema ID directly
{:ok, encoded} = QuiqupAvroEx.encode(data, schema_id: 123)

Schema Registration

Register schemas with the Schema Registry:

# Register from local file (schema_name maps to file path)
{:ok, schema_id} = QuiqupAvroEx.register_schema("my.namespace.MyRecord")

# Register with explicit schema JSON
schema_json = ~s({"type": "record", "name": "MyRecord", ...})
{:ok, schema_id} = QuiqupAvroEx.register_schema("my.namespace.MyRecord", schema_json)

Schema Management

# Get schema by ID
{:ok, schema} = QuiqupAvroEx.get_schema(123)

# Get schema by name (returns {schema, schema_id})
{:ok, {schema, schema_id}} = QuiqupAvroEx.get_schema_by_name("my.namespace.MyRecord")

# List all schema versions
{:ok, versions} = QuiqupAvroEx.get_schema_versions("my.namespace.MyRecord")

# Check compatibility
{:ok, true} = QuiqupAvroEx.check_compatibility("my.namespace.MyRecord", new_schema_json)

# Delete a schema
:ok = QuiqupAvroEx.delete_schema("my.namespace.MyRecord")

# Clear the cache
:ok = QuiqupAvroEx.clear_cache()

Wire Format Utilities

# Parse wire format header
{:ok, schema_id, payload} = QuiqupAvroEx.parse_wire_format(message)

# Build wire format message
message = QuiqupAvroEx.build_wire_format(schema_id, encoded_payload)

Local Schema Files

Schemas can be loaded from local files as a fallback when the registry is unavailable. The file path is derived from the schema name:

Migration from Avrora

QuiqupAvroEx provides API compatibility with Avrora:

# Before (Avrora)
{:ok, data} = Avrora.decode(message)
{:ok, encoded} = Avrora.encode(data, schema_name: "my.schema")

# After (QuiqupAvroEx)
{:ok, data} = QuiqupAvroEx.decode(message)
{:ok, encoded} = QuiqupAvroEx.encode(data, schema_name: "my.schema")

License

MIT