ElasticsearchElixirBulkProcessor


Elasticsearch Elixir Bulk Processor is a configurable manager for efficiently inserting data into Elasticsearch. The processor uses GenStage to handle backpressure and exposes settings that control the bulk payloads uploaded to Elasticsearch.

Inspired by the Java Bulk Processor. Uses elasticsearch-elixir as the client.

Installation

If available in Hex, the package can be installed by adding elasticsearch_elixir_bulk_processor to your list of dependencies in mix.exs:

def deps do
  [
    {:elasticsearch_elixir_bulk_processor, "~> 0.1.0"}
  ]
end

Sending data

ElasticsearchElixirBulkProcessor.send_requests(list_of_items)

Sends a list of request items to Elasticsearch. This mechanism uses GenStage for backpressure. NOTE: It is reasonable to call this function with single-element lists; the mechanism aggregates the items before sending them.

The list elements must be item structs, such as ElasticsearchElixirBulkProcessor.Items.Index:

Examples

iex> alias ElasticsearchElixirBulkProcessor.Items.Index
...> [
...>   %Index{index: "test_index", source: %{"field" => "value1"}},
...>   %Index{index: "test_index", source: %{"field" => "value2"}},
...>   %Index{index: "test_index", source: %{"field" => "value3"}}
...> ]
...> |> ElasticsearchElixirBulkProcessor.send_requests()
:ok

Configuration

Elasticsearch endpoint

Can be configured via the ELASTICSEARCH_URL environment variable, which defaults to "http://localhost:9200". Alternatively:

config :elasticsearch_elixir_bulk_processor,
       ElasticsearchElixirBulkProcessor.ElasticsearchCluster,
  url: "http://localhost:9200",
  api: Elasticsearch.API.HTTP

See the client configuration for more.
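If the URL should still come from the environment while the other options are set explicitly, the two approaches can be combined. The fragment below is a sketch only: it assumes a config/runtime.exs file (Elixir 1.11 or later) and assumes the processor reads this configuration at application start, which should be checked against the client configuration.

```elixir
# config/runtime.exs (assumed location; evaluated when the application boots)
import Config

config :elasticsearch_elixir_bulk_processor,
       ElasticsearchElixirBulkProcessor.ElasticsearchCluster,
  # Fall back to the documented default when ELASTICSEARCH_URL is not set.
  url: System.get_env("ELASTICSEARCH_URL", "http://localhost:9200"),
  api: Elasticsearch.API.HTTP
```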

Action count

Number of actions/items to send per bulk (can be changed at run time)

ElasticsearchElixirBulkProcessor.set_event_count_threshold(100)

Byte size

Max number of bytes to send per bulk (can be changed at run time)

ElasticsearchElixirBulkProcessor.set_byte_threshold(100)

Action order

Controls whether the order of actions/items is preserved:

config :elasticsearch_elixir_bulk_processor, preserve_event_order: false

Retries

The retry policy uses the ElixirRetry DSL. See ElasticsearchElixirBulkProcessor.Bulk.Retry.policy.

config :elasticsearch_elixir_bulk_processor, retry_function: &MyApp.Retry.policy/0
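As a rough illustration of what such a function might look like, the sketch below builds a capped exponential backoff from plain Stream functions. It assumes the retry function is expected to return an enumerable of delays in milliseconds, which is the shape of ElixirRetry delay streams. The module name MyApp.Retry comes from the config line above, and the delay values are arbitrary. Compare with ElasticsearchElixirBulkProcessor.Bulk.Retry.policy before relying on it.

```elixir
defmodule MyApp.Retry do
  # Hypothetical retry policy: the delays (in milliseconds) to wait between attempts.
  # Assumption: the processor accepts any enumerable of delays.
  def policy do
    500
    # Double the delay after every attempt: 500, 1000, 2000, ...
    |> Stream.iterate(&(&1 * 2))
    # Never wait longer than 5 seconds between attempts.
    |> Stream.map(&min(&1, 5_000))
    # Give up after 5 retries.
    |> Stream.take(5)
  end
end
```

Evaluating Enum.to_list(MyApp.Retry.policy()) yields [500, 1000, 2000, 4000, 5000]. The helpers in ElixirRetry's Retry.DelayStreams module can express the same idea more compactly.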

Success and error handlers

The callbacks invoked on a successful upload, on failed items, or on a failed request can be set through the config. On success, the handler is called with the Elasticsearch bulk request. On failure, the handler is called with %{data: any, error: any}, where data is the original payload and error is the response or HTTP error. See ElasticsearchElixirBulkProcessor.Bulk.Handlers.

config :elasticsearch_elixir_bulk_processor,
  success_function: &MyApp.success_handler/1,
  error_function: &MyApp.error_handler/1
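A minimal sketch of the two handlers referenced in this config might look as follows. The function names match the config above, but the bodies are illustrative only. The :ok return values are an assumption, since the return value the processor expects is not stated here.

```elixir
defmodule MyApp do
  require Logger

  # Called after a successful upload with the bulk request that was sent.
  def success_handler(bulk_request) do
    Logger.debug("Bulk upload succeeded: #{inspect(bulk_request, limit: 5)}")
    :ok
  end

  # Called on failed items or a failed request.
  # `data` is the original payload; `error` is the response or HTTP error.
  def error_handler(%{data: data, error: error}) do
    Logger.error("Bulk upload failed: #{inspect(error)}")
    Logger.debug("Failed payload: #{inspect(data, limit: 5)}")
    :ok
  end
end
```

Logging is only one option; the error handler is also a natural place to push the failed payload to a dead-letter store or to re-enqueue it.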

Documentation can be generated with ExDoc and published on HexDocs. Once published, the docs can be found at https://hexdocs.pm/elasticsearch_elixir_bulk_processor.

Testing script

Start Elasticsearch with:

docker-compose up

Run the test:

mix insert_test <INSERT_COUNT> <BULK_SIZE> staged|direct