-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IS-1646: Consume huskelapp-status in personoversiktstatus #316
Changes from 3 commits
2699d5f
3e95834
7c2dfd3
a815ada
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package no.nav.syfo.huskelapp | ||
|
||
import no.nav.syfo.application.database.DatabaseInterface | ||
import no.nav.syfo.huskelapp.domain.Huskelapp | ||
import no.nav.syfo.huskelapp.kafka.COUNT_KAFKA_CONSUMER_HUSKELAPP_READ | ||
import no.nav.syfo.personstatus.db.createPersonOversiktStatus | ||
import no.nav.syfo.personstatus.db.getPersonOversiktStatusList | ||
import no.nav.syfo.personstatus.db.updateHuskelappActive | ||
import org.slf4j.Logger | ||
import org.slf4j.LoggerFactory | ||
|
||
class HuskelappService( | ||
private val database: DatabaseInterface, | ||
) { | ||
fun processHuskelapp(records: List<Huskelapp>) { | ||
database.connection.use { connection -> | ||
records.forEach { huskelapp -> | ||
val existingPersonOversiktStatus = connection.getPersonOversiktStatusList( | ||
fnr = huskelapp.personIdent.value, | ||
).firstOrNull() | ||
|
||
if (existingPersonOversiktStatus == null) { | ||
val personoversiktStatus = huskelapp.toPersonoversiktStatus() | ||
connection.createPersonOversiktStatus( | ||
commit = false, | ||
personOversiktStatus = personoversiktStatus, | ||
) | ||
} else { | ||
connection.updateHuskelappActive( | ||
isHuskelappActive = huskelapp.isActive, | ||
personIdent = huskelapp.personIdent, | ||
) | ||
} | ||
|
||
log.info("Received huskelapp with uuid=${huskelapp.uuid}") | ||
COUNT_KAFKA_CONSUMER_HUSKELAPP_READ.increment() | ||
} | ||
connection.commit() | ||
} | ||
} | ||
|
||
companion object { | ||
val log: Logger = LoggerFactory.getLogger(this::class.java) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package no.nav.syfo.huskelapp.domain | ||
|
||
import no.nav.syfo.domain.PersonIdent | ||
import no.nav.syfo.personstatus.domain.PersonOversiktStatus | ||
import java.util.UUID | ||
|
||
data class Huskelapp( | ||
val uuid: UUID, | ||
val personIdent: PersonIdent, | ||
val isActive: Boolean, | ||
) { | ||
fun toPersonoversiktStatus() = PersonOversiktStatus( | ||
fnr = personIdent.value, | ||
).copy( | ||
huskelappActive = isActive, | ||
) | ||
Comment on lines
+12
to
+16
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gjorde sånn i stedet for å "fylle ut" det store objektet, nå som vi har default-values. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 Hadde nok vært fint og ryddet litt opp i det ja, men får ta det i en egen PR. |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package no.nav.syfo.huskelapp.kafka | ||
|
||
import no.nav.syfo.huskelapp.HuskelappService | ||
import no.nav.syfo.kafka.KafkaConsumerService | ||
import org.apache.kafka.clients.consumer.ConsumerRecords | ||
import org.apache.kafka.clients.consumer.KafkaConsumer | ||
import org.slf4j.LoggerFactory | ||
import java.time.Duration | ||
|
||
class HuskelappConsumer( | ||
private val huskelappService: HuskelappService | ||
) : KafkaConsumerService<KafkaHuskelapp> { | ||
|
||
override val pollDurationInMillis: Long = 1000 | ||
|
||
override fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, KafkaHuskelapp>) { | ||
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis)) | ||
if (records.count() > 0) { | ||
processRecords(records) | ||
kafkaConsumer.commitSync() | ||
} | ||
} | ||
|
||
private fun processRecords( | ||
consumerRecords: ConsumerRecords<String, KafkaHuskelapp>, | ||
) { | ||
val (tombstones, validRecords) = consumerRecords.partition { it.value() == null } | ||
|
||
if (tombstones.isNotEmpty()) { | ||
val numberOfTombstones = tombstones.size | ||
log.error("Value of $numberOfTombstones ConsumerRecord are null, most probably due to a tombstone. Contact the owner of the topic if an error is suspected") | ||
COUNT_KAFKA_CONSUMER_HUSKELAPP_TOMBSTONE.increment(numberOfTombstones.toDouble()) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Vi sender vel ikke tombstones på denne topicen, så vet ikke om vi trenger dette? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nei, jeg tenker vi kan fjerne det. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Endret nå, prøvde å ta i bruk en There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! |
||
|
||
huskelappService.processHuskelapp( | ||
records = validRecords.map { it.value().toHuskelapp() } | ||
) | ||
} | ||
|
||
companion object { | ||
private val log = LoggerFactory.getLogger(HuskelappConsumer::class.java) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
package no.nav.syfo.huskelapp.kafka | ||
|
||
import io.micrometer.core.instrument.Counter | ||
import no.nav.syfo.metric.METRICS_NS | ||
import no.nav.syfo.metric.METRICS_REGISTRY | ||
|
||
const val KAFKA_CONSUMER_HUSKELAPP_BASE = "${METRICS_NS}_kafka_consumer_huskelapp" | ||
const val KAFKA_CONSUMER_HUSKELAPP_READ = "${KAFKA_CONSUMER_HUSKELAPP_BASE}_read" | ||
const val KAFKA_CONSUMER_HUSKELAPP_TOMBSTONE = "${KAFKA_CONSUMER_HUSKELAPP_BASE}_tombstone" | ||
|
||
val COUNT_KAFKA_CONSUMER_HUSKELAPP_READ: Counter = Counter.builder(KAFKA_CONSUMER_HUSKELAPP_READ) | ||
.description("Counts the number of reads from topic - huskelapp") | ||
.register(METRICS_REGISTRY) | ||
val COUNT_KAFKA_CONSUMER_HUSKELAPP_TOMBSTONE: Counter = Counter.builder(KAFKA_CONSUMER_HUSKELAPP_TOMBSTONE) | ||
.description("Counts the number of tombstones received from topic - huskelapp") | ||
.register(METRICS_REGISTRY) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package no.nav.syfo.huskelapp.kafka | ||
|
||
import no.nav.syfo.application.ApplicationState | ||
import no.nav.syfo.application.database.database | ||
import no.nav.syfo.application.kafka.KafkaEnvironment | ||
import no.nav.syfo.application.kafka.kafkaAivenConsumerConfig | ||
import no.nav.syfo.huskelapp.HuskelappService | ||
import no.nav.syfo.kafka.launchKafkaTask | ||
import no.nav.syfo.util.configuredJacksonMapper | ||
import org.apache.kafka.clients.consumer.ConsumerConfig | ||
import org.apache.kafka.common.serialization.Deserializer | ||
import java.util.* | ||
|
||
const val HUSKELAPP_TOPIC = | ||
"teamsykefravr.huskelapp" | ||
|
||
fun launchHuskelappConsumer( | ||
applicationState: ApplicationState, | ||
kafkaEnvironment: KafkaEnvironment, | ||
) { | ||
val huskelappService = HuskelappService( | ||
database = database | ||
) | ||
val huskelappConsumer = HuskelappConsumer( | ||
huskelappService = huskelappService, | ||
) | ||
val consumerProperties = Properties().apply { | ||
putAll(kafkaAivenConsumerConfig(kafkaEnvironment = kafkaEnvironment)) | ||
this[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = "1" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Én av gangen, eller skulle vi tatt flere? 🤷🏼♂️ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ser vi går for 1 noen steder og 10 andre steder... husker ikke om det var noen spesielle grunner til at man burde begrense til 1 🤔 Kanskje noe samtidighets-problematikk. Går jo an å prøve 10 (eller enda flere) så skrur vi heller ned hvis det blir et problem. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okey 👍🏼 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ja, i dette tilfellet går det sikkert bra med 10 eller flere. Vi bør begrense det til 1 dersom konsumeringen gjør noe mer enn å skrive til databasen, feks lager en ny record på et annet topic eller skriver til MQ eller noe annet "eksternt". |
||
this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaHuskelappDeserializer::class.java.canonicalName | ||
} | ||
launchKafkaTask( | ||
applicationState = applicationState, | ||
topic = HUSKELAPP_TOPIC, | ||
consumerProperties = consumerProperties, | ||
kafkaConsumerService = huskelappConsumer | ||
) | ||
} | ||
|
||
class KafkaHuskelappDeserializer : Deserializer<KafkaHuskelapp> { | ||
private val mapper = configuredJacksonMapper() | ||
override fun deserialize(topic: String, data: ByteArray): KafkaHuskelapp = | ||
mapper.readValue(data, KafkaHuskelapp::class.java) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package no.nav.syfo.huskelapp.kafka | ||
|
||
import no.nav.syfo.domain.PersonIdent | ||
import no.nav.syfo.huskelapp.domain.Huskelapp | ||
import java.time.OffsetDateTime | ||
import java.util.UUID | ||
|
||
data class KafkaHuskelapp( | ||
val uuid: UUID, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Så at det var There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tror det skal gå bra, men vi får teste i dev 💥 |
||
val personIdent: String, | ||
val veilederIdent: String, | ||
val tekst: String, | ||
val isActive: Boolean, | ||
val createdAt: OffsetDateTime, | ||
val updatedAt: OffsetDateTime, | ||
) { | ||
fun toHuskelapp() = Huskelapp( | ||
uuid = uuid, | ||
personIdent = PersonIdent(personIdent), | ||
isActive = isActive, | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hvordan kan man evt implementere Repository-konseptet i slike consumers der vi ønsker å ha én connection oppe hele tiden? Jeg føler jo at logikken som skjer inne i forEachen hører hjemme i en service 🤷🏼♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Måtte kanskje hatt en repository-metode ala
createOrUpdatePersonoppgaver(huskelapp: List<Huskelapp>)
, men lit enig i at den logikken hører mer hjemme her...