Skip to content

Commit

Permalink
Add user course points realtime consumer; scale batch consumer down (#…
Browse files Browse the repository at this point in the history
…1185)

* add user course points realtime consumer; scale batch consumer down

* fix helm chart config key
  • Loading branch information
mipyykko committed May 12, 2023
1 parent 79d5e05 commit baa62a1
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Message as UserPointsMessage } from "../common/userPoints/interfaces"
import { Message as UserPointsMessage } from "../userPoints/interfaces"

export interface Message {
timestamp: string
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { ok } from "../../../util/result"
import { KafkaContext } from "../common/kafkaContext"
import { checkCompletion } from "../common/userFunctions"
import { ok } from "../../../../util/result"
import { KafkaContext } from "../kafkaContext"
import { checkCompletion } from "../userFunctions"
import {
createExerciseCompletion,
getCreatedAndUpdatedExerciseCompletions,
pruneExerciseCompletions,
updateExerciseCompletion,
} from "../common/userPoints/exerciseCompletionFunctions"
import { getCourse, getTimestamp, getUser } from "../common/userPoints/util"
} from "../userPoints/exerciseCompletionFunctions"
import { getCourse, getTimestamp, getUser } from "../userPoints/util"
import { Message } from "./interfaces"

export const saveToDatabase = async (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as yup from "yup"

import { MessageYupSchema as UserPointsMessageYupSchema } from "../common/userPoints/validate"
import { MessageYupSchema as UserPointsMessageYupSchema } from "../userPoints/validate"

const CURRENT_MESSAGE_FORMAT_VERSION = 1

Expand Down
3 changes: 3 additions & 0 deletions backend/bin/kafkaConsumer/kafkaConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ const kafkaConfig = {
user_course_points_consumer: {
topic_name: "user-course-points-batch",
},
user_course_points_realtime_consumer: {
topic_name: "user-course-points-realtime",
},
commit_interval: 50,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import { createKafkaConsumer } from "../common/createKafkaConsumer"
import { handleMessage } from "../common/handleMessage"
import { KafkaContext } from "../common/kafkaContext"
import { handledRecently, setHandledRecently } from "../common/messageHashCache"
import { saveToDatabase } from "../common/userCoursePoints/saveToDB"
import { MessageYupSchema } from "../common/userCoursePoints/validate"
import config from "../kafkaConfig"
import { saveToDatabase } from "./saveToDB"
import { MessageYupSchema } from "./validate"

const TOPIC_NAME = [config.user_course_points_consumer.topic_name]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { Message as KafkaMessage, LibrdKafkaError } from "node-rdkafka"

import { Mutex } from "../../../lib/await-semaphore"
import { KafkaError } from "../../../lib/errors"
import sentryLogger from "../../../lib/logger"
import prisma from "../../../prisma"
import knex from "../../../services/knex"
import { createKafkaConsumer } from "../common/createKafkaConsumer"
import { handleMessage } from "../common/handleMessage"
import { KafkaContext } from "../common/kafkaContext"
import { Message } from "../common/userCoursePoints/interfaces"
import { saveToDatabase } from "../common/userCoursePoints/saveToDB"
import { MessageYupSchema } from "../common/userCoursePoints/validate"
import config from "../kafkaConfig"

const TOPIC_NAME = [config.user_course_points_realtime_consumer.topic_name]

const mutex = new Mutex()

const logger = sentryLogger({
service: "kafka-consumer-user-course-points-realtime",
})
const consumer = createKafkaConsumer({ logger, prisma })

consumer.connect()

const context: KafkaContext = {
prisma,
logger,
mutex,
consumer,
knex,
topic_name: TOPIC_NAME[0],
}

consumer.on("ready", () => {
logger.info("Ready to consume")
consumer.subscribe(TOPIC_NAME)

const consumerImpl = async (
error: LibrdKafkaError,
messages: KafkaMessage[],
) => {
if (error) {
logger.error(new KafkaError("Error while consuming", error))
process.exit(-1)
}

if (messages.length > 0) {
await handleMessage<Message>({
context,
kafkaMessage: messages[0],
MessageYupSchema,
saveToDatabase,
})
setImmediate(() => {
consumer.consume(1, consumerImpl)
})
} else {
setTimeout(() => {
consumer.consume(1, consumerImpl)
}, 10)
}
}

consumer.consume(1, consumerImpl)
})
1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"kafka-consumer-user-points-batch": "node ./dist/bin/kafkaConsumer/userPointsConsumer/kafkaConsumer.js",
"kafka-consumer-user-points-realtime": "node ./dist/bin/kafkaConsumer/userPointsRealtimeConsumer/kafkaConsumer.js",
"kafka-consumer-user-course-points-batch": "node ./dist/bin/kafkaConsumer/userCoursePointsConsumer/kafkaConsumer.js",
"kafka-consumer-user-course-points-realtime": "node ./dist/bin/kafkaConsumer/userCoursePointsRealtimeConsumer/kafkaConsumer.js",
"kafka-consumer-exercises": "node ./dist/bin/kafkaConsumer/exerciseConsumer/kafkaConsumer.js",
"generate": "npm run generate:prisma && npm run generate:nexus",
"generate:prisma": "prisma generate",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: kafka-consumer-user-course-points-realtime
labels:
{{- include "helm.labels" . | nindent 4 }}
spec:
selector:
matchLabels:
app: kafka-consumer-user-course-points-realtime
{{- include "helm.selectorLabels" . | nindent 6 }}
replicas: {{ .Values.kafkaConsumer.userCoursePointsRealtime.replicaCount }}
template:
metadata:
labels:
app: kafka-consumer-user-course-points-realtime
{{- include "helm.selectorLabels" . | nindent 8 }}
spec:
containers:
- name: kafka-consumer-user-course-points-realtime
image: "{{ .Values.image.repository }}/moocfi-backend:{{ .Values.image.tag | default .Chart.AppVersion }}"
command: ["sh", "-c", "npm run kafka-consumer-user-course-points-realtime"]
imagePullPolicy: Always
ports:
- name: backend-http
containerPort: 4000
resources:
limits:
memory: 800Mi
cpu: 80m
requests:
memory: 200Mi
cpu: 20m
envFrom:
- secretRef:
name: backend-secret
env:
- name: REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: redis
key: redis-password
volumeMounts:
- name: google-cloud-storage-serviceaccount
mountPath: "/etc/gcs"
readOnly: true
volumes:
- name: google-cloud-storage-serviceaccount
secret:
secretName: google-cloud-storage-serviceaccount
items:
- key: account.json
path: account.json
13 changes: 13 additions & 0 deletions helm/templates/kafka/user-course-points-realtime-topic.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
name: user-course-points-realtime
namespace: kafka
labels:
strimzi.io/cluster: kafka-cluster
spec:
partitions: 10
replicas: 2
config:
retention.ms: 604800000
segment.bytes: 107374182
4 changes: 3 additions & 1 deletion helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ kafkaConsumer:
userPointsRealtime:
replicaCount: 1
userCoursePoints:
replicaCount: 6
replicaCount: 2
userCoursePointsRealtime:
replicaCount: 1

shibboTest:
replicaCount: 0
Expand Down

0 comments on commit baa62a1

Please sign in to comment.