Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] fix for max & min aggregations when no metric property exist #878

Merged
merged 1 commit into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion detekt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ complexity:
LongMethod:
excludes: ['**/test/**']
LongParameterList:
excludes: ['**/test/**']
excludes: ['**/test/**']
NestedBlockDepth:
threshold: 5
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ class RollupIndexer(
it.aggregations.forEach {
when (it) {
is InternalSum -> aggResults[it.name] = it.value
is InternalMax -> aggResults[it.name] = it.value
is InternalMin -> aggResults[it.name] = it.value
// TODO: Need to redo the logic in corresponding doXContentBody of InternalMax and InternalMin
is InternalMax -> if (it.value.isInfinite()) aggResults[it.name] = null else aggResults[it.name] = it.value
is InternalMin -> if (it.value.isInfinite()) aggResults[it.name] = null else aggResults[it.name] = it.value
is InternalValueCount -> aggResults[it.name] = it.value
is InternalAvg -> aggResults[it.name] = it.value
else -> error("Found aggregation in composite result that is not supported [${it.type} - ${it.name}]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() {
insertSampleBulkData(index, javaClass.classLoader.getResource("data/nyc_5000.ndjson").readText())
}

protected fun generateMessageLogsData(index: String = "message-logs") {
createIndex(index, Settings.EMPTY, """"properties": {"message":{"properties":{"bytes_in":{"type":"long"},"bytes_out":{"type":"long"},"plugin":{"eager_global_ordinals":true,"ignore_above":10000,"type":"keyword"},"timestamp_received":{"type":"date"}}}}""")
insertSampleBulkData(index, javaClass.classLoader.getResource("data/message_logs.ndjson").readText())
}

@Suppress("UNCHECKED_CAST")
protected fun extractFailuresFromSearchResponse(searchResponse: Response): List<Map<String, String>?>? {
val shards = searchResponse.asMap()["_shards"] as Map<String, ArrayList<Map<String, Any>>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,78 @@ class RollupRunnerIT : RollupRestTestCase() {
}
}

@Suppress("UNCHECKED_CAST")
fun `test rollup with max metric when metric property not present`() {
val sourceIdxTestName = "source_idx_test_max"
val targetIdxTestName = "target_idx_test_max"
val propertyName = "message.bytes_in"
val maxMetricName = "min_message_bytes_in"

generateMessageLogsData(sourceIdxTestName)
val rollup = Rollup(
id = "rollup_test_max",
schemaVersion = 1L,
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic stats test",
sourceIndex = sourceIdxTestName,
targetIndex = targetIdxTestName,
metadataID = null,
roles = emptyList(),
pageSize = 100,
delay = 0,
continuous = false,
dimensions = listOf(
DateHistogram(sourceField = "message.timestamp_received", targetField = "message.timestamp_received", fixedInterval = "10m"),
Terms("message.plugin", "message.plugin")
),
metrics = listOf(
RollupMetrics(sourceField = propertyName, targetField = propertyName, metrics = listOf(Max()))
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollup)

waitFor { assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex)) }

waitFor {
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)

// Term query
val req = """
{
"size": 0,
"query": {
"match_all": {}
},
"aggs": {
"$maxMetricName": {
"max": {
"field": "$propertyName"
}
}
}
}
""".trimIndent()
var rawRes = client().makeRequest(RestRequest.Method.POST.name, "/$sourceIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes.restStatus() == RestStatus.OK)
var rollupRes = client().makeRequest(RestRequest.Method.POST.name, "/$targetIdxTestName/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rollupRes.restStatus() == RestStatus.OK)
var rawAggRes = rawRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
var rollupAggRes = rollupRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
assertEquals(
"Source and rollup index did not return same max results",
rawAggRes.getValue(maxMetricName)["value"],
rollupAggRes.getValue(maxMetricName)["value"]
)
}
}

// TODO: Test scenarios:
// - Source index deleted after first execution
// * If this is with a source index pattern and the underlying indices are recreated but with different data
Expand Down
4 changes: 4 additions & 0 deletions src/test/resources/data/message_logs.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"create":{}}
{"message":{"bytes_out":4256,"plugin":"AlienVault NIDS","timestamp_received":1689786716020}}
{"create":{}}
{"message":{"bytes_out":4526,"plugin":"AlienVault NIDS","timestamp_received":1689886716020}}
Loading