Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ Thumbs.db
#*
*#

# Build Files #
*.log

# Build Files #
bin
target
build
Expand Down
4 changes: 3 additions & 1 deletion assets/03_hands-on.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ Select a trace and view the results. Notice that the trace now extends from the

Let's take it one step further to see what happens when multiple consumers/streams process the same event. The Customer Stream Count can be found on the `solutions` branch so you'll first need to checkout that branch before running the Stream.

> Before continuing, double check that the `tracingEnabled` flag is `true` in [gradle.properties](https://github.com/schroedermatt/stream-processing-workshop/blob/main/gradle.properties#L2)
Before continuing,
* double check that the `tracingEnabled` flag is `true` in [gradle.properties](https://github.com/schroedermatt/stream-processing-workshop/blob/main/gradle.properties#L2)
* kill the "Top Customer Artists" process

```bash
# run from the root of the stream-processing-workshop
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ dockerCompose {

tracing {
useComposeFiles = [
'./observability/jaeger/docker-compose.yml'
'./observability/tempo/docker-compose.yml'
]
}
}
Expand Down
5 changes: 4 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
runtimeMode=kafka

# options: true, false
tracingEnabled=false
tracingEnabled=true

# options: true, false
fileLoggingEnabled=true

# initial load volumes (only needed if running the mockdata-daemon)
initialLoadCustomers=20
Expand Down
7 changes: 5 additions & 2 deletions kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation 'redis.clients:jedis'

implementation 'org.apache.kafka:kafka-clients:3.3.1'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.4.2'
implementation 'org.apache.kafka:kafka-clients:3.4.0'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
implementation 'net.logstash.logback:logstash-logback-encoder:7.3'
implementation 'org.apache.logging.log4j:log4j-api:2.20.0'
implementation 'org.apache.logging.log4j:log4j-core:2.20.0'
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;

import static net.logstash.logback.argument.StructuredArguments.v;

@Slf4j
@Service
@Profile("kafka")
Expand Down Expand Up @@ -46,16 +48,16 @@ public long customerCount() {

@SneakyThrows
private void produceCustomer(FullCustomer customer) {
log.info("Producing Customer ({}) to Kafka", customer.customer().id());
log.info("Producing Customer ({}) to Kafka", v("customer_id", customer.customer().id()));
kafkaProducer.send(new ProducerRecord<>(topics.customers(), customer.customer().id(), customer.customer())).get();

log.info("Producing Address ({}) for Customer ({}) to Kafka", customer.address().id(), customer.customer().id());
log.info("Producing Address ({}) for Customer ({}) to Kafka", v("address_id", customer.address().id()), v("customer_id", customer.customer().id()));
kafkaProducer.send(new ProducerRecord<>(topics.addresses(), customer.address().id(), customer.address())).get();

log.info("Producing Phone ({}) for Customer ({}) to Kafka", customer.phone().id(), customer.customer().id());
log.info("Producing Phone ({}) for Customer ({}) to Kafka", v("phone_id", customer.phone().id()), v("customer_id", customer.customer().id()));
kafkaProducer.send(new ProducerRecord<>(topics.phones(), customer.phone().id(), customer.phone())).get();

log.info("Producing Email ({}) for Customer ({}) to Kafka", customer.email().id(), customer.customer().id());
log.info("Producing Email ({}) for Customer ({}) to Kafka", v("email_id", customer.email().id()), v("customer_id", customer.customer().id()));
kafkaProducer.send(new ProducerRecord<>(topics.emails(), customer.email().id(), customer.email())).get();
}
}
43 changes: 33 additions & 10 deletions kafka/src/main/java/org/msse/demo/music/KafkaMusicService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.util.Optional;

import static net.logstash.logback.argument.StructuredArguments.v;

@Slf4j
@Service
@Profile("kafka")
Expand All @@ -32,7 +34,7 @@ public class KafkaMusicService implements MusicService {
public Artist createArtist() {
Artist artist = musicCache.createArtist();

log.info("Producing Artist ({}) to Kafka", artist.id());
log.info("Producing Artist ({}) to Kafka", v("artist_id", artist.id()));
send(topics.artists(), artist.id(), artist);

return artist;
Expand All @@ -42,7 +44,7 @@ public Artist createArtist() {
public Artist createArtist(String artistId) {
Artist artist = musicCache.createArtist(artistId);

log.info("Producing Artist ({}) to Kafka", artist.id());
log.info("Producing Artist ({}) to Kafka", v("artist_id", artist.id()));
send(topics.artists(), artist.id(), artist);

return artist;
Expand All @@ -58,7 +60,9 @@ public Optional<Venue> createVenue() {
Optional<Venue> venue = musicCache.createVenue();

venue.ifPresent(value -> {
log.info("Producing Venue ({}) at Address ({}) to Kafka", value.id(), value.addressid());
log.info("Producing Venue ({}) at Address ({}) to Kafka",
v("venue_id", value.id()),
v("address_id", value.addressid()));
send(topics.venues(), value.id(), value);
});

Expand All @@ -70,7 +74,9 @@ public Optional<Venue> createVenue(String addressId) {
Optional<Venue> venue = musicCache.createVenue(addressId);

venue.ifPresent(value -> {
log.info("Producing Venue ({}) at Address ({}) to Kafka", value.id(), value.addressid());
log.info("Producing Venue ({}) at Address ({}) to Kafka",
v("venue_id", value.id()),
v("address_id", value.addressid()));
send(topics.venues(), value.id(), value);
});

Expand All @@ -87,7 +93,9 @@ public Optional<Event> createEvent() {
Optional<Event> event = musicCache.createEvent();

event.ifPresent(value -> {
log.info("Producing Event ({}) at Venue ({}) for Artist ({}) to Kafka", value.id(), value.venueid(), value.artistid());
log.info("Producing Event ({}) at Venue ({}) for Artist ({}) to Kafka",
v("event_id", value.id()),
v("venue_id", value.venueid()), value.artistid());
send(topics.events(), value.id(), value);
});

Expand All @@ -99,7 +107,10 @@ public Optional<Event> createEvent(String artistId, String venueId) {
Optional<Event> event = musicCache.createEvent(artistId, venueId);

event.ifPresent(value -> {
log.info("Producing Event ({}) at Venue ({}) for Artist ({}) to Kafka", value.id(), value.venueid(), value.artistid());
log.info("Producing Event ({}) at Venue ({}) for Artist ({}) to Kafka",
v("event_id", value.id()),
v("venue_id", value.venueid()),
v("artist_id", value.artistid()));
send(topics.events(), value.id(), value);
});

Expand All @@ -116,7 +127,10 @@ public Optional<Ticket> bookTicket() {
Optional<Ticket> ticket = musicCache.bookTicket();

ticket.ifPresent(value -> {
log.info("Producing Ticket ({}) for Customer ({}) to Event ({}) to Kafka", value.id(), value.customerid(), value.eventid());
log.info("Producing Ticket ({}) for Customer ({}) to Event ({}) to Kafka",
v("ticket_id", value.id()),
v("customer_id", value.customerid()),
v("event_id", value.eventid()));
send(topics.tickets(), value.id(), value);
});

Expand All @@ -128,7 +142,10 @@ public Optional<Ticket> bookTicket(String eventId, String customerId) {
Optional<Ticket> ticket = musicCache.bookTicket(eventId, customerId);

ticket.ifPresent(value -> {
log.info("Producing Ticket ({}) for Customer ({}) to Event ({}) to Kafka", value.id(), value.customerid(), value.eventid());
log.info("Producing Ticket ({}) for Customer ({}) to Event ({}) to Kafka",
v("ticket_id", value.id()),
v("customer_id", value.customerid()),
v("event_id", value.eventid()));
send(topics.tickets(), value.id(), value);
});

Expand All @@ -145,7 +162,10 @@ public Optional<Stream> streamArtist() {
Optional<Stream> stream = musicCache.streamArtist();

stream.ifPresent(value -> {
log.info("Producing Stream ({}) for Artist ({}) from Customer ({}) to Kafka", value.id(), value.artistid(), value.customerid());
log.info("Producing Stream ({}) for Artist ({}) from Customer ({}) to Kafka",
v("stream_id", value.id()),
v("artist_id", value.artistid()),
v("customer_id", value.customerid()));
send(topics.streams(), value.id(), value);
});

Expand All @@ -157,7 +177,10 @@ public Optional<Stream> streamArtist(String artistId, String customerId) {
Optional<Stream> stream = musicCache.streamArtist(customerId, artistId);

stream.ifPresent(value -> {
log.info("Producing Stream ({}) for Artist ({}) from Customer ({}) to Kafka", value.id(), value.artistid(), value.customerid());
log.info("Producing Stream ({}) for Artist ({}) from Customer ({}) to Kafka",
v("stream_id", value.id()),
v("artist_id", value.artistid()),
v("customer_id", value.customerid()));
send(topics.streams(), value.id(), value);
});

Expand Down
8 changes: 8 additions & 0 deletions mockdata-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ dependencies {

implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'net.logstash.logback:logstash-logback-encoder:7.3'

// conditional processing in the logback file
implementation 'org.codehaus.janino:janino:3.1.9'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}

Expand All @@ -24,6 +27,7 @@ task bootRunAPI(type: BootRun, dependsOn: 'build') {
mainClass = project.mainClassName

doFirst() {

if ("postgres" == project.runtimeMode) {
systemProperty 'spring.profiles.active', "postgres"
} else if ("kafka" == project.runtimeMode) {
Expand All @@ -32,6 +36,10 @@ task bootRunAPI(type: BootRun, dependsOn: 'build') {
systemProperty 'spring.profiles.active', "kafka,ccloud"
}

if ("true" == project.fileLoggingEnabled) {
systemProperty "logback.file-appender.enabled", "true"
}

// configure tracing properties if enabled
if ("true" == project.tracingEnabled) {
systemProperty "otel.traces.exporter", 'otlp'
Expand Down
31 changes: 31 additions & 0 deletions mockdata-api/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%.-4thread] %-5level %logger{6} [%X{trace_id}-%X{span_id}-%X{trace_flags}] %msg%n</pattern>
</encoder>
</appender>

    <!-- Activate the FILE appender only when the 'logback.file-appender.enabled' system property is set to 'true' -->
<if condition='property("logback.file-appender.enabled").contains("true")'>
<then>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>../data-demo.log</file>
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<customFields>{"service":"data-demo"}</customFields>
</encoder>
</appender>

<!-- enable both console & file appender -->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
</root>
</then>
<else>
<!-- enable only console appender -->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>
</else>
</if>
</configuration>
11 changes: 11 additions & 0 deletions mockdata-daemon/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ dependencies {
implementation project(':kafka')

implementation 'org.springframework.boot:spring-boot-starter'

implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'net.logstash.logback:logstash-logback-encoder:7.3'

// conditional processing in the logback file
implementation 'org.codehaus.janino:janino:3.1.9'
}

task bootRunDaemon(type: BootRun, dependsOn: 'build') {
Expand All @@ -36,6 +43,10 @@ task bootRunDaemon(type: BootRun, dependsOn: 'build') {
systemProperty 'initial-load.tickets', project.initialLoadTickets
systemProperty 'initial-load.streams', project.initialLoadStreams

if ("true" == project.fileLoggingEnabled) {
systemProperty "logback.file-appender.enabled", "true"
}

// configure tracing properties if enabled
if ("true" == project.tracingEnabled) {
systemProperty "otel.traces.exporter", 'otlp'
Expand Down
31 changes: 31 additions & 0 deletions mockdata-daemon/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%.-4thread] %-5level %logger{6} [%X{trace_id}-%X{span_id}-%X{trace_flags}] %msg%n</pattern>
</encoder>
</appender>

    <!-- Activate the FILE appender only when the 'logback.file-appender.enabled' system property is set to 'true' -->
<if condition='property("logback.file-appender.enabled").contains("true")'>
<then>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>../file.log</file>
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<customFields>{"service":"data-demo"}</customFields>
</encoder>
</appender>

<!-- enable both console & file appender -->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
</root>
</then>
<else>
<!-- enable only console appender -->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
</root>
</else>
</if>
</configuration>
Loading