Skip to content
This repository has been archived by the owner on Nov 5, 2024. It is now read-only.

Commit

Permalink
Start/stopp hendelse kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
maccyber committed Nov 6, 2023
1 parent f5e6940 commit 33c287a
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 0 deletions.
36 changes: 36 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<common.version>2.2022.09.26_07.11-9c0037f021cb</common.version>
<junit.jupiter.version>5.8.2</junit.jupiter.version>
<mockserver.version>5.13.2</mockserver.version>
<avro.version>1.11.3</avro.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -103,6 +104,26 @@
</dependency>

<!-- ymse -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>no.bekk.bekkopen</groupId>
<artifactId>nocommons</artifactId>
Expand Down Expand Up @@ -339,6 +360,21 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
8 changes: 8 additions & 0 deletions src/main/avro/StartetV1.avdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
@namespace("no.nav.paw.arbeidssokerregisteret.intern.v1")
protocol HendelseStartetV1 {
import idl "metadata-v1.avdl";
record Startet {
string identitetsnummer;
Metadata metadata;
}
}
8 changes: 8 additions & 0 deletions src/main/avro/StoppetV1.avdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
@namespace("no.nav.paw.arbeidssokerregisteret.intern.v1")
protocol HendelseStoppetV1 {
import idl "metadata-v1.avdl";
record Stoppet {
string identitetsnummer;
Metadata metadata;
}
}
11 changes: 11 additions & 0 deletions src/main/avro/bruker-v1.avdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
@namespace("no.nav.paw.arbeidssokerregisteret.intern.v1")
protocol Bruker {
enum BrukerType {
UKJENT_VERDI, UDEFINERT, VEILEDER, SYSTEM, SLUTTBRUKER
} = UKJENT_VERDI;

record Bruker {
BrukerType type;
string id;
}
}
12 changes: 12 additions & 0 deletions src/main/avro/metadata-v1.avdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
@namespace("no.nav.paw.arbeidssokerregisteret.intern.v1")
protocol Endring {
import idl "bruker-v1.avdl";

record Metadata {
@logicalType("timestamp-millis")
long tidspunkt;
Bruker utfoertAv;
string kilde;
string aarsak;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ interface ArbeidssokerperiodeRepository {
fun avsluttPeriode(id: Int, tilDato: LocalDateTime)
fun hentPerioder(foedselsnummer: Foedselsnummer): List<ArbeidssokerperiodeDto>
fun hentPerioder(gjeldendeFoedselsnummer: Foedselsnummer, historiskeFoedselsnummer: List<Foedselsnummer>): List<ArbeidssokerperiodeDto>
fun hentNesteArbeidssokerperioder(): List<ArbeidssokerperiodeDto>
fun settArbeidssokerperioderSomOverfort(listeMedIder: List<Int>)
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ class ArbeidssokerperiodeService(
.map { Periode(it.fra.toLocalDate(), it.til?.toLocalDate()) }
}

fun hentNesteArbeidssokerperioder(): List<ArbeidssokerperiodeDto> {
return repository.hentNesteArbeidssokerperioder()
}

fun settArbeidssokerperioderSomOverfort(listeMedIder: List<Int>) {
repository.settArbeidssokerperioderSomOverfort(listeMedIder)
}

private fun harAktivPeriode(bruker: Bruker): Boolean {
return hentAktivPeriode(bruker) != null
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package no.nav.fo.veilarbregistrering.arbeidssoker.perioder

import no.nav.common.job.leader_election.LeaderElectionClient
import no.nav.fo.veilarbregistrering.arbeidssoker.ArbeidssokerperiodeService
import no.nav.fo.veilarbregistrering.log.logger
import no.nav.paw.arbeidssokerregisteret.intern.v1.Bruker
import no.nav.paw.arbeidssokerregisteret.intern.v1.BrukerType
import no.nav.paw.arbeidssokerregisteret.intern.v1.Startet
import org.springframework.scheduling.annotation.Scheduled
import java.time.ZoneOffset

class ArbeidssokerperiodeScheduler(
private val leaderElectionClient: LeaderElectionClient,
private val arbeidssokerperiodeService: ArbeidssokerperiodeService,
) {
@Scheduled(cron = "* * * * *")
fun start() {
if (!leaderElectionClient.isLeader) {
return
}
logger.info("Starter jobb for å overføre arbeidssøkerperioder")

val arbeidssokerperioder = arbeidssokerperiodeService.hentNesteArbeidssokerperioder()

if (arbeidssokerperioder.isEmpty()) {
logger.info("Fant ingen arbeidssøkerperioder som skal overføres")
return
}

val startHendelseTilOverforing = arbeidssokerperioder.map {
Startet(
it.foedselsnummer.foedselsnummer,
no.nav.paw.arbeidssokerregisteret.intern.v1.Metadata(
it.fra.toInstant(ZoneOffset.of("Europe/Oslo")),
Bruker(
BrukerType.SLUTTBRUKER,
it.foedselsnummer.foedselsnummer,
),
"veilarbregistrering",
"overføring",
),
)
}

val stoppHendelseTilOverforing = arbeidssokerperioder
.filter { it.til != null }
.map {
no.nav.paw.arbeidssokerregisteret.intern.v1.Stoppet(
it.foedselsnummer.foedselsnummer,
no.nav.paw.arbeidssokerregisteret.intern.v1.Metadata(
it.til!!.toInstant(ZoneOffset.of("Europe/Oslo")),
Bruker(
BrukerType.SLUTTBRUKER,
it.foedselsnummer.foedselsnummer,
),
"veilarbregistrering",
"overføring",
),
)
}
logger.info("Start hendelser til overføring ${startHendelseTilOverforing.size}")
logger.info("Stopp hendelser til overføring ${stoppHendelseTilOverforing.size}")

// arbeidssokerperiodeService.settArbeidssokerperioderSomOverfort(arbeidssokerperioder.map { it.id })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import no.nav.fo.veilarbregistrering.arbeidssoker.meldekort.MeldekortService
import no.nav.fo.veilarbregistrering.arbeidssoker.meldekort.resources.MeldekortResource
import no.nav.fo.veilarbregistrering.arbeidssoker.perioder.ArbeidssokerService
import no.nav.fo.veilarbregistrering.arbeidssoker.perioder.ArbeidssokerperiodeAvsluttetProducer
import no.nav.fo.veilarbregistrering.arbeidssoker.perioder.ArbeidssokerperiodeScheduler
import no.nav.fo.veilarbregistrering.arbeidssoker.perioder.PopulerArbeidssokerperioderService
import no.nav.fo.veilarbregistrering.arbeidssoker.perioder.resources.ArbeidssokerResource
import no.nav.fo.veilarbregistrering.autorisasjon.TilgangskontrollService
Expand Down Expand Up @@ -454,4 +455,12 @@ class ServiceBeansConfig {
): ReparerArenaDataScheduler {
return ReparerArenaDataScheduler(formidlingsgruppeRepository, arbeidssokerperiodeRepository, pdlOppslagGateway, leaderElectionClient)
}

@Bean
fun arbeidssokerperiodeScheduler(
leaderElectionClient: LeaderElectionClient,
arbeidssokerperiodeService: ArbeidssokerperiodeService
): ArbeidssokerperiodeScheduler {
return ArbeidssokerperiodeScheduler(leaderElectionClient, arbeidssokerperiodeService)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ class ArbeidssokerperiodeRepositoryImpl(private val db: NamedParameterJdbcTempla
return db.query(sql, params, arbeidssokerperiodeMapper)
}

override fun hentNesteArbeidssokerperioder(): List<ArbeidssokerperiodeDto> {
val sql = "SELECT * FROM $ARBEIDSSOKERPERIODE_TABELL WHERE overfort_kafka IS FALSE ORDER BY fra_og_med LIMIT 1"
return db.query(sql, arbeidssokerperiodeMapper)
}

override fun settArbeidssokerperioderSomOverfort(listeMedIder: List<Int>) {
val sql = "UPDATE $ARBEIDSSOKERPERIODE_TABELL SET overfort_kafka = TRUE WHERE id IN (:listeMedIder)"
val params = mapOf("ListeMedIder" to listeMedIder)
db.update(sql, params)
}

companion object {
const val ARBEIDSSOKERPERIODE_TABELL = "arbeidssokerperiode"
private val arbeidssokerperiodeMapper = RowMapper { rs, _ ->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER table arbeidssokerperiode
ADD COLUMN overfort_kafka boolean NOT NULL DEFAULT false;

0 comments on commit 33c287a

Please sign in to comment.