diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy index 6abb2b64172..3d95a7d0023 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultPathwayContextTest.groovy @@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS class DefaultPathwayContextTest extends DDCoreSpecification { def wellKnownTags = new WellKnownTags("runtimeid", "hostname", "testing", "service", "version", "java") + long baseHash = 12 static final DEFAULT_BUCKET_DURATION_NANOS = Config.get().getDataStreamsBucketDurationNanoseconds() def pointConsumer = new Consumer() { @@ -44,7 +45,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "First Set checkpoint starts the context."() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) when: timeSource.advance(50) @@ -59,7 +60,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Checkpoint generated"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) when: timeSource.advance(50) @@ -85,7 +86,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Checkpoint with payload size"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) when: timeSource.advance(25) @@ -106,7 +107,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Multiple checkpoints generated"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) when: timeSource.advance(50) @@ -146,7 +147,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Exception thrown when trying to encode unstarted context"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) when: context.encode() @@ -158,14 +159,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Set checkpoint with dataset tags"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) when: timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "s3", "ds.namespace": "my_bucket", "ds.name": "my_object.csv", "direction": "in"]), pointConsumer) def encoded = context.strEncode() timeSource.advance(MILLISECONDS.toNanos(2)) - def decodedContext = DefaultPathwayContext.strDecode(timeSource, wellKnownTags, encoded) + def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["type": "s3", "ds.namespace": "my_bucket", "ds.name": "my_object.csv", "direction": "out"]), pointConsumer) @@ -184,14 +185,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) when: timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.strEncode() timeSource.advance(MILLISECONDS.toNanos(2)) - def decodedContext = DefaultPathwayContext.strDecode(timeSource, wellKnownTags, encoded) + def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -212,7 +213,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Set checkpoint with timestamp"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) def timeFromQueue = timeSource.getCurrentTimeMillis() - 200 when: context.setCheckpoint(["type": "internal"], pointConsumer, timeFromQueue) @@ -233,7 +234,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) when: timeSource.advance(MILLISECONDS.toNanos(50)) @@ -241,7 +242,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.strEncode() timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.strDecode(timeSource, wellKnownTags, encoded) + def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -260,7 +261,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: def secondEncode = decodedContext.strEncode() timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.strDecode(timeSource, wellKnownTags, secondEncode) + def secondDecode = DefaultPathwayContext.strDecode(timeSource, baseHash, secondEncode) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) @@ -281,7 +282,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) def contextVisitor = new Base64MapContextVisitor() when: @@ -291,7 +292,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.strEncode() Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, wellKnownTags) + def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -311,7 +312,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def secondEncode = decodedContext.strEncode() carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode] timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, wellKnownTags) + def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) @@ -332,14 +333,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) when: timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.encode() timeSource.advance(MILLISECONDS.toNanos(2)) - def decodedContext = DefaultPathwayContext.decode(timeSource, wellKnownTags, encoded) + def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -361,7 +362,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) when: timeSource.advance(MILLISECONDS.toNanos(50)) @@ -369,7 +370,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.encode() timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.decode(timeSource, wellKnownTags, encoded) + def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, encoded) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -388,7 +389,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { when: def secondEncode = decodedContext.encode() timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.decode(timeSource, wellKnownTags, secondEncode) + def secondDecode = DefaultPathwayContext.decode(timeSource, baseHash, secondEncode) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) @@ -408,7 +409,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Legacy binary encoding"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) def contextVisitor = new BinaryMapContextVisitor() when: @@ -418,7 +419,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = java.util.Base64.getDecoder().decode(context.encode()) Map carrier = [(PathwayContext.PROPAGATION_KEY): encoded] timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, wellKnownTags) + def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash) then: decodedContext.strEncode() == context.strEncode() @@ -428,7 +429,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) def contextVisitor = new BinaryMapContextVisitor() when: @@ -438,7 +439,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.encode() Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": new byte[0]] timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, wellKnownTags) + def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer) @@ -458,7 +459,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def secondEncode = decodedContext.encode() carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode] timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, wellKnownTags) + def secondDecode = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer) @@ -479,7 +480,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { // Timesource needs to be advanced in milliseconds because encoding truncates to millis given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) def contextVisitor = new Base64MapContextVisitor() when: @@ -489,7 +490,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def encoded = context.strEncode() Map carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"] timeSource.advance(MILLISECONDS.toNanos(1)) - def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, wellKnownTags) + def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash) timeSource.advance(MILLISECONDS.toNanos(25)) context.setCheckpoint(new LinkedHashMap<>(["topic": "topic", "type": "sqs"]), pointConsumer) @@ -509,7 +510,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def secondEncode = decodedContext.strEncode() carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode] timeSource.advance(MILLISECONDS.toNanos(2)) - def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, wellKnownTags) + def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash) timeSource.advance(MILLISECONDS.toNanos(30)) context.setCheckpoint(new LinkedHashMap<>(["topic": "topicB", "type": "sqs"]), pointConsumer) @@ -529,7 +530,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def "Empty tags not set"() { given: def timeSource = new ControllableTimeSource() - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) when: timeSource.advance(50) @@ -561,24 +562,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification { } def "Primary tag used in hash calculation"() { - given: - def timeSource = new ControllableTimeSource() - when: - def firstContext = new DefaultPathwayContext(timeSource, wellKnownTags) - timeSource.advance(50) - firstContext.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) + def firstBaseHash = DefaultPathwayContext.getBaseHash(wellKnownTags) injectSysConfig(PRIMARY_TAG, "region-2") - def secondContext = new DefaultPathwayContext(timeSource, wellKnownTags) - timeSource.advance(25) - secondContext.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) + def secondBaseHash = DefaultPathwayContext.getBaseHash(wellKnownTags) then: - pointConsumer.points.size() == 2 - verifyFirstPoint(pointConsumer.points[0]) - verifyFirstPoint(pointConsumer.points[1]) - pointConsumer.points[0].hash != pointConsumer.points[1].hash + firstBaseHash != secondBaseHash } def "Check context extractor decorator behavior"() { @@ -600,7 +591,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.strEncode() @@ -646,7 +637,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.strEncode() @@ -687,7 +678,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification { def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) - def context = new DefaultPathwayContext(timeSource, wellKnownTags) + def context = new DefaultPathwayContext(timeSource, baseHash) timeSource.advance(MILLISECONDS.toNanos(50)) context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer) def encoded = context.strEncode()