Skip to content

Commit

Permalink
feat(tracing): propagate tracing information when enqueueing events
Browse files Browse the repository at this point in the history
  • Loading branch information
giovanniberti committed Jul 12, 2023
1 parent 0be0e9a commit 29739bc
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 94 deletions.
38 changes: 36 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-serializer-json-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kotlin</groupId>
<artifactId>reactor-kotlin-extensions</artifactId>
Expand All @@ -87,6 +95,24 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.27.0-alpha</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down Expand Up @@ -202,6 +228,14 @@
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.27.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -373,8 +407,8 @@
<configuration>
<skipCheckoutIfExists>false</skipCheckoutIfExists>
<connectionUrl>scm:git:https://github.com/pagopa/pagopa-ecommerce-commons.git</connectionUrl>
<scmVersionType>tag</scmVersionType>
<scmVersion>${pagopa-ecommerce-commons.version}</scmVersion>
<scmVersionType>branch</scmVersionType>
<scmVersion>queue-events</scmVersion>
<goals>install -DskipTests</goals>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package it.pagopa.ecommerce.transactions.scheduler.configurations

import com.azure.storage.queue.QueueAsyncClient
import com.azure.storage.queue.QueueClientBuilder
import it.pagopa.ecommerce.commons.client.QueueAsyncClient
import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand All @@ -19,6 +20,6 @@ class AzureStorageConfig {
.queueName(queueEventInitName)
.buildAsyncClient()
queueAsyncClient.createIfNotExists().block()
return queueAsyncClient
return QueueAsyncClient(queueAsyncClient, StrictJsonSerializerProvider().createInstance())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package it.pagopa.ecommerce.transactions.scheduler.configurations;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import it.pagopa.ecommerce.commons.queues.TracingUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TracingConfig {
@Bean
public OpenTelemetry openTelemetry() {
return GlobalOpenTelemetry.get();
}

@Bean
public Tracer tracer(OpenTelemetry openTelemetry) {
return openTelemetry.getTracer("pagopa-ecommerce-transactions-scheduler-service");
}

@Bean
public TracingUtils tracingUtils(OpenTelemetry openTelemetry, Tracer tracer) {
return new TracingUtils(openTelemetry, tracer);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package it.pagopa.ecommerce.transactions.scheduler.publishers

import com.azure.core.util.BinaryData
import com.azure.storage.queue.QueueAsyncClient
import it.pagopa.ecommerce.commons.client.QueueAsyncClient
import it.pagopa.ecommerce.commons.documents.v1.TransactionEvent
import it.pagopa.ecommerce.commons.domain.v1.pojos.BaseTransaction
import it.pagopa.ecommerce.commons.generated.server.model.TransactionStatusDto
import it.pagopa.ecommerce.commons.queues.QueueEvent
import it.pagopa.ecommerce.commons.queues.TracingUtils
import java.time.Duration
import java.util.concurrent.atomic.AtomicLong
import org.slf4j.Logger
Expand All @@ -17,7 +18,8 @@ abstract class EventPublisher<E>(
private val queueAsyncClient: QueueAsyncClient,
private val logger: Logger,
private val parallelEventsToProcess: Int,
private val transientQueueTTLSeconds: Int
private val transientQueueTTLSeconds: Int,
private val tracingUtils: TracingUtils,
) where E : TransactionEvent<*> {

private fun publishEvent(
Expand All @@ -28,21 +30,23 @@ abstract class EventPublisher<E>(
return Mono.just(baseTransaction)
.flatMap { storeEventAndUpdateView(it, newStatus) }
.flatMap { event ->
queueAsyncClient
.sendMessageWithResponse(
BinaryData.fromObject(event),
Duration.ofMillis(visibilityTimeoutMillis),
Duration.ofSeconds(transientQueueTTLSeconds.toLong())
)
.flatMap {
logger.info(
"Event: [$event] successfully sent with visibility timeout: [${it.value.timeNextVisible}] ms to queue: [${queueAsyncClient.queueName}]"
tracingUtils.traceMono(this.javaClass.simpleName) { tracingInfo ->
queueAsyncClient
.sendMessageWithResponse(
QueueEvent(event, tracingInfo),
Duration.ofMillis(visibilityTimeoutMillis),
Duration.ofSeconds(transientQueueTTLSeconds.toLong())
)
Mono.just(true)
}
.doOnError { exception ->
logger.error("Error sending event: [${event}].", exception)
}
.flatMap {
logger.info(
"Event: [$event] successfully sent with visibility timeout: [${it.value.timeNextVisible}] ms to queue: [${queueAsyncClient.queueName}]"
)
Mono.just(true)
}
.doOnError { exception ->
logger.error("Error sending event: [${event}].", exception)
}
}
}
.onErrorResume {
logger.error(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package it.pagopa.ecommerce.transactions.scheduler.publishers

import com.azure.storage.queue.QueueAsyncClient
import it.pagopa.ecommerce.commons.client.QueueAsyncClient
import it.pagopa.ecommerce.commons.documents.v1.*
import it.pagopa.ecommerce.commons.domain.v1.TransactionWithClosureError
import it.pagopa.ecommerce.commons.domain.v1.pojos.BaseTransaction
import it.pagopa.ecommerce.commons.domain.v1.pojos.BaseTransactionWithCancellationRequested
import it.pagopa.ecommerce.commons.domain.v1.pojos.BaseTransactionWithRequestedAuthorization
import it.pagopa.ecommerce.commons.generated.server.model.TransactionStatusDto
import it.pagopa.ecommerce.commons.queues.TracingUtils
import it.pagopa.ecommerce.transactions.scheduler.repositories.TransactionsEventStoreRepository
import it.pagopa.ecommerce.transactions.scheduler.repositories.TransactionsViewRepository
import org.slf4j.Logger
Expand All @@ -28,13 +29,15 @@ class TransactionExpiredEventPublisher(
@Value("\${pendingTransactions.batch.transactionsAnalyzer.parallelEventsToProcess}")
private val parallelEventToProcess: Int,
@Value("\${azurestorage.queues.transientQueues.ttlSeconds}")
private val transientQueueTTLSeconds: Int
private val transientQueueTTLSeconds: Int,
private val tracingUtils: TracingUtils
) :
EventPublisher<TransactionExpiredEvent>(
queueAsyncClient = expiredEventQueueAsyncClient,
logger = logger,
parallelEventsToProcess = parallelEventToProcess,
transientQueueTTLSeconds = transientQueueTTLSeconds
transientQueueTTLSeconds = transientQueueTTLSeconds,
tracingUtils = tracingUtils
) {

fun publishExpiryEvents(
Expand Down
Loading

0 comments on commit 29739bc

Please sign in to comment.