Client Helpers

edit

The Elasticsearch Ruby client includes some useful helpers for a more comfortable experience with some APIs.

Bulk Helper

edit

The Bulk API in Elasticsearch allows you to perform multiple indexing or deletion operations through a single API call, resulting in reduced overhead and improved indexing speed. The actions are specified in the request body using a newline delimited JSON (NDJSON) structure. In the Elasticsearch Ruby client, the bulk method supports several data structures as a parameter. You can use the Bulk API in an idiomatic way without concerns about payload formatting. Refer to Bulk requests for more information.

The BulkHelper provides a better developer experience when using the Bulk API. At its simplest, you can send it a collection of hashes in an array, and it will bulk ingest them into Elasticsearch.

To use the BulkHelper, require it in your code:

require 'elasticsearch/helpers/bulk_helper'

Instantiate a BulkHelper with a client, and an index:

client = Elasticsearch::Client.new
bulk_helper = Elasticsearch::Helpers::BulkHelper.new(client, index)

This helper works on the index you pass in during initialization, but you can change the index at any time in your code:

bulk_helper.index = 'new_index'

If you want to index a collection of documents, use the ingest method:

documents = [
  { name: 'document1', date: '2024-05-16' },
  { name: 'document2', date: '2023-12-19' },
  { name: 'document3', date: '2024-07-07' }
]
bulk_helper.ingest(documents)

If you’re ingesting a large set of data and want to separate the documents into smaller pieces before sending them to Elasticsearch, use the slice parameter.

bulk_helper.ingest(documents, { slice: 2 })

This way the data will be sent in two different bulk requests.

You can also include the parameters you would send to the Bulk API either in the query parameters or in the request body. The method signature is ingest(docs, params = {}, body = {}, &block). Additionally, the method can be called with a block, that will provide access to the response object received from calling the Bulk API and the documents sent in the request:

helper.ingest(documents) { |_, docs| puts "Ingested #{docs.count} documents" }

You can update and delete documents with the BulkHelper too. To delete a set of documents, you can send an array of document ids:

ids = ['shm0I4gB6LpJd9ljO9mY', 'sxm0I4gB6LpJd9ljO9mY', 'tBm0I4gB6LpJd9ljO9mY', 'tRm0I4gB6LpJd9ljO9mY', 'thm0I4gB6LpJd9ljO9mY', 'txm0I4gB6LpJd9ljO9mY', 'uBm0I4gB6LpJd9ljO9mY', 'uRm0I4gB6LpJd9ljO9mY', 'uhm0I4gB6LpJd9ljO9mY', 'uxm0I4gB6LpJd9ljO9mY']
helper.delete(ids)

To update documents, you can send the array of documents with their respective ids:

documents = [
  {name: 'updated name 1', id: 'AxkFJYgB6LpJd9ljOtr7'},
  {name: 'updated name 2', id: 'BBkFJYgB6LpJd9ljOtr7'}
]
helper.update(documents)

Ingest a JSON file

edit

BulkHelper also provides a helper to ingest data straight from a JSON file. By giving a file path as an input, the helper will parse and ingest the documents in the file:

file_path = './data.json'
helper.ingest_json(file_path)

In cases where the array of data you want to ingest is not necessarily in the root of the JSON file, you can provide the keys to access the data, for example given the following JSON file:

{
  "field": "value",
  "status": 200,
  "data": {
    "items": [
      {
        "name": "Item 1",
        (...)
      },
      {
      (...)
    ]
  }
}

The following is an example of the Ruby code to ingest the documents in the JSON above:

bulk_helper.ingest_json(file_path, { keys: ['data', 'items'] })

Scroll Helper

edit

This helper provides an easy way to get results from a Scroll.

To use the ScrollHelper, require it in your code:

require 'elasticsearch/helpers/scroll_helper'

Instantiate a ScrollHelper with a client, an index, and a body (with the scroll API parameters) which will be used in every following scroll request:

client = Elasticsearch::Client.new
scroll_helper = Elasticsearch::Helpers::ScrollHelper.new(client, index, body)

There are two ways to get the results from a scroll using the helper.

  1. You can iterate over a scroll using the methods in Enumerable such as each and map:

    scroll_helper.each do |item|
      puts item
    end
  2. You can fetch results by page, with the results function:

    my_documents = []
    while !(documents = scroll_helper.results).empty?
      my_documents << documents
    end
    scroll_helper.clear

ES|QL Helper

edit

This functionality is Experimental and may be changed or removed completely in a future release. If you have any feedback on this helper, please let us know.

The helper provides an object response from the ESQL query API instead of the default JSON value.

To use the ES|QL helper, require it in your code:

require 'elasticsearch/helpers/esql_helper'

By default, the query API returns a Hash response with columns and values like so:

query = <<ESQL
        FROM sample_data
        | EVAL duration_ms = ROUND(event.duration / 1000000.0, 1)
ESQL

response = client.esql.query(body: { query: query})
puts response

{"columns"=>[
  {"name"=>"@timestamp", "type"=>"date"},
  {"name"=>"client.ip", "type"=>"ip"},
  {"name"=>"event.duration", "type"=>"long"},
  {"name"=>"message", "type"=>"keyword"},
  {"name"=>"duration_ms", "type"=>"double"}
],
"values"=>[
  ["2023-10-23T12:15:03.360Z", "172.21.2.162", 3450233, "Connected to 10.1.0.3", 3.5],
  ["2023-10-23T12:27:28.948Z", "172.21.2.113", 2764889, "Connected to 10.1.0.2", 2.8],
  ["2023-10-23T13:33:34.937Z", "172.21.0.5", 1232382, "Disconnected", 1.2],
  ["2023-10-23T13:51:54.732Z", "172.21.3.15", 725448, "Connection error", 0.7],
  ["2023-10-23T13:52:55.015Z", "172.21.3.15", 8268153, "Connection error", 8.3],
  ["2023-10-23T13:53:55.832Z", "172.21.3.15", 5033755, "Connection error", 5.0],
  ["2023-10-23T13:55:01.543Z", "172.21.3.15", 1756467, "Connected to 10.1.0.1", 1.8]
]}

The helper returns an array of hashes with the columns as keys and the respective values. So for the previous example, it would return the following:

response = Elasticsearch::Helpers::ESQLHelper.query(client, query)

puts response

{"duration_ms"=>3.5, "message"=>"Connected to 10.1.0.3", "event.duration"=>3450233, "client.ip"=>"172.21.2.162", "@timestamp"=>"2023-10-23T12:15:03.360Z"}
{"duration_ms"=>2.8, "message"=>"Connected to 10.1.0.2", "event.duration"=>2764889, "client.ip"=>"172.21.2.113", "@timestamp"=>"2023-10-23T12:27:28.948Z"}
{"duration_ms"=>1.2, "message"=>"Disconnected", "event.duration"=>1232382, "client.ip"=>"172.21.0.5", "@timestamp"=>"2023-10-23T13:33:34.937Z"}
{"duration_ms"=>0.7, "message"=>"Connection error", "event.duration"=>725448, "client.ip"=>"172.21.3.15", "@timestamp"=>"2023-10-23T13:51:54.732Z"}
{"duration_ms"=>8.3, "message"=>"Connection error", "event.duration"=>8268153, "client.ip"=>"172.21.3.15", "@timestamp"=>"2023-10-23T13:52:55.015Z"}

Additionally, you can transform the data in the response by passing in a Hash of column => Proc values. You could use this for example to convert @timestamp into a DateTime object. Pass in a Hash to query as a parser defining a Proc for each value you’d like to parse:

require 'elasticsearch/helpers/esql_helper'

parser = {
  '@timestamp' => Proc.new { |t| DateTime.parse(t) }
}
response = Elasticsearch::Helpers::ESQLHelper.query(client, query, parser: parser)
response.first['@timestamp']
# <DateTime: 2023-10-23T12:15:03+00:00 ((2460241j,44103s,360000000n),+0s,2299161j)>

You can pass in as many Procs as there are columns in the response. For example:

parser = {
  '@timestamp' => Proc.new { |t| DateTime.parse(t) },
  'client.ip' => Proc.new { |i| IPAddr.new(i) },
  'event.duration' => Proc.new { |d| d.to_s }
}

response = Elasticsearch::Helpers::ESQLHelper.query(client, query, parser: parser)

puts response

{"duration_ms"=>3.5, "message"=>"Connected to 10.1.0.3", "event.duration"=>"3450233", "client.ip"=>#<IPAddr: IPv4:172.21.2.162/255.255.255.255>, "@timestamp"=>#<DateTime: 2023-10-23T12:15:03+00:00 ((2460241j,44103s,360000000n),+0s,2299161j)>}
{"duration_ms"=>2.8, "message"=>"Connected to 10.1.0.2", "event.duration"=>"2764889", "client.ip"=>#<IPAddr: IPv4:172.21.2.113/255.255.255.255>, "@timestamp"=>#<DateTime: 2023-10-23T12:27:28+00:00 ((2460241j,44848s,948000000n),+0s,2299161j)>}
{"duration_ms"=>1.2, "message"=>"Disconnected", "event.duration"=>"1232382", "client.ip"=>#<IPAddr: IPv4:172.21.0.5/255.255.255.255>, "@timestamp"=>#<DateTime: 2023-10-23T13:33:34+00:00 ((2460241j,48814s,937000000n),+0s,2299161j)>}
{"duration_ms"=>0.7, "message"=>"Connection error", "event.duration"=>"725448", "client.ip"=>#<IPAddr: IPv4:172.21.3.15/255.255.255.255>, "@timestamp"=>#<DateTime: 2023-10-23T13:51:54+00:00 ((2460241j,49914s,732000000n),+0s,2299161j)>}
{"duration_ms"=>8.3, "message"=>"Connection error", "event.duration"=>"8268153", "client.ip"=>#<IPAddr: IPv4:172.21.3.15/255.255.255.255>, "@timestamp"=>#<DateTime: 2023-10-23T13:52:55+00:00 ((2460241j,49975s,15000000n),+0s,2299161j)>}