Skip to content

Commit

Permalink
Add support for all elasticsearch actions while bulk indexing
Browse files Browse the repository at this point in the history
Currently, if the response body of the bulk indexing action contains
anything else than index action, the response is taken as invalid, even
though the request was successful.

This patch adds support for all the other remaining actions specified in
the elasticsearch documentation[1].

[1]: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body

Signed-off-by: Ladislav Macoun <[email protected]>
  • Loading branch information
ladislavmacoun committed Oct 26, 2021
1 parent d0d63a2 commit edfbdf5
Showing 1 changed file with 53 additions and 38 deletions.
91 changes: 53 additions & 38 deletions src/bulk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,52 @@ static std::string jsonValueToString(const rapidjson::Value& val) {

namespace elasticlient {

static const char *supportedActions[] = {
"create",
"index",
"update",
"delete",
};


static int containsError(const rapidjson::Value &item, const char *action) {
int errors = 0;

const rapidjson::Value &res = item[action];
if (!res.IsObject()) {
LOG(LogLevel::WARNING, "Bulk response has unexpected format, "
"object was expected.");
return ++errors;
}

// read status code
if (!res.HasMember("status")) {
LOG(LogLevel::WARNING, "Bulk response item with missing status.");
return ++errors;
};

const rapidjson::Value &status = res["status"];
if (!status.IsInt()) {
LOG(LogLevel::WARNING, "Bulk response was expected to have numeric status. "
"Skipping this response checking.");
return ++errors;
}

// if status code is not 2xx family, consider it as error
int status_code = status.GetInt();
if (status_code < 200 || status_code > 299) {
std::ostringstream out;
out << "Bulk response contains status code "
<< std::to_string(status_code)
<< " Elastic response: " << jsonValueToString(item);

LOG(LogLevel::WARNING, out.str().c_str());
return ++errors;
}

return errors;
}


IBulkData::~IBulkData() {}

Expand Down Expand Up @@ -247,6 +293,8 @@ void Bulk::Implementation::processResult(
// "_shards": {"total": int, "successful": int, "failed": int},
// "status": 201}}
// ]}
// Reference:
// https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body

// check errors flag, if it is false, do not parse single responses
if (root.HasMember("errors")) {
Expand Down Expand Up @@ -280,44 +328,11 @@ void Bulk::Implementation::processResult(
}

// check index action response
if (item.HasMember("index")) {
const rapidjson::Value &res = item["index"];
if (!res.IsObject()) {
LOG(LogLevel::WARNING, "Bulk response has unexpected format, "
"object was expected.");
errCount++;
continue;
}
// read status code
if (!res.HasMember("status")) {
LOG(LogLevel::WARNING, "Bulk response item with missing status.");
errCount++;
continue;
};
const rapidjson::Value &status = res["status"];
if (!status.IsInt()) {
LOG(LogLevel::WARNING, "Bulk response was expected to have numeric status. "
"Skipping this response checking.");
errCount++;
continue;
}

// if status code is not 2xx family, consider it as error
if (status.GetInt() / 100 != 2) {
std::ostringstream out;
out << "Bulk response contains status code "
<< std::to_string(status.GetInt())
<< " Elastic response: " << jsonValueToString(item);

LOG(LogLevel::WARNING, out.str().c_str());
errCount++;
}
} else {
std::ostringstream out;
out << "Unsupported 'action' found at bulk response, "
<< jsonValueToString(root);

LOG(LogLevel::WARNING, out.str().c_str());
for (const auto &action : supportedActions) {
if (item.HasMember(action)) {
errCount += containsError(item, action);
break;
}
}
}

Expand Down

0 comments on commit edfbdf5

Please sign in to comment.