diff --git a/src/bulk.cc b/src/bulk.cc index ce438d9..4d44f6a 100644 --- a/src/bulk.cc +++ b/src/bulk.cc @@ -44,6 +44,52 @@ static std::string jsonValueToString(const rapidjson::Value& val) { namespace elasticlient { +static const char *supportedActions[] = { + "create", + "index", + "update", + "delete", +}; + + +static int containsError(const rapidjson::Value &item, const char *action) { + int errors = 0; + + const rapidjson::Value &res = item[action]; + if (!res.IsObject()) { + LOG(LogLevel::WARNING, "Bulk response has unexpected format, " + "object was expected."); + return ++errors; + } + + // read status code + if (!res.HasMember("status")) { + LOG(LogLevel::WARNING, "Bulk response item with missing status."); + return ++errors; + }; + + const rapidjson::Value &status = res["status"]; + if (!status.IsInt()) { + LOG(LogLevel::WARNING, "Bulk response was expected to have numeric status. " + "Skipping this response checking."); + return ++errors; + } + + // if status code is not 2xx family, consider it as error + int status_code = status.GetInt(); + if (status_code < 200 || status_code > 299) { + std::ostringstream out; + out << "Bulk response contains status code " + << std::to_string(status_code) + << " Elastic response: " << jsonValueToString(item); + + LOG(LogLevel::WARNING, out.str().c_str()); + return ++errors; + } + + return errors; +} + IBulkData::~IBulkData() {} @@ -247,6 +293,8 @@ void Bulk::Implementation::processResult( // "_shards": {"total": int, "successful": int, "failed": int}, // "status": 201}} // ]} + // Reference: + // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body // check errors flag, if it is false, do not parse single responses if (root.HasMember("errors")) { @@ -280,44 +328,11 @@ void Bulk::Implementation::processResult( } // check index action response - if (item.HasMember("index")) { - const rapidjson::Value &res = item["index"]; - if (!res.IsObject()) { - LOG(LogLevel::WARNING, "Bulk response has unexpected format, " - "object was expected."); - errCount++; - continue; - } - // read status code - if (!res.HasMember("status")) { - LOG(LogLevel::WARNING, "Bulk response item with missing status."); - errCount++; - continue; - }; - const rapidjson::Value &status = res["status"]; - if (!status.IsInt()) { - LOG(LogLevel::WARNING, "Bulk response was expected to have numeric status. " - "Skipping this response checking."); - errCount++; - continue; - } - - // if status code is not 2xx family, consider it as error - if (status.GetInt() / 100 != 2) { - std::ostringstream out; - out << "Bulk response contains status code " - << std::to_string(status.GetInt()) - << " Elastic response: " << jsonValueToString(item); - - LOG(LogLevel::WARNING, out.str().c_str()); - errCount++; - } - } else { - std::ostringstream out; - out << "Unsupported 'action' found at bulk response, " - << jsonValueToString(root); - - LOG(LogLevel::WARNING, out.str().c_str()); + for (const auto &action : supportedActions) { + if (item.HasMember(action)) { + errCount += containsError(item, action); + break; + } } }