Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHK-1565] feat(tracing): propagate tracing information when enqueueing events #59

Merged
merged 2 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<java.version>17</java.version>
<azure.functions.java.library.version>3.0.0</azure.functions.java.library.version>
<spring.cloud.azure.dependencies>4.0.0</spring.cloud.azure.dependencies>
<pagopa-ecommerce-commons.version>0.19.2</pagopa-ecommerce-commons.version>
<pagopa-ecommerce-commons.version>0.19.4</pagopa-ecommerce-commons.version>
giovanniberti marked this conversation as resolved.
Show resolved Hide resolved
<spotless.version>2.28.0</spotless.version>
<jacoco.version>0.8.8</jacoco.version>
<maven.compiler.plugin.version>3.10.1</maven.compiler.plugin.version>
Expand Down Expand Up @@ -61,6 +61,14 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-serializer-json-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.kotlin</groupId>
<artifactId>reactor-kotlin-extensions</artifactId>
Expand All @@ -87,6 +95,24 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.27.0-alpha</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand Down Expand Up @@ -202,6 +228,14 @@
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-bom</artifactId>
<version>1.27.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package it.pagopa.ecommerce.transactions.scheduler.configurations

import com.azure.storage.queue.QueueAsyncClient
import com.azure.storage.queue.QueueClientBuilder
import it.pagopa.ecommerce.commons.client.QueueAsyncClient
import it.pagopa.ecommerce.commons.queues.StrictJsonSerializerProvider
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand All @@ -19,6 +20,6 @@ class AzureStorageConfig {
.queueName(queueEventInitName)
.buildAsyncClient()
queueAsyncClient.createIfNotExists().block()
return queueAsyncClient
return QueueAsyncClient(queueAsyncClient, StrictJsonSerializerProvider().createInstance())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package it.pagopa.ecommerce.transactions.scheduler.configurations;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import it.pagopa.ecommerce.commons.queues.TracingUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TracingConfig {
@Bean
public OpenTelemetry openTelemetry() {
return GlobalOpenTelemetry.get();
}

@Bean
public Tracer tracer(OpenTelemetry openTelemetry) {
return openTelemetry.getTracer("pagopa-ecommerce-transactions-scheduler-service");
}

@Bean
public TracingUtils tracingUtils(OpenTelemetry openTelemetry, Tracer tracer) {
return new TracingUtils(openTelemetry, tracer);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package it.pagopa.ecommerce.transactions.scheduler.publishers

import com.azure.core.util.BinaryData
import com.azure.storage.queue.QueueAsyncClient
import it.pagopa.ecommerce.commons.client.QueueAsyncClient
import it.pagopa.ecommerce.commons.documents.v1.TransactionEvent
import it.pagopa.ecommerce.commons.domain.v1.pojos.BaseTransaction
import it.pagopa.ecommerce.commons.generated.server.model.TransactionStatusDto
import it.pagopa.ecommerce.commons.queues.QueueEvent
import it.pagopa.ecommerce.commons.queues.TracingUtils
import java.time.Duration
import java.util.concurrent.atomic.AtomicLong
import org.slf4j.Logger
Expand All @@ -17,7 +18,8 @@ abstract class EventPublisher<E>(
private val queueAsyncClient: QueueAsyncClient,
private val logger: Logger,
private val parallelEventsToProcess: Int,
private val transientQueueTTLSeconds: Int
private val transientQueueTTLSeconds: Int,
private val tracingUtils: TracingUtils,
) where E : TransactionEvent<*> {

private fun publishEvent(
Expand All @@ -28,21 +30,23 @@ abstract class EventPublisher<E>(
return Mono.just(baseTransaction)
.flatMap { storeEventAndUpdateView(it, newStatus) }
.flatMap { event ->
queueAsyncClient
.sendMessageWithResponse(
BinaryData.fromObject(event),
Duration.ofMillis(visibilityTimeoutMillis),
Duration.ofSeconds(transientQueueTTLSeconds.toLong())
)
.flatMap {
logger.info(
"Event: [$event] successfully sent with visibility timeout: [${it.value.timeNextVisible}] ms to queue: [${queueAsyncClient.queueName}]"
tracingUtils.traceMono(this.javaClass.simpleName) { tracingInfo ->
queueAsyncClient
.sendMessageWithResponse(
QueueEvent(event, tracingInfo),
Duration.ofMillis(visibilityTimeoutMillis),
Duration.ofSeconds(transientQueueTTLSeconds.toLong())
)
Mono.just(true)
}
.doOnError { exception ->
logger.error("Error sending event: [${event}].", exception)
}
.flatMap {
logger.info(
"Event: [$event] successfully sent with visibility timeout: [${it.value.timeNextVisible}] ms to queue: [${queueAsyncClient.queueName}]"
)
Mono.just(true)
}
.doOnError { exception ->
logger.error("Error sending event: [${event}].", exception)
}
}
}
.onErrorResume {
logger.error(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package it.pagopa.ecommerce.transactions.scheduler.publishers

import com.azure.storage.queue.QueueAsyncClient
import it.pagopa.ecommerce.commons.client.QueueAsyncClient
import it.pagopa.ecommerce.commons.documents.v1.*
import it.pagopa.ecommerce.commons.domain.v1.TransactionWithClosureError
import it.pagopa.ecommerce.commons.domain.v1.pojos.BaseTransaction
import it.pagopa.ecommerce.commons.domain.v1.pojos.BaseTransactionWithCancellationRequested
import it.pagopa.ecommerce.commons.domain.v1.pojos.BaseTransactionWithRequestedAuthorization
import it.pagopa.ecommerce.commons.generated.server.model.TransactionStatusDto
import it.pagopa.ecommerce.commons.queues.TracingUtils
import it.pagopa.ecommerce.transactions.scheduler.repositories.TransactionsEventStoreRepository
import it.pagopa.ecommerce.transactions.scheduler.repositories.TransactionsViewRepository
import org.slf4j.Logger
Expand All @@ -28,13 +29,15 @@ class TransactionExpiredEventPublisher(
@Value("\${pendingTransactions.batch.transactionsAnalyzer.parallelEventsToProcess}")
private val parallelEventToProcess: Int,
@Value("\${azurestorage.queues.transientQueues.ttlSeconds}")
private val transientQueueTTLSeconds: Int
private val transientQueueTTLSeconds: Int,
@Autowired private val tracingUtils: TracingUtils
) :
EventPublisher<TransactionExpiredEvent>(
queueAsyncClient = expiredEventQueueAsyncClient,
logger = logger,
parallelEventsToProcess = parallelEventToProcess,
transientQueueTTLSeconds = transientQueueTTLSeconds
transientQueueTTLSeconds = transientQueueTTLSeconds,
tracingUtils = tracingUtils
) {

fun publishExpiryEvents(
Expand Down
Loading