Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vandonr committed Jul 11, 2024
1 parent 9668efb commit 10eb93c
Showing 1 changed file with 37 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS

class DefaultPathwayContextTest extends DDCoreSpecification {
def wellKnownTags = new WellKnownTags("runtimeid", "hostname", "testing", "service", "version", "java")
long baseHash = 12

static final DEFAULT_BUCKET_DURATION_NANOS = Config.get().getDataStreamsBucketDurationNanoseconds()
def pointConsumer = new Consumer<StatsPoint>() {
Expand All @@ -44,7 +45,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def "First Set checkpoint starts the context."() {
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)

when:
timeSource.advance(50)
Expand All @@ -59,7 +60,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def "Checkpoint generated"() {
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)

when:
timeSource.advance(50)
Expand All @@ -85,7 +86,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def "Checkpoint with payload size"() {
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)

when:
timeSource.advance(25)
Expand All @@ -106,7 +107,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def "Multiple checkpoints generated"() {
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)

when:
timeSource.advance(50)
Expand Down Expand Up @@ -146,7 +147,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def "Exception thrown when trying to encode unstarted context"() {
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)

when:
context.encode()
Expand All @@ -158,14 +159,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def "Set checkpoint with dataset tags"() {
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)

when:
timeSource.advance(MILLISECONDS.toNanos(50))
context.setCheckpoint(new LinkedHashMap<>(["type": "s3", "ds.namespace": "my_bucket", "ds.name": "my_object.csv", "direction": "in"]), pointConsumer)
def encoded = context.strEncode()
timeSource.advance(MILLISECONDS.toNanos(2))
def decodedContext = DefaultPathwayContext.strDecode(timeSource, wellKnownTags, encoded)
def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded)
timeSource.advance(MILLISECONDS.toNanos(25))
context.setCheckpoint(new LinkedHashMap<>(["type": "s3", "ds.namespace": "my_bucket", "ds.name": "my_object.csv", "direction": "out"]), pointConsumer)

Expand All @@ -184,14 +185,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
// Timesource needs to be advanced in milliseconds because encoding truncates to millis
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
when:
timeSource.advance(MILLISECONDS.toNanos(50))
context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer)
def encoded = context.strEncode()
timeSource.advance(MILLISECONDS.toNanos(2))
def decodedContext = DefaultPathwayContext.strDecode(timeSource, wellKnownTags, encoded)
def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded)
timeSource.advance(MILLISECONDS.toNanos(25))
context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer)
Expand All @@ -212,7 +213,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def "Set checkpoint with timestamp"() {
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
def timeFromQueue = timeSource.getCurrentTimeMillis() - 200
when:
context.setCheckpoint(["type": "internal"], pointConsumer, timeFromQueue)
Expand All @@ -233,15 +234,15 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
// Timesource needs to be advanced in milliseconds because encoding truncates to millis
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
when:
timeSource.advance(MILLISECONDS.toNanos(50))
context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer)
def encoded = context.strEncode()
timeSource.advance(MILLISECONDS.toNanos(1))
def decodedContext = DefaultPathwayContext.strDecode(timeSource, wellKnownTags, encoded)
def decodedContext = DefaultPathwayContext.strDecode(timeSource, baseHash, encoded)
timeSource.advance(MILLISECONDS.toNanos(25))
context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer)
Expand All @@ -260,7 +261,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
when:
def secondEncode = decodedContext.strEncode()
timeSource.advance(MILLISECONDS.toNanos(2))
def secondDecode = DefaultPathwayContext.strDecode(timeSource, wellKnownTags, secondEncode)
def secondDecode = DefaultPathwayContext.strDecode(timeSource, baseHash, secondEncode)
timeSource.advance(MILLISECONDS.toNanos(30))
context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer)
Expand All @@ -281,7 +282,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
// Timesource needs to be advanced in milliseconds because encoding truncates to millis
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
def contextVisitor = new Base64MapContextVisitor()
when:
Expand All @@ -291,7 +292,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def encoded = context.strEncode()
Map<String, String> carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"]
timeSource.advance(MILLISECONDS.toNanos(1))
def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, wellKnownTags)
def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash)
timeSource.advance(MILLISECONDS.toNanos(25))
context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer)
Expand All @@ -311,7 +312,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def secondEncode = decodedContext.strEncode()
carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode]
timeSource.advance(MILLISECONDS.toNanos(2))
def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, wellKnownTags)
def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash)
timeSource.advance(MILLISECONDS.toNanos(30))
context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer)
Expand All @@ -332,14 +333,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
// Timesource needs to be advanced in milliseconds because encoding truncates to millis
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
when:
timeSource.advance(MILLISECONDS.toNanos(50))
context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer)
def encoded = context.encode()
timeSource.advance(MILLISECONDS.toNanos(2))
def decodedContext = DefaultPathwayContext.decode(timeSource, wellKnownTags, encoded)
def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, encoded)
timeSource.advance(MILLISECONDS.toNanos(25))
context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer)
Expand All @@ -361,15 +362,15 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
// Timesource needs to be advanced in milliseconds because encoding truncates to millis
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
when:
timeSource.advance(MILLISECONDS.toNanos(50))
context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer)
def encoded = context.encode()
timeSource.advance(MILLISECONDS.toNanos(1))
def decodedContext = DefaultPathwayContext.decode(timeSource, wellKnownTags, encoded)
def decodedContext = DefaultPathwayContext.decode(timeSource, baseHash, encoded)
timeSource.advance(MILLISECONDS.toNanos(25))
context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer)
Expand All @@ -388,7 +389,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
when:
def secondEncode = decodedContext.encode()
timeSource.advance(MILLISECONDS.toNanos(2))
def secondDecode = DefaultPathwayContext.decode(timeSource, wellKnownTags, secondEncode)
def secondDecode = DefaultPathwayContext.decode(timeSource, baseHash, secondEncode)
timeSource.advance(MILLISECONDS.toNanos(30))
context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer)
Expand All @@ -408,7 +409,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def "Legacy binary encoding"() {
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
def contextVisitor = new BinaryMapContextVisitor()
when:
Expand All @@ -418,7 +419,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def encoded = java.util.Base64.getDecoder().decode(context.encode())
Map<String, byte[]> carrier = [(PathwayContext.PROPAGATION_KEY): encoded]
timeSource.advance(MILLISECONDS.toNanos(1))
def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, wellKnownTags)
def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash)
then:
decodedContext.strEncode() == context.strEncode()
Expand All @@ -428,7 +429,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
// Timesource needs to be advanced in milliseconds because encoding truncates to millis
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
def contextVisitor = new BinaryMapContextVisitor()
when:
Expand All @@ -438,7 +439,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def encoded = context.encode()
Map<String, byte[]> carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": new byte[0]]
timeSource.advance(MILLISECONDS.toNanos(1))
def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, wellKnownTags)
def decodedContext = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash)
timeSource.advance(MILLISECONDS.toNanos(25))
context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topic", "type": "kafka"]), pointConsumer)
Expand All @@ -458,7 +459,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def secondEncode = decodedContext.encode()
carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode]
timeSource.advance(MILLISECONDS.toNanos(2))
def secondDecode = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, wellKnownTags)
def secondDecode = DefaultPathwayContext.extractBinary(carrier, contextVisitor, timeSource, baseHash)
timeSource.advance(MILLISECONDS.toNanos(30))
context.setCheckpoint(new LinkedHashMap<>(["group": "group", "topic": "topicB", "type": "kafka"]), pointConsumer)
Expand All @@ -479,7 +480,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
// Timesource needs to be advanced in milliseconds because encoding truncates to millis
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
def contextVisitor = new Base64MapContextVisitor()
when:
Expand All @@ -489,7 +490,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def encoded = context.strEncode()
Map<String, String> carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): encoded, "someotherkey": "someothervalue"]
timeSource.advance(MILLISECONDS.toNanos(1))
def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, wellKnownTags)
def decodedContext = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash)
timeSource.advance(MILLISECONDS.toNanos(25))
context.setCheckpoint(new LinkedHashMap<>(["topic": "topic", "type": "sqs"]), pointConsumer)
Expand All @@ -509,7 +510,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def secondEncode = decodedContext.strEncode()
carrier = [(PathwayContext.PROPAGATION_KEY_BASE64): secondEncode]
timeSource.advance(MILLISECONDS.toNanos(2))
def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, wellKnownTags)
def secondDecode = DefaultPathwayContext.extract(carrier, contextVisitor, timeSource, baseHash)
timeSource.advance(MILLISECONDS.toNanos(30))
context.setCheckpoint(new LinkedHashMap<>(["topic": "topicB", "type": "sqs"]), pointConsumer)
Expand All @@ -529,7 +530,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def "Empty tags not set"() {
given:
def timeSource = new ControllableTimeSource()
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
when:
timeSource.advance(50)
Expand Down Expand Up @@ -561,24 +562,14 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
}
def "Primary tag used in hash calculation"() {
given:
def timeSource = new ControllableTimeSource()
when:
def firstContext = new DefaultPathwayContext(timeSource, wellKnownTags)
timeSource.advance(50)
firstContext.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer)
def firstBaseHash = DefaultPathwayContext.getBaseHash(wellKnownTags)
injectSysConfig(PRIMARY_TAG, "region-2")
def secondContext = new DefaultPathwayContext(timeSource, wellKnownTags)
timeSource.advance(25)
secondContext.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer)
def secondBaseHash = DefaultPathwayContext.getBaseHash(wellKnownTags)
then:
pointConsumer.points.size() == 2
verifyFirstPoint(pointConsumer.points[0])
verifyFirstPoint(pointConsumer.points[1])
pointConsumer.points[0].hash != pointConsumer.points[1].hash
firstBaseHash != secondBaseHash
}
def "Check context extractor decorator behavior"() {
Expand All @@ -600,7 +591,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
timeSource.advance(MILLISECONDS.toNanos(50))
context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer)
def encoded = context.strEncode()
Expand Down Expand Up @@ -646,7 +637,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
timeSource.advance(MILLISECONDS.toNanos(50))
context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer)
def encoded = context.strEncode()
Expand Down Expand Up @@ -687,7 +678,7 @@ class DefaultPathwayContextTest extends DDCoreSpecification {
def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { globalTraceConfig }, wellKnownTags, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS)
def context = new DefaultPathwayContext(timeSource, wellKnownTags)
def context = new DefaultPathwayContext(timeSource, baseHash)
timeSource.advance(MILLISECONDS.toNanos(50))
context.setCheckpoint(new LinkedHashMap<>(["type": "internal"]), pointConsumer)
def encoded = context.strEncode()
Expand Down

0 comments on commit 10eb93c

Please sign in to comment.