Using Bulk Processor
The BulkProcessor class offers a simple interface to flush bulk operations automatically based on the number or size of requests, or after a given period.
To use it, first create a BulkProcessor instance:
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... }

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... }

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... }
        })
        .setBulkActions(10000)
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        .setConcurrentRequests(1)
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();
- client: add your Elasticsearch client.
- beforeBulk: this method is called just before the bulk is executed. You can, for example, see the number of actions with request.numberOfActions().
- afterBulk (with a BulkResponse): this method is called after the bulk has executed. You can, for example, check whether any requests failed with response.hasFailures().
- afterBulk (with a Throwable): this method is called when the bulk failed and raised a Throwable.
- setBulkActions: we want to execute the bulk every 10,000 requests.
- setBulkSize: we want to flush the bulk every 5mb.
- setFlushInterval: we want to flush the bulk every 5 seconds, whatever the number of requests.
- setConcurrentRequests: set the number of concurrent requests. A value of 0 means that only a single request is allowed to execute. A value of 1 means that 1 concurrent request is allowed to execute while new bulk requests accumulate.
- setBackoffPolicy: set a custom backoff policy that initially waits for 100ms, increases exponentially, and retries up to three times. A retry is attempted whenever one or more bulk item requests have failed with an EsRejectedExecutionException.
Then you can simply add your requests to the BulkProcessor:
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
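To make the count and size thresholds concrete, here is a minimal plain-Java sketch of the buffering idea. ThresholdBuffer and all of its members are hypothetical names invented for illustration. It is not the Elasticsearch implementation, it leaves out the time-based flush and concurrency, and it assumes a flush triggers as soon as either limit is reached.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the buffering idea; not part of Elasticsearch. */
final class ThresholdBuffer {
    private final int maxActions;   // plays the role of bulkActions
    private final long maxBytes;    // plays the role of bulkSize
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private int flushCount = 0;

    ThresholdBuffer(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    /** Buffers one document and flushes once either threshold is reached (assumed rule). */
    void add(String doc) {
        pending.add(doc);
        pendingBytes += doc.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Empties the buffer; a real processor would send a bulk request here. */
    void flush() {
        if (pending.isEmpty()) {
            return; // nothing buffered, so nothing to send
        }
        flushCount++;
        pending.clear();
        pendingBytes = 0;
    }

    int flushCount() { return flushCount; }

    int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        ThresholdBuffer buffer = new ThresholdBuffer(3, 1_000_000);
        for (int i = 0; i < 7; i++) {
            buffer.add("doc-" + i);
        }
        // Seven adds with a limit of three: two flushes, one document still pending.
        System.out.println(buffer.flushCount() + " flushes, " + buffer.pendingCount() + " pending");
    }
}
```

The one document left pending at the end of this sketch is the reason a final flush is still needed once all requests have been added.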
By default, BulkProcessor:
- sets bulkActions to 1000
- sets bulkSize to 5mb
- does not set flushInterval
- sets concurrentRequests to 1
- sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.
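The total wait time quoted for the default backoff can be checked with a little arithmetic. The sketch below assumes the delay before retry i (counting from 0) is start + 10 * (floor(e^(0.8 * i)) - 1) milliseconds. That formula is a recollection of how the library computes the delay, not something stated on this page, so verify it against the BackoffPolicy source for your version. BackoffSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that tabulates retry delays under an assumed formula. */
final class BackoffSketch {

    /** Returns the delay in milliseconds before each retry. */
    static List<Long> delaysMillis(long startMillis, int retries) {
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < retries; i++) {
            // Assumed formula: start + 10 * (floor(e^(0.8 * i)) - 1)
            long delay = startMillis + 10L * ((long) Math.exp(0.8d * i) - 1);
            delays.add(delay);
        }
        return delays;
    }

    /** Sums the individual delays to get the total time spent waiting. */
    static long totalMillis(long startMillis, int retries) {
        long total = 0;
        for (long delay : delaysMillis(startMillis, retries)) {
            total += delay;
        }
        return total;
    }

    public static void main(String[] args) {
        // Default policy described above: start delay of 50ms, 8 retries.
        System.out.println(delaysMillis(50, 8)); // [50, 60, 80, 150, 280, 580, 1250, 2740]
        System.out.println(totalMillis(50, 8) + " ms"); // 5190 ms
    }
}
```

Under that assumption the eight delays add up to 5190ms, which is consistent with the total of roughly 5.1 seconds stated above.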
When all documents are loaded into the BulkProcessor, it can be closed by using the awaitClose or close methods:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
or
bulkProcessor.close();
Both methods flush any remaining documents and disable all other scheduled flushes, if any were scheduled by setting flushInterval. If concurrent requests were enabled, the awaitClose method waits for up to the specified timeout for all bulk requests to complete and then returns true; if the specified waiting time elapses before all bulk requests complete, it returns false. The close method doesn't wait for any remaining bulk requests to complete and exits immediately.
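The true/false result of awaitClose follows the same pattern as waiting on an executor with a timeout. The sketch below is an analogy using only java.util.concurrent, with a sleeping task standing in for a bulk request that is still in flight. CloseSketch and awaitCloseLike are hypothetical names, and nothing here calls Elasticsearch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical analogy for the awaitClose return value; not Elasticsearch code. */
final class CloseSketch {

    /**
     * Starts one task lasting taskMillis, then waits up to timeoutMillis for it.
     * Returns true if the task finished in time, false if the timeout elapsed first.
     */
    static boolean awaitCloseLike(long taskMillis, long timeoutMillis) {
        ExecutorService inFlight = Executors.newSingleThreadExecutor();
        inFlight.submit(() -> {
            try {
                Thread.sleep(taskMillis); // stands in for a bulk request still executing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        inFlight.shutdown(); // accept no new work from here on
        try {
            return inFlight.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            inFlight.shutdownNow(); // stop the stand-in task so the JVM can exit
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitCloseLike(50, 5_000)); // work finishes well inside the timeout
        System.out.println(awaitCloseLike(2_000, 50)); // timeout elapses before the work finishes
    }
}
```

With these generous margins the first call should report true and the second false. In application code, a false result from awaitClose is a signal that some bulk requests may not have completed, so it is worth checking rather than ignoring.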