Update By Query API

edit

Update By Query API

edit

The simplest usage of updateByQuery updates each document in an index without changing the source. This usage enables picking up a new property or another online mapping change.

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").abortOnVersionConflict(false);
BulkByScrollResponse response = updateByQuery.get();

Calls to the updateByQuery API start by getting a snapshot of the index, indexing any documents found using the internal versioning.

Version conflicts happen when a document changes between the time of the snapshot and the time the index request processes.

When the versions match, updateByQuery updates the document and increments the version number.

All update and query failures cause updateByQuery to abort. These failures are available from the BulkByScrollResponse#getIndexingFailures method. Any successful updates remain and are not rolled back. While the first failure causes the abort, the response contains all of the failures generated by the failed bulk request.

To prevent version conflicts from causing updateByQuery to abort, set abortOnVersionConflict(false). The first example does this because it is trying to pick up an online mapping change and a version conflict means that the conflicting document was updated between the start of the updateByQuery and the time when it attempted to update the document. This is fine because that update will have picked up the online mapping update.

The UpdateByQueryRequestBuilder API supports filtering the updated documents, limiting the total number of documents to update, and updating documents with a script:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .filter(QueryBuilders.termQuery("level", "awesome"))
    .size(1000)
    .script(new Script(ScriptType.INLINE, "ctx._source.awesome = 'absolutely'", "painless", Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

UpdateByQueryRequestBuilder also enables direct access to the query used to select the documents. You can use this access to change the default scroll size or otherwise modify the request for matching documents.

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .source().setSize(500);
BulkByScrollResponse response = updateByQuery.get();

You can also combine size with sorting to limit the documents updated:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").size(100)
    .source().addSort("cat", SortOrder.DESC);
BulkByScrollResponse response = updateByQuery.get();

In addition to changing the _source field for the document, you can use a script to change the action, similar to the Update API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .script(new Script(
        ScriptType.INLINE,
        "if (ctx._source.awesome == 'absolutely') {"
            + "  ctx.op='noop'"
            + "} else if (ctx._source.awesome == 'lame') {"
            + "  ctx.op='delete'"
            + "} else {"
            + "ctx._source.awesome = 'absolutely'}",
        "painless",
        Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

As in the Update API, you can set the value of ctx.op to change the operation that executes:

noop
Set ctx.op = "noop" if your script doesn’t make any changes. The updateByQuery operaton then omits that document from the updates. This behavior increments the noop counter in the response body.
delete
Set ctx.op = "delete" if your script decides that the document must be deleted. The deletion will be reported in the deleted counter in the response body.

Setting ctx.op to any other value generates an error. Setting any other field in ctx generates an error.

This API doesn’t allow you to move the documents it touches, just modify their source. This is intentional! We’ve made no provisions for removing the document from its original location.

You can also perform these operations on multiple indices and types at once, similar to the search API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("foo", "bar").source().setTypes("a", "b");
BulkByScrollResponse response = updateByQuery.get();

If you provide a routing value then the process copies the routing value to the scroll query, limiting the process to the shards that match that routing value:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source().setRouting("cat");
BulkByScrollResponse response = updateByQuery.get();

updateByQuery can also use the ingest node by specifying a pipeline like this:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.setPipeline("hurray");
BulkByScrollResponse response = updateByQuery.get();

Works with the Task API

edit

You can fetch the status of all running update-by-query requests with the Task API:

ListTasksResponse tasksList = client.admin().cluster().prepareListTasks()
    .setActions(UpdateByQueryAction.NAME).setDetailed(true).get();
for (TaskInfo info: tasksList.getTasks()) {
    TaskId taskId = info.getTaskId();
    BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus();
    // do stuff
}

With the TaskId shown above you can look up the task directly:

GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();

Works with the Cancel Task API

edit

Any Update By Query can be canceled using the Task Cancel API:

// Cancel all update-by-query requests
client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks();
// Cancel a specific update-by-query request
client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks();

Use the list tasks API to find the value of taskId.

Cancelling a request is typically a very fast process but can take up to a few seconds. The task status API continues to list the task until the cancellation is complete.

Rethrottling

edit

Use the _rethrottle API to change the value of requests_per_second on a running update:

RethrottleAction.INSTANCE.newRequestBuilder(client)
    .setTaskId(taskId)
    .setRequestsPerSecond(2.0f)
    .get();

Use the list tasks API to find the value of taskId.

As with the updateByQuery API, the value of requests_per_second can be any positive float value to set the level of the throttle, or Float.POSITIVE_INFINITY to disable throttling. A value of requests_per_second that speeds up the process takes effect immediately. requests_per_second values that slow the query take effect after completing the current batch in order to prevent scroll timeouts.