Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tiltak_over_flere_perioder #66

Merged
merged 6 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ data class Oppfolgingsperiode (
val startDato: ZonedDateTime,
val sluttDato: ZonedDateTime?
) {
fun contains(opprettet: ChronoZonedDateTime<*>): Boolean {
val startetIPeriode = opprettet.isAfter(startDato) || opprettet.isEqual(startDato)
val opprettetFørSluttDato = sluttDato == null || sluttDato.isAfter(opprettet)
return startetIPeriode && opprettetFørSluttDato
fun tidspunktInnenforPeriode(tidspunkt: ChronoZonedDateTime<*>): Boolean {
val startetIPeriode = tidspunkt.isAfter(startDato) || tidspunkt.isEqual(startDato)
val foerSluttDato = sluttDato == null || sluttDato.isAfter(tidspunkt)
return startetIPeriode && foerSluttDato
}
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,36 @@
package no.nav.arena_tiltak_aktivitet_acl.processors

import ArenaOrdsProxyClient
import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.Oppfolgingsperiode
import no.nav.arena_tiltak_aktivitet_acl.domain.db.IngestStatus
import no.nav.arena_tiltak_aktivitet_acl.domain.db.toUpsertInputWithStatusHandled
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.*
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetKategori
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.AktivitetskortHeaders
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Operation
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.aktivitet.Tiltak
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.ArenaDeltakerKafkaMessage
import no.nav.arena_tiltak_aktivitet_acl.domain.kafka.arena.tiltak.TiltakDeltaker
import no.nav.arena_tiltak_aktivitet_acl.exceptions.DependencyNotIngestedException
import no.nav.arena_tiltak_aktivitet_acl.exceptions.IgnoredException
import no.nav.arena_tiltak_aktivitet_acl.exceptions.OppfolgingsperiodeNotFoundException
import no.nav.arena_tiltak_aktivitet_acl.exceptions.OutOfOrderException
import no.nav.arena_tiltak_aktivitet_acl.processors.converters.ArenaDeltakerConverter
import no.nav.arena_tiltak_aktivitet_acl.repositories.AktivitetDbo
import no.nav.arena_tiltak_aktivitet_acl.repositories.ArenaDataRepository
import no.nav.arena_tiltak_aktivitet_acl.repositories.GjennomforingRepository
import no.nav.arena_tiltak_aktivitet_acl.repositories.PersonSporingDbo
import no.nav.arena_tiltak_aktivitet_acl.services.*
import no.nav.arena_tiltak_aktivitet_acl.services.OppfolgingsperiodeService.Companion.merEnnEnUkeMellom
import no.nav.arena_tiltak_aktivitet_acl.services.OppfolgingsperiodeService.Companion.defaultSlakk
import no.nav.arena_tiltak_aktivitet_acl.services.OppfolgingsperiodeService.Companion.tidspunktTidligereEnnRettFoerStartDato
import no.nav.arena_tiltak_aktivitet_acl.utils.SecureLog.secureLog
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.Month
import java.time.ZonedDateTime
import java.util.*

@Component
open class DeltakerProcessor(
private val arenaDataRepository: ArenaDataRepository,
private val arenaIdTranslationService: TranslationService,
private val ordsClient: ArenaOrdsProxyClient,
private val kafkaProducerService: KafkaProducerService,
private val gjennomforingRepository: GjennomforingRepository,
private val aktivitetService: AktivitetService,
Expand All @@ -55,13 +56,6 @@ open class DeltakerProcessor(
if (deltaker.regDato.isBefore(AKTIVITETSPLAN_LANSERINGSDATO)) {
throw IgnoredException("Deltakeren registrert=${deltaker.regDato} opprettet før aktivitetsplan skal ikke håndteres")
}

// Translation opprettes ikke før denne er kjørt.
val aktivitetId = arenaIdTranslationService.hentEllerOpprettAktivitetId(deltaker.tiltakdeltakerId, AktivitetKategori.TILTAKSAKTIVITET)
val personIdent = personsporingService.get(deltaker.personId, arenaGjennomforingId)?.fodselsnummer ?: ordsClient.hentFnr(deltaker.personId)
?: throw IllegalStateException("Expected person with personId=${deltaker.personId} to exist")
personsporingService.upsert(PersonSporingDbo(personIdent = deltaker.personId, fodselsnummer = personIdent, tiltakgjennomforingId = arenaGjennomforingId))

val ingestStatus: IngestStatus? = runCatching {
arenaDataRepository.get(
message.arenaTableName,
Expand All @@ -84,39 +78,65 @@ open class DeltakerProcessor(
throw IgnoredException("Deltakeren har status=${arenaDeltaker.DELTAKERSTATUSKODE} og administrasjonskode=${tiltak.administrasjonskode} som ikke skal håndteres")
}

val aktivitetskort = aktivitetService.get(aktivitetId)
val erNyAktivitet = aktivitetskort != null
// Translation opprettes ikke her lenger
val eksisterendeAktivitetsId = arenaIdTranslationService.hentAktivitetIdForArenaId(deltaker.tiltakdeltakerId, AktivitetKategori.TILTAKSAKTIVITET)
val erNyDeltakelse = (eksisterendeAktivitetsId == null)

val personIdent = personsporingService.get(deltaker.personId, arenaGjennomforingId).fodselsnummer

/*
Hvis oppfølgingsperiode ikke finnes,
hopper vi ut her, enten med retry eller ignored, siden handleOppfolgingsperiodeNull kaster exception alltid.
Dette er viktig for å ikke opprette ny aktivitetsid før vi faktisk lagrer et aktivitetskort.
*/
val oppfolgingsperiodePaaEndringsTidspunkt = getOppfolgingsPeriodeOrThrow(deltaker, personIdent, deltaker.modDato ?: deltaker.regDato, deltaker.tiltakdeltakerId)


val (skalOppretteNyAktivitet, faktiskAktivitetsId) =
if (!erNyDeltakelse) {
val gammeltAktivitetskort = aktivitetService.get(eksisterendeAktivitetsId!!)!!
if (oppfolgingsperiodePaaEndringsTidspunkt!!.uuid != gammeltAktivitetskort.oppfolgingsperiodeUUID) {
// Har har det kommet en endring på kortet under en annen oppfølgingsperiode enn den opprinnelige oppfølgingsperioden. Vi oppretter et helt nytt aktivitetskort.
secureLog.info("Endring på deltakelse ${deltaker.tiltakdeltakerId} fra oppfølgingperiode ${gammeltAktivitetskort.oppfolgingsperiodeUUID} til oppfølgingsperiode ${oppfolgingsperiodePaaEndringsTidspunkt}. " +
"Oppretter nytt aktivitetskort for personIdent $personIdent og endrer eksisterende translation entry")
val nyAktivitetsId = UUID.randomUUID()
arenaIdTranslationService.oppdaterAktivitetId( eksisterendeAktivitetsId, nyAktivitetsId)
true to nyAktivitetsId
} else {
// oppfølgingsperiode har ikke endret seg (happy case)
false to eksisterendeAktivitetsId
}
} else { // Ny aktivitet
true to arenaIdTranslationService.opprettAktivitetsId(deltaker.tiltakdeltakerId, AktivitetKategori.TILTAKSAKTIVITET)
}


val fallbackGjennomforingNavn = "Ukjent navn"

val aktivitet = ArenaDeltakerConverter
.convertToTiltaksaktivitet(
deltaker = deltaker,
aktivitetId = aktivitetId,
aktivitetId = faktiskAktivitetsId,
personIdent = personIdent,
arrangorNavn = gjennomforing.arrangorNavn,
gjennomforingNavn = gjennomforing.navn ?: fallbackGjennomforingNavn,
tiltak = tiltak,
erNyAktivitet = erNyAktivitet,
erNyAktivitet = skalOppretteNyAktivitet,
)

val oppfolgingsperiode = aktivitetskort?.oppfolgingsPeriode()
?: getOppfolgingsPeriodeOrThrow(aktivitet, deltaker.regDato, deltaker.tiltakdeltakerId)

val aktivitetskortHeaders = AktivitetskortHeaders(
arenaId = KafkaProducerService.TILTAK_ID_PREFIX + deltaker.tiltakdeltakerId.toString(),
tiltakKode = tiltak.kode,
oppfolgingsperiode = oppfolgingsperiode.id,
oppfolgingsSluttDato = oppfolgingsperiode.oppfolgingsSluttDato
oppfolgingsperiode = oppfolgingsperiodePaaEndringsTidspunkt!!.uuid,
oppfolgingsSluttDato = oppfolgingsperiodePaaEndringsTidspunkt.sluttDato
)
val outgoingMessage = aktivitet.toKafkaMessage()
kafkaProducerService.sendTilAktivitetskortTopic(
aktivitet.id,
outgoingMessage,
aktivitetskortHeaders
)
secureLog.info("Melding for deltaker id=$aktivitetId arenaId=${deltaker.tiltakdeltakerId} personId=${deltaker.personId} fnr=$personIdent er sendt")
log.info("Melding id=${outgoingMessage.messageId} arenaId=${deltaker.tiltakdeltakerId} type=${outgoingMessage.actionType} er sendt")
secureLog.info("Melding for aktivitetskort id=$faktiskAktivitetsId arenaId=${deltaker.tiltakdeltakerId} personId=${deltaker.personId} fnr=$personIdent er sendt")
log.info("Melding id=${outgoingMessage.messageId} aktivitetskort id=$faktiskAktivitetsId arenaId=${deltaker.tiltakdeltakerId} type=${outgoingMessage.actionType} er sendt")
aktivitetService.upsert(aktivitet, aktivitetskortHeaders)
arenaDataRepository.upsert(message.toUpsertInputWithStatusHandled(deltaker.tiltakdeltakerId))
}
Expand All @@ -128,33 +148,28 @@ open class DeltakerProcessor(
&& administrasjonskode in listOf(Tiltak.Administrasjonskode.IND, Tiltak.Administrasjonskode.INST)
}

data class AktivitetskortOppfolgingsperiode(
val id: UUID,
val oppfolgingsSluttDato: ZonedDateTime?
)
private fun getOppfolgingsPeriodeOrThrow(aktivitet: Aktivitetskort, opprettetTidspunkt: LocalDateTime, tiltakDeltakerId: Long): AktivitetskortOppfolgingsperiode {
val personIdent = aktivitet.personIdent
val oppfolgingsperiode = oppfolgingsperiodeService.finnOppfolgingsperiode(personIdent, opprettetTidspunkt)
?.let { AktivitetskortOppfolgingsperiode(it.uuid , it.sluttDato) }
if (oppfolgingsperiode == null) {
secureLog.info("Fant ikke oppfølgingsperiode for personIdent=${personIdent}")
val aktivitetStatus = aktivitet.aktivitetStatus
val erFerdig = aktivitet.sluttDato?.isBefore(LocalDate.now()) ?: false
when {
aktivitetStatus.erAvsluttet() || erFerdig ->
throw IgnoredException("Avsluttet deltakelse og ingen oppfølgingsperiode, id=${tiltakDeltakerId}")
merEnnEnUkeMellom(opprettetTidspunkt, LocalDateTime.now()) ->
throw IgnoredException("Opprettet for over 1 uke siden og ingen oppfølgingsperiode, id=${tiltakDeltakerId}")
else -> throw OppfolgingsperiodeNotFoundException("Pågående deltakelse opprettetTidspunkt=${opprettetTidspunkt}, oppfølgingsperiode ikke startet/oppfolgingsperiode eldre enn en uke, id=${tiltakDeltakerId}")
}
} else {
return oppfolgingsperiode
private fun handleOppfolgingsperiodeNull(deltaker: TiltakDeltaker, personIdent: String, tidspunkt: LocalDateTime, tiltakDeltakerId: Long) {
secureLog.info("Fant ikke oppfølgingsperiode for personIdent=$personIdent")
val aktivitetStatus = ArenaDeltakerConverter.toAktivitetStatus(deltaker.deltakerStatusKode)
val erFerdig = deltaker.datoTil?.isBefore(LocalDate.now()) ?: false
when {
aktivitetStatus.erAvsluttet() || erFerdig ->
throw IgnoredException("Avsluttet deltakelse og ingen oppfølgingsperiode, id=${tiltakDeltakerId}")
tidspunktTidligereEnnRettFoerStartDato(tidspunkt, LocalDateTime.now(), defaultSlakk) ->
throw IgnoredException("Opprettet for mer enn $defaultSlakk siden og ingen oppfølgingsperiode, id=${tiltakDeltakerId}")
else -> throw OppfolgingsperiodeNotFoundException("Deltakelse endret tidspunkt=${tidspunkt}, Finner ingen passende oppfølgingsperiode, id=${tiltakDeltakerId}")
}
}
}

fun AktivitetDbo.oppfolgingsPeriode() = this.oppfolgingsperiodeUUID?.let {
DeltakerProcessor.AktivitetskortOppfolgingsperiode(it, this.oppfolgingsSluttTidspunkt)
private fun getOppfolgingsPeriodeOrThrow(deltaker: TiltakDeltaker, personIdent: String, tidspunkt: LocalDateTime, tiltakDeltakerId: Long): Oppfolgingsperiode? {
val oppfolgingsperiode = oppfolgingsperiodeService.finnOppfolgingsperiode(personIdent, tidspunkt)
return if (oppfolgingsperiode == null) {
handleOppfolgingsperiodeNull(deltaker, personIdent, tidspunkt, tiltakDeltakerId) // throws always
null
} else oppfolgingsperiode
}
}




Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ open class GjennomforingProcessor(

arenaDataRepository.upsert(message.toUpsertInputWithStatusHandled(gjennomforing.arenaId))
gjennomforingRepository.upsert(gjennomforingDbo)
log.info("Melding for gjennomføring " +
"arenaId=${gjennomforing.arenaId} er håndtert")
log.info("Upsert av gjennomføring " +
"arenaId=${gjennomforing.arenaId} navn=${gjennomforing.lokaltNavn} tiltakkode=${gjennomforing.tiltakKode}")
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.springframework.dao.DuplicateKeyException
import org.springframework.jdbc.core.RowMapper
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
import org.springframework.stereotype.Component
import java.util.*

@Component
open class TranslationRepository(
Expand Down Expand Up @@ -35,6 +36,19 @@ open class TranslationRepository(
}
}

/*
aktivitet_id er primærnøkkel, men har ingen fremmednøkler knyttet til seg, så update går fint.
*/
fun updateAktivitetId(oldAktivitetId: UUID, newAktivitetId: UUID) {
val sql = """
UPDATE translation SET aktivitet_id = :newAktivitetId where aktivitet_id = :oldAktivitetId
""".trimIndent()
val parameters = sqlParameters(
"oldAktivitetId" to oldAktivitetId,
"newAktivitetId" to newAktivitetId)
template.update(sql, parameters)
}

fun get(arenaId: Long, aktivitetKategori: AktivitetKategori): TranslationDbo? {
val sql = """
SELECT *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ open class ArenaMessageProcessorService(
arenaDataRepository.upsert(msg.toUpsertInput(arenaId, ingestStatus = IngestStatus.FAILED, note = e.message))
}
is OppfolgingsperiodeNotFoundException -> {
log.info("Oppfolgingsperiode not found for $arenaId in table $arenaTableName: '${e.message}'")
log.info("Oppfolgingsperiode not found for deltakerId: $arenaId in table $arenaTableName: '${e.message}'")
arenaDataRepository.upsert(msg.toUpsertInput(arenaId, ingestStatus = IngestStatus.RETRY, note = e.message))
}
else -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,61 @@ package no.nav.arena_tiltak_aktivitet_acl.services

import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.OppfolgingClient
import no.nav.arena_tiltak_aktivitet_acl.clients.oppfolging.Oppfolgingsperiode
import org.slf4j.LoggerFactory
import no.nav.arena_tiltak_aktivitet_acl.utils.SecureLog.secureLog
import org.springframework.stereotype.Service
import java.time.Duration
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.chrono.ChronoZonedDateTime
import java.time.temporal.ChronoUnit
import java.time.temporal.TemporalAmount
import kotlin.math.abs

@Service
open class OppfolgingsperiodeService(
private val oppfolgingClient: OppfolgingClient
) {
private val log = LoggerFactory.getLogger(javaClass)

companion object {
fun mindreEnnEnUkeMellom(opprettetTidspunkt: LocalDateTime, periodeStartDato: LocalDateTime): Boolean {
return opprettetTidspunkt.plusDays(7).isAfter(periodeStartDato)
fun tidspunktRettFoerStartDatoEllerSenere(tidspunkt: LocalDateTime, startDato: LocalDateTime, slakk: TemporalAmount): Boolean {
return tidspunkt.plus(slakk).isAfter(startDato)
}
fun merEnnEnUkeMellom(opprettetTidspunkt: LocalDateTime, periodeStartDato: LocalDateTime): Boolean {
return !mindreEnnEnUkeMellom(opprettetTidspunkt, periodeStartDato)
fun tidspunktTidligereEnnRettFoerStartDato(tidspunkt: LocalDateTime, startDato: LocalDateTime, slakk: TemporalAmount): Boolean {
return !tidspunktRettFoerStartDatoEllerSenere(tidspunkt, startDato, slakk)
}
val defaultSlakk = Duration.of(7, ChronoUnit.DAYS)
}


fun finnOppfolgingsperiode(fnr: String, opprettetTidspunkt: LocalDateTime): Oppfolgingsperiode? {
fun finnOppfolgingsperiode(fnr: String, tidspunkt: LocalDateTime): Oppfolgingsperiode? {
val oppfolgingsperioder = oppfolgingClient.hentOppfolgingsperioder(fnr)
.sortedByDescending { it.startDato }
if (oppfolgingsperioder.isEmpty()) {
log.info(
"Arenatiltak finn oppfølgingsperiode - bruker har ingen oppfølgingsperioder - fnr={}, opprettetTidspunkt={}, oppfolgingsperioder={}",
fnr, opprettetTidspunkt, listOf<Oppfolgingsperiode>()
secureLog.info(
"Arenatiltak finn oppfølgingsperiode - bruker har ingen oppfølgingsperioder - fnr={}, tidspunkt={}, oppfolgingsperioder={}",
fnr, tidspunkt, listOf<Oppfolgingsperiode>()
)
return null
}

val opprettetTidspunktCZDT = ChronoZonedDateTime.from(opprettetTidspunkt.atZone(ZoneId.systemDefault()))
val tidspunktCZDT = ChronoZonedDateTime.from(tidspunkt.atZone(ZoneId.systemDefault()))
val oppfolgingsperiode = oppfolgingsperioder
.find {periode -> periode.contains(opprettetTidspunktCZDT) }
.find {periode -> periode.tidspunktInnenforPeriode(tidspunktCZDT) }

return oppfolgingsperiode ?: oppfolgingsperioder
.filter { it.sluttDato == null || it.sluttDato.isAfter(opprettetTidspunktCZDT) }
.minByOrNull { abs(ChronoUnit.MILLIS.between(opprettetTidspunktCZDT, it.startDato)) }
.filter { it.sluttDato == null || it.sluttDato.isAfter(tidspunktCZDT) }
.minByOrNull { abs(ChronoUnit.MILLIS.between(tidspunktCZDT, it.startDato)) }
.let { periodeMatch ->
if (periodeMatch == null || !mindreEnnEnUkeMellom(opprettetTidspunkt, periodeMatch.startDato.toLocalDateTime())) {
log.info(
"Arenatiltak finn oppfølgingsperiode - opprettetTidspunkt har ingen god match på oppfølgingsperioder) - fnr={}, opprettetTidspunkt={}, oppfolgingsperioder={}",
fnr, opprettetTidspunkt, oppfolgingsperioder
if (periodeMatch == null || !tidspunktRettFoerStartDatoEllerSenere(tidspunkt, periodeMatch.startDato.toLocalDateTime(), defaultSlakk)) {
secureLog.info(
"Arenatiltak finn oppfølgingsperiode - tidspunkt har ingen god match på oppfølgingsperioder) - fnr={}, tidspunkt={}, oppfolgingsperioder={}",
fnr, tidspunkt, oppfolgingsperioder
)
null
} else {
log.info(
"Arenatiltak finn oppfølgingsperiode - opprettetdato innen 1 uke oppfølging startdato) - fnr={}, opprettetTidspunkt={}, oppfolgingsperioder={}",
fnr, opprettetTidspunkt, oppfolgingsperioder
secureLog.info(
"Arenatiltak finn oppfølgingsperiode - tidspunkt innen {} oppfølging startdato) - fnr={}, tidspunkt={}, oppfolgingsperioder={}",
defaultSlakk, fnr, tidspunkt, oppfolgingsperioder
)
periodeMatch
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
package no.nav.arena_tiltak_aktivitet_acl.services

import ArenaOrdsProxyClient
import no.nav.arena_tiltak_aktivitet_acl.repositories.PersonSporingDbo
import no.nav.arena_tiltak_aktivitet_acl.repositories.PersonSporingRepository
import org.springframework.stereotype.Service

@Service
class PersonsporingService (
val personSporingRepository: PersonSporingRepository
val personSporingRepository: PersonSporingRepository,
private val ordsClient: ArenaOrdsProxyClient
) {
fun upsert(personSporingDbo: PersonSporingDbo) = personSporingRepository.upsert(personSporingDbo)
fun get(personId: Long, gjennomforingId: Long) = personSporingRepository.get(personId, gjennomforingId)
fun get(arenaPersonId: Long, gjennomforingId: Long): PersonSporingDbo {
val personSporingDbo = personSporingRepository.get(arenaPersonId, gjennomforingId)
if (personSporingDbo != null) {
return personSporingDbo
}
val fnr = ordsClient.hentFnr(arenaPersonId) ?: throw IllegalStateException("Expected person with personId=${arenaPersonId} to exist")
val newDbo = PersonSporingDbo(arenaPersonId, fnr, gjennomforingId)
personSporingRepository.upsert(newDbo)
return PersonSporingDbo(arenaPersonId, fnr, gjennomforingId)
}
}
Loading