Indexing documents

NEST exposes the index and bulk APIs of Elasticsearch as methods for indexing single or multiple documents. The client also provides convenient shorthand methods for the typical indexing approaches.

Single documents

A single document can be indexed, either synchronously or asynchronously, using the IndexDocument and IndexDocumentAsync methods, respectively. These methods are a simple way to index a single document that doesn’t require any additional request parameters.

var person = new Person
{
    Id = 1,
    FirstName = "Martijn",
    LastName = "Laarman"
};

var indexResponse = client.IndexDocument(person); 
if (!indexResponse.IsValid)
{
    // If the request isn't valid, we can take action here
}

var indexResponseAsync = await client.IndexDocumentAsync(person); 

  • IndexDocument: synchronous method that returns an IndexResponse
  • IndexDocumentAsync: asynchronous method that returns a Task<IndexResponse> that can be awaited
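
The examples on this page use a Person type whose definition is not shown. A minimal sketch of what it might look like, assuming a plain class whose property names and types are inferred from the object initializers and expressions used in the examples (the real type may carry more properties or mapping attributes):

```csharp
// Hypothetical document type, inferred from the examples on this page.
public class Person
{
    // Used as the document id and in arithmetic such as person.Id % 2
    public int Id { get; set; }

    public string FirstName { get; set; }

    public string LastName { get; set; }
}
```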

Single documents with parameters

If you need to set additional parameters when indexing, you can use the Index method with either the fluent or object initializer syntax. The Index method exposes a way to set additional parameters such as the name of the index in which to index, the id to assign to the document, routing parameters, etc., allowing more control over indexing.

var person = new Person
{
    Id = 1,
    FirstName = "Martijn",
    LastName = "Laarman"
};

var indexResponse1 = client.Index(person, i => i.Index("people")); 

var indexResponse2 = client.Index(new IndexRequest<Person>(person, "people")); 

  • indexResponse1: fluent syntax
  • indexResponse2: object initializer syntax

Multiple documents with IndexMany

Multiple documents can be indexed using the IndexMany and IndexManyAsync methods, synchronously or asynchronously, respectively. These methods are specific to the NEST client and provide a convenient shortcut for indexing multiple documents using the _bulk endpoint.

IndexMany sends all documents in a single HTTP request, so it is not a recommended approach for very large document collections. Consider using the BulkAllObservable helper instead.

var people = new []
{
    new Person
    {
        Id = 1,
        FirstName = "Martijn",
        LastName = "Laarman"
    },
    new Person
    {
        Id = 2,
        FirstName = "Stuart",
        LastName = "Cam"
    },
    new Person
    {
        Id = 3,
        FirstName = "Russ",
        LastName = "Cam"
    }
};

var indexManyResponse = client.IndexMany(people); 

if (indexManyResponse.Errors) 
{
    foreach (var itemWithError in indexManyResponse.ItemsWithErrors) 
    {
        Console.WriteLine($"Failed to index document {itemWithError.Id}: {itemWithError.Error}");
    }
}

// Alternatively, documents can be indexed asynchronously
var indexManyAsyncResponse = await client.IndexManyAsync(people); 

  • IndexMany: synchronous method that returns a BulkResponse
  • Errors: the response can be inspected to see if any of the bulk operations resulted in an error
  • ItemsWithErrors: if there are errors, they can be enumerated and inspected
  • IndexManyAsync: asynchronous method that returns a Task<BulkResponse> that can be awaited

Multiple documents with Bulk

If you require more control over indexing many documents, you can use the Bulk and BulkAsync methods and use the descriptors to customise the bulk calls.

As with the IndexMany methods, documents are sent using the bulk API in a single HTTP request, so the overall size of the HTTP request needs to be considered. For indexing a large number of documents, it may be sensible to perform multiple separate Bulk calls, or to use BulkAllObservable, which takes care of a lot of the complexity.

var bulkIndexResponse = client.Bulk(b => b
    .Index("people")
    .IndexMany(people)
); 

// Alternatively, documents can be indexed asynchronously similar to IndexManyAsync
var asyncBulkIndexResponse = await client.BulkAsync(b => b
    .Index("people")
    .IndexMany(people)
); 

  • Bulk: synchronous method that returns a BulkResponse, the same as IndexMany, which can be inspected in the same way for errors
  • BulkAsync: asynchronous method that returns a Task<BulkResponse> that can be awaited
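
The suggestion above of performing multiple separate Bulk calls for a large collection can be sketched with a small batching helper. This is an illustrative sketch only: BatchSketch and InBatchesOf are hypothetical names, not part of NEST, and the helper does nothing beyond splitting a sequence into fixed-size lists.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of NEST): lazily splits a sequence into
// lists of at most `size` items, so each list can be sent as its own bulk call.
public static class BatchSketch
{
    public static IEnumerable<List<T>> InBatchesOf<T>(IEnumerable<T> source, int size)
    {
        // Validate eagerly, before the iterator below is first enumerated.
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));
        return Iterate(source, size);
    }

    private static IEnumerable<List<T>> Iterate<T>(IEnumerable<T> source, int size)
    {
        var batch = new List<T>(size);
        foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == size)
            {
                yield return batch;
                batch = new List<T>(size);
            }
        }

        // The final batch may hold fewer than `size` items.
        if (batch.Count > 0)
            yield return batch;
    }
}
```

Each batch could then be passed to its own call, for example client.Bulk(b => b.Index("people").IndexMany(batch)), checking Errors on every response. Retry and backoff would still have to be handled by the caller, which is the complexity that BulkAllObservable takes care of.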

Control over how each bulk index operation is configured can be achieved by passing a descriptor to the IndexMany method on Bulk. Here’s an example of specifying a different index and pipeline for each document, based on properties of the document to be indexed:

var bulkIndexResponse = client.Bulk(b => b
    .Index("people")
    .IndexMany(people, (descriptor, person) => descriptor
        .Index(person.Id % 2 == 0
            ? "even-index"
            : "odd-index") 
        .Pipeline(person.FirstName.StartsWith("M")
            ? "startswith_m_pipeline"
            : "does_not_start_with_m_pipeline") 
    )
);

  • Index: configure an explicit index for a document, based on its Id
  • Pipeline: specify an ingest pipeline to use when indexing the document

Multiple documents with BulkAllObservable helper

Using the BulkAllObservable helper allows you to focus on the overall objective of indexing, without having to concern yourself with retry, backoff or chunking mechanics. Multiple documents can be indexed using the BulkAll method and Wait() extension method.

This helper exposes functionality to automatically retry and back off in the event of an indexing failure, and to control the number of documents indexed in a single HTTP request. In the example below each request will contain 1000 documents, chunked from the original input. In the event of a large number of documents this could result in many HTTP requests, each containing 1000 documents (the last request may contain fewer, depending on the total number).

The helper lazily enumerates the provided IEnumerable<T> of documents, allowing you to index a large number of documents easily.

var bulkAllObservable = client.BulkAll(people, b => b
    .Index("people")
    .BackOffTime("30s") 
    .BackOffRetries(2) 
    .RefreshOnCompleted()
    .MaxDegreeOfParallelism(Environment.ProcessorCount)
    .Size(1000) 
)
.Wait(TimeSpan.FromMinutes(15), next => 
{
    // do something e.g. write number of pages to console
});

  • BackOffTime: how long to wait between retries
  • BackOffRetries: how many retries are attempted if a failure occurs
  • Size: items per bulk request
  • Wait: perform the indexing and wait up to 15 minutes; whilst the BulkAll calls are asynchronous, this is a blocking operation
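
Because the helper enumerates its input lazily, the documents do not have to be held in memory as an array or list. A minimal sketch of such a source, assuming a C# iterator method that creates each document only when the consumer asks for it (LazySourceSketch and Generate are hypothetical names, not part of NEST):

```csharp
using System;
using System.Collections.Generic;

// Hypothetical lazy document source (not part of NEST).
public static class LazySourceSketch
{
    // An iterator method: the body runs only as the sequence is enumerated,
    // so `factory` is invoked once per item actually pulled by the consumer.
    public static IEnumerable<T> Generate<T>(int count, Func<int, T> factory)
    {
        for (var id = 1; id <= count; id++)
        {
            yield return factory(id);
        }
    }
}

// Usage sketch: nothing is created until the sequence is enumerated.
//
//   var source = LazySourceSketch.Generate(1000000, id => "doc-" + id);
//   foreach (var doc in source) { /* documents are created one at a time here */ }
```

A sequence built this way, for example one that reads rows from a file or database and yields a Person for each, could be passed to BulkAll in place of the people array.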

The internal implementation of BulkAllObservable is asynchronous, using the Observer Design Pattern to enable observers to be registered to take action when each bulk response is returned, when an error occurs, and when the BulkAllObservable has finished. Whilst the internal implementation is asynchronous, you typically want to wait until all bulk indexing has finished before continuing. The Wait method is a convenient shorthand for this, using a ManualResetEvent to block the current thread until bulk indexing has finished or an error has occurred.

Advanced bulk indexing

The BulkAllObservable helper exposes a number of methods to further control the process, such as:

  • BufferToBulk to customize individual operations within the bulk request before it is dispatched to the server
  • RetryDocumentPredicate to decide if a document that failed to be indexed should be retried
  • DroppedDocumentCallback to determine what to do in the event a document is not indexed, even after retrying

The following example demonstrates some of these methods, in addition to using a BulkAllObserver to subscribe to the bulk indexing process and take some action on each successful bulk response, when an error occurs, and when the process has finished.

An observer such as BulkAllObserver should not throw exceptions from its interface implementations, such as OnNext and OnError. Any exceptions thrown should be expected to go unhandled. In light of this, any exception that occurs during the bulk indexing process should be captured and thrown outside of the observer, as demonstrated in the example below. Take a look at the Observer Design Pattern best practices on handling exceptions.

var bulkAllObservable = client.BulkAll(people, b => b
      .BufferToBulk((descriptor, buffer) => 
      {
          foreach (var person in buffer)
          {
              descriptor.Index<Person>(bi => bi
                  .Index(person.Id % 2 == 0 ? "even-index" : "odd-index") 
                  .Document(person)
              );
          }
      })
      .RetryDocumentPredicate((bulkResponseItem, person) => 
      {
          return bulkResponseItem.Error.Index == "even-index" && person.FirstName == "Martijn";
      })
      .DroppedDocumentCallback((bulkResponseItem, person) => 
      {
          Console.WriteLine($"Unable to index: {bulkResponseItem} {person}");
      }));

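// ManualResetEvent lives in System.Threading,
// ExceptionDispatchInfo in System.Runtime.ExceptionServices
var waitHandle = new ManualResetEvent(false);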
ExceptionDispatchInfo exceptionDispatchInfo = null;

var observer = new BulkAllObserver(
    onNext: response =>
    {
        // do something e.g. write number of pages to console
    },
    onError: exception =>
    {
        exceptionDispatchInfo = ExceptionDispatchInfo.Capture(exception);
        waitHandle.Set();
    },
    onCompleted: () => waitHandle.Set());

bulkAllObservable.Subscribe(observer); 

waitHandle.WaitOne(); 

exceptionDispatchInfo?.Throw(); 

  • BufferToBulk: customise each bulk operation before it is dispatched
  • Index: index each document into either even-index or odd-index
  • RetryDocumentPredicate: decide if a document should be retried in the event of a failure
  • DroppedDocumentCallback: if a document cannot be indexed, this delegate is called
  • Subscribe: subscribe to the observable, which will initiate the bulk indexing process
  • WaitOne: block the current thread until a signal is received
  • Throw: if an exception was captured during the bulk indexing process, throw it
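
The capture-and-rethrow pattern used with the observer above can be tried in isolation, without a cluster. The following is a minimal sketch under that assumption: CaptureAndRethrowSketch and RunAndWait are hypothetical names, and a thread-pool work item stands in for the asynchronous bulk indexing process.

```csharp
using System;
using System.Runtime.ExceptionServices;
using System.Threading;

// Hypothetical helper (not part of NEST) illustrating the pattern only.
public static class CaptureAndRethrowSketch
{
    // Runs `work` on a thread-pool thread, blocks the caller until it has
    // finished, then rethrows any captured failure on the calling thread.
    public static void RunAndWait(Action work)
    {
        var waitHandle = new ManualResetEvent(false);
        ExceptionDispatchInfo captured = null;

        ThreadPool.QueueUserWorkItem(_ =>
        {
            try
            {
                work();
            }
            catch (Exception e)
            {
                // Capture instead of letting the exception escape the callback.
                captured = ExceptionDispatchInfo.Capture(e);
            }
            finally
            {
                // Signal the waiting thread on success and on failure alike.
                waitHandle.Set();
            }
        });

        waitHandle.WaitOne();

        // Rethrow outside the callback, preserving the original stack trace.
        captured?.Throw();
    }
}
```

Throwing through ExceptionDispatchInfo rather than with throw captured.SourceException keeps the stack trace of the original failure, which is usually what you want when diagnosing an indexing error.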