diff --git a/src/main/java/net/dancier/dancer/core/events/ProfileUpdateEventListener.java b/src/main/java/net/dancier/dancer/core/events/ApplicationEventListener.java similarity index 81% rename from src/main/java/net/dancier/dancer/core/events/ProfileUpdateEventListener.java rename to src/main/java/net/dancier/dancer/core/events/ApplicationEventListener.java index 2efa6fb..914573c 100644 --- a/src/main/java/net/dancier/dancer/core/events/ProfileUpdateEventListener.java +++ b/src/main/java/net/dancier/dancer/core/events/ApplicationEventListener.java @@ -15,13 +15,17 @@ import org.springframework.stereotype.Component; import java.net.URI; +import java.time.OffsetDateTime; import java.util.UUID; @Component @RequiredArgsConstructor -public class ProfileUpdateEventListener { +public class ApplicationEventListener { - public static final Logger log = LoggerFactory.getLogger(ProfileUpdateEventListener.class); + public static final Logger log = LoggerFactory.getLogger(ApplicationEventListener.class); + + private static final URI FRONTEND_SOURCE = URI.create("http://dancier.net"); + private static final URI BACKEND_SOURCE = URI.create("http://dancer.dancier.net"); private final EventlogService eventlogService; @@ -43,8 +47,9 @@ public void handle(ProfileUpdatedEvent profileUpdatedEvent) { CloudEvent cloudEvent = CloudEventBuilder .v1() .withId(UUID.randomUUID().toString()) - .withSource(URI.create("F")) + .withSource(BACKEND_SOURCE) .withType("profile-updated") + .withTime(OffsetDateTime.now()) .withData(objectMapper.writeValueAsBytes(profileUpdatedEvent)).build(); scheduleMessageAdapter.schedule(cloudEvent, profileUpdatedEvent.getDancer().getId().toString()); } catch (JsonProcessingException jpe) { diff --git a/src/main/java/net/dancier/dancer/messaging/ScheduleMessageAdapter.java b/src/main/java/net/dancier/dancer/messaging/ScheduleMessageAdapter.java index c550639..5dbf5cb 100644 --- a/src/main/java/net/dancier/dancer/messaging/ScheduleMessageAdapter.java +++ b/src/main/java/net/dancier/dancer/messaging/ScheduleMessageAdapter.java @@ -1,5 +1,7 @@ package net.dancier.dancer.messaging; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import io.cloudevents.CloudEvent; import lombok.RequiredArgsConstructor; import net.dancier.dancer.eventlog.ScheduleMessagePort; @@ -14,14 +16,20 @@ public class ScheduleMessageAdapter implements ScheduleMessagePort { public static final Logger log = LoggerFactory.getLogger(ScheduleMessageAdapter.class); - private final KafkaTemplate kafkaTemplate; + private final OutboxJpaRepository outboxJpaRepository; + + private final ObjectMapper objectMapper; @Override public void schedule(CloudEvent cloudEvent, String key) { log.info("sending object: " + cloudEvent); log.info("with key:" + key); - kafkaTemplate.send(cloudEvent.getType(), - key, - cloudEvent); + OutboxJpaEntity outboxJpaEntity = new OutboxJpaEntity(); + outboxJpaEntity.setData(objectMapper.convertValue(cloudEvent, JsonNode.class)); + outboxJpaEntity.setType(cloudEvent.getType()); + outboxJpaEntity.setKey(key); + outboxJpaEntity.setCreatedAt(cloudEvent.getTime()); + outboxJpaEntity.setStatus(OutboxJpaEntity.STATUS.NEW); + outboxJpaRepository.save(outboxJpaEntity); } }