Skip to content

Commit

Permalink
Move domain-logic for createOrUpdate from kafka-consumer to service
Browse files Browse the repository at this point in the history
  • Loading branch information
eirikdahlen committed Jul 25, 2023
1 parent e71296c commit 985e859
Show file tree
Hide file tree
Showing 16 changed files with 322 additions and 487 deletions.
14 changes: 14 additions & 0 deletions src/main/kotlin/no/nav/syfo/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import no.nav.syfo.application.cache.RedisStore
import no.nav.syfo.application.database.database
import no.nav.syfo.application.database.databaseModule
import no.nav.syfo.client.azuread.AzureAdClient
import no.nav.syfo.client.pdl.PdlClient
import no.nav.syfo.cronjob.launchCronjobModule
import no.nav.syfo.kafka.launchKafkaModule
import no.nav.syfo.personstatus.PersonoversiktStatusService
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit

Expand All @@ -39,6 +41,16 @@ fun main() {
redisStore = redisStore,
)

val pdlClient = PdlClient(
azureAdClient = azureAdClient,
clientEnvironment = environment.clients.pdl,
)

val personoversiktStatusService = PersonoversiktStatusService(
database = database,
pdlClient = pdlClient,
)

val applicationEngineEnvironment = applicationEngineEnvironment {
log = logger
config = HoconApplicationConfig(ConfigFactory.load())
Expand All @@ -57,6 +69,7 @@ fun main() {
environment = environment,
wellKnownVeilederV2 = wellKnownVeilederV2,
azureAdClient = azureAdClient,
personoversiktStatusService = personoversiktStatusService,
)
}
}
Expand All @@ -77,6 +90,7 @@ fun main() {
applicationState = applicationState,
environment = environment,
azureAdClient = azureAdClient,
personoversiktStatusService = personoversiktStatusService,
)
launchCronjobModule(
applicationState = applicationState,
Expand Down
12 changes: 1 addition & 11 deletions src/main/kotlin/no/nav/syfo/application/api/ApiModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import no.nav.syfo.application.Environment
import no.nav.syfo.application.api.authentication.*
import no.nav.syfo.application.database.DatabaseInterface
import no.nav.syfo.client.azuread.AzureAdClient
import no.nav.syfo.client.pdl.PdlClient
import no.nav.syfo.client.veiledertilgang.VeilederTilgangskontrollClient
import no.nav.syfo.personstatus.PersonTildelingService
import no.nav.syfo.personstatus.PersonoversiktStatusService
Expand All @@ -21,6 +20,7 @@ fun Application.apiModule(
environment: Environment,
wellKnownVeilederV2: WellKnown,
azureAdClient: AzureAdClient,
personoversiktStatusService: PersonoversiktStatusService,
) {
installCallId()
installContentNegotiation()
Expand All @@ -41,16 +41,6 @@ fun Application.apiModule(
database = database,
)

val pdlClient = PdlClient(
azureAdClient = azureAdClient,
clientEnvironment = environment.clients.pdl,
)

val personoversiktStatusService = PersonoversiktStatusService(
database = database,
pdlClient = pdlClient,
)

val tilgangskontrollConsumer = VeilederTilgangskontrollClient(
azureAdClient = azureAdClient,
clientEnvironment = environment.clients.syfotilgangskontroll,
Expand Down
3 changes: 3 additions & 0 deletions src/main/kotlin/no/nav/syfo/kafka/KafkaModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ import no.nav.syfo.identhendelse.kafka.launchKafkaTaskIdenthendelse
import no.nav.syfo.oppfolgingstilfelle.kafka.launchKafkaTaskOppfolgingstilfellePerson
import no.nav.syfo.pdlpersonhendelse.kafka.launchKafkaTaskPersonhendelse
import no.nav.syfo.personoppgavehendelse.kafka.launchKafkaTaskPersonoppgavehendelse
import no.nav.syfo.personstatus.PersonoversiktStatusService

fun launchKafkaModule(
applicationState: ApplicationState,
environment: Environment,
azureAdClient: AzureAdClient,
personoversiktStatusService: PersonoversiktStatusService,
) {
launchKafkaTaskPersonoppgavehendelse(
applicationState = applicationState,
kafkaEnvironment = environment.kafka,
personoversiktStatusService = personoversiktStatusService,
)
launchKafkaTaskOppfolgingstilfellePerson(
applicationState = applicationState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,5 @@ import no.nav.syfo.personstatus.domain.OversikthendelseType

data class KPersonoppgavehendelse(
val personident: String,
val hendelsetype: String,
val hendelsetype: OversikthendelseType,
)

fun KPersonoppgavehendelse.oversikthendelseType(): OversikthendelseType? =
OversikthendelseType.values().firstOrNull {
it.name == this.hendelsetype
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package no.nav.syfo.personoppgavehendelse.kafka

import no.nav.syfo.application.kafka.KafkaEnvironment
import no.nav.syfo.application.ApplicationState
import no.nav.syfo.application.database.database
import no.nav.syfo.application.kafka.kafkaAivenConsumerConfig
import no.nav.syfo.kafka.launchKafkaTask
import no.nav.syfo.personstatus.PersonoversiktStatusService
import no.nav.syfo.util.configuredJacksonMapper
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Deserializer
Expand All @@ -15,8 +15,9 @@ val PERSONOPPGAVEHENDELSE_TOPIC = "teamsykefravr.personoppgavehendelse"
fun launchKafkaTaskPersonoppgavehendelse(
applicationState: ApplicationState,
kafkaEnvironment: KafkaEnvironment,
personoversiktStatusService: PersonoversiktStatusService,
) {
val personoppgavehendelseService = PersonoppgavehendelseService(database = database)
val personoppgavehendelseConsumerService = PersonoppgavehendelseConsumerService(personoversiktStatusService)
val consumerProperties = Properties().apply {
putAll(kafkaAivenConsumerConfig(kafkaEnvironment = kafkaEnvironment))
this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] =
Expand All @@ -26,7 +27,7 @@ fun launchKafkaTaskPersonoppgavehendelse(
applicationState = applicationState,
topic = PERSONOPPGAVEHENDELSE_TOPIC,
consumerProperties = consumerProperties,
kafkaConsumerService = personoppgavehendelseService,
kafkaConsumerService = personoppgavehendelseConsumerService,
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package no.nav.syfo.personoppgavehendelse.kafka

import no.nav.syfo.kafka.KafkaConsumerService
import no.nav.syfo.personstatus.PersonoversiktStatusService
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
import java.time.Duration

class PersonoppgavehendelseConsumerService(
private val personoversiktStatusService: PersonoversiktStatusService,
) : KafkaConsumerService<KPersonoppgavehendelse> {
override val pollDurationInMillis: Long = 100

override fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, KPersonoppgavehendelse>) {
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))

if (records.count() > 0) {
log.info("TRACE: Received ${records.count()} records")
personoversiktStatusService.createOrUpdatePersonoversiktStatuser(
personoppgavehendelser = records.map { it.value() }
)
kafkaConsumer.commitSync()
}
}

companion object {
private val log = LoggerFactory.getLogger(PersonoppgavehendelseConsumerService::class.java)
}
}

This file was deleted.

Loading

0 comments on commit 985e859

Please sign in to comment.