Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions prisma/migrations/20260222071209_add_notifications/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- CreateTable
CREATE TABLE "Notification" (
"id" SERIAL NOT NULL,
"userId" INTEGER NOT NULL,
"title" TEXT NOT NULL,
"message" TEXT NOT NULL,
"type" TEXT NOT NULL,
"read" BOOLEAN NOT NULL DEFAULT false,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

CONSTRAINT "Notification_pkey" PRIMARY KEY ("id")
);

-- CreateIndex
CREATE INDEX "Notification_userId_createdAt_idx" ON "Notification"("userId", "createdAt");

-- CreateIndex
CREATE INDEX "Notification_userId_read_idx" ON "Notification"("userId", "read");

-- AddForeignKey
ALTER TABLE "Notification" ADD CONSTRAINT "Notification_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;
14 changes: 14 additions & 0 deletions prisma/models/notification.prisma
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
model Notification {
id Int @id @default(autoincrement())
userId Int
title String
message String
type String
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to have an enum / something different for the notification type ? If we keep it as a string, it might create to many very similar notification types while we could easily control what types we want for notifications

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, this needs to be enum -- otherwise clients could send unknown notification types that just overload the system -- notification types should be checked when received

read Boolean @default(false)
createdAt DateTime @default(now())

user User @relation(fields: [userId], references: [id], onDelete: Cascade)

@@index([userId, createdAt])
@@index([userId, read])
}
1 change: 1 addition & 0 deletions prisma/models/user.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ model User {
hostingSession WorkSession[] @relation("WorkSessionHost")

refreshTokens RefreshToken[]
notifications Notification[]
}

model Role {
Expand Down
52 changes: 27 additions & 25 deletions src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { S3Module } from "@s3/s3.module";
import { UserModule } from "@user/user.module";
import { ProjectModule } from "@projects/project.module";
import { WorkSessionModule } from "./routes/work-session/work-session.module";
import { PrismaModule } from "@prisma/prisma.module";
import { AuthModule } from "@auth/auth.module";
import { ScheduleModule } from "@nestjs/schedule";
import { TasksModule } from "./tasks/tasks.module";

@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
ScheduleModule.forRoot(),
PrismaModule,
AuthModule,
S3Module,
UserModule,
ProjectModule,
WorkSessionModule,
TasksModule
]
})
export class AppModule {}
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { S3Module } from "@s3/s3.module";
import { UserModule } from "@user/user.module";
import { ProjectModule } from "@projects/project.module";
import { WorkSessionModule } from "./routes/work-session/work-session.module";
import { PrismaModule } from "@prisma/prisma.module";
import { AuthModule } from "@auth/auth.module";
import { ScheduleModule } from "@nestjs/schedule";
import { TasksModule } from "./tasks/tasks.module";
import { NotificationsModule } from "./notifications/notifications.module";

@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
ScheduleModule.forRoot(),
PrismaModule,
AuthModule,
S3Module,
UserModule,
ProjectModule,
WorkSessionModule,
TasksModule,
NotificationsModule,
]
})
export class AppModule {}
272 changes: 139 additions & 133 deletions src/collab/signaling/signal.ts
Original file line number Diff line number Diff line change
@@ -1,133 +1,139 @@
import { WebSocket, WebSocketServer } from "ws";
import http from "http";
import * as map from "lib0/map";

const WS_READY_STATE_CONNECTING = 0;
const WS_READY_STATE_OPEN = 1;
const PING_TIMEOUT = 30000;

const topics = new Map<string, Set<WebSocket>>();

interface WSMessage {
type: "subscribe" | "unsubscribe" | "publish" | "ping" | "pong";
topics?: string[];
topic?: string;
clients?: number;
}

export let wss: WebSocketServer;

export const setupWebSocketServer = (server: http.Server): void => {
wss = new WebSocketServer({ noServer: true });

const send = (conn: WebSocket, message: WSMessage): void => {
if (
conn.readyState !== WS_READY_STATE_CONNECTING &&
conn.readyState !== WS_READY_STATE_OPEN
) {
conn.close();
}
try {
conn.send(JSON.stringify(message));
} catch {
conn.close();
return;
}
};

const onConnection = (conn: WebSocket): void => {
const subscribedTopics = new Set<string>();
let closed = false;
let pongReceived = true;
const pingInterval = setInterval(() => {
if (!pongReceived) {
conn.close();
clearInterval(pingInterval);
} else {
pongReceived = false;
try {
conn.ping();
} catch {
conn.close();
}
}
}, PING_TIMEOUT);

conn.on("pong", () => {
pongReceived = true;
});

conn.on("close", () => {
subscribedTopics.forEach((topicName) => {
const subs = topics.get(topicName) || new Set();
subs.delete(conn);
if (subs.size === 0) {
topics.delete(topicName);
}
});
subscribedTopics.clear();
closed = true;
});

conn.on("message", (raw: import("ws").RawData): void => {
let message: WSMessage;

try {
const parsed = JSON.parse(
typeof raw === "string" ? raw : raw.toString()
);
message = parsed as WSMessage;
} catch {
conn.close();
return;
}

if (message && message.type && !closed) {
switch (message.type) {
case "subscribe":
(message.topics || []).forEach((topicName: string) => {
if (typeof topicName === "string") {
const topic = map.setIfUndefined(
topics,
topicName,
() => new Set<WebSocket>()
);
topic.add(conn);
subscribedTopics.add(topicName);
}
});
break;
case "unsubscribe":
(message.topics || []).forEach((topicName: string) => {
const subs = topics.get(topicName);
if (subs) {
subs.delete(conn);
}
});
break;
case "publish":
if (message.topic) {
const receivers = topics.get(message.topic);
if (receivers) {
message.clients = receivers.size;
receivers.forEach((receiver) => send(receiver, message));
}
}
break;
case "ping":
send(conn, { type: "pong" } as WSMessage);
break;
}
}
});
};

wss.on("connection", onConnection);

server.on("upgrade", (request, socket, head) => {
const handleAuth = (ws: WebSocket): void => {
wss.emit("connection", ws, request);
};
wss.handleUpgrade(request, socket, head, handleAuth);
});
};
import { WebSocket, WebSocketServer } from "ws";
import http from "http";
import * as map from "lib0/map";

const WS_READY_STATE_CONNECTING = 0;
const WS_READY_STATE_OPEN = 1;
const PING_TIMEOUT = 30000;

const topics = new Map<string, Set<WebSocket>>();

interface WSMessage {
type: "subscribe" | "unsubscribe" | "publish" | "ping" | "pong";
topics?: string[];
topic?: string;
clients?: number;
}

export let wss: WebSocketServer;
type UpgradeRequest = http.IncomingMessage & { _wsHandled?: boolean };

export const setupWebSocketServer = (server: http.Server): void => {
wss = new WebSocketServer({ noServer: true });

const send = (conn: WebSocket, message: WSMessage): void => {
if (
conn.readyState !== WS_READY_STATE_CONNECTING &&
conn.readyState !== WS_READY_STATE_OPEN
) {
conn.close();
}
try {
conn.send(JSON.stringify(message));
} catch {
conn.close();
return;
}
};

const onConnection = (conn: WebSocket): void => {
const subscribedTopics = new Set<string>();
let closed = false;
let pongReceived = true;
const pingInterval = setInterval(() => {
if (!pongReceived) {
conn.close();
clearInterval(pingInterval);
} else {
pongReceived = false;
try {
conn.ping();
} catch {
conn.close();
}
}
}, PING_TIMEOUT);

conn.on("pong", () => {
pongReceived = true;
});

conn.on("close", () => {
subscribedTopics.forEach((topicName) => {
const subs = topics.get(topicName) || new Set();
subs.delete(conn);
if (subs.size === 0) {
topics.delete(topicName);
}
});
subscribedTopics.clear();
closed = true;
});

conn.on("message", (raw: import("ws").RawData): void => {
let message: WSMessage;

try {
const parsed = JSON.parse(
typeof raw === "string" ? raw : raw.toString()
);
message = parsed as WSMessage;
} catch {
conn.close();
return;
}

if (message && message.type && !closed) {
switch (message.type) {
case "subscribe":
(message.topics || []).forEach((topicName: string) => {
if (typeof topicName === "string") {
const topic = map.setIfUndefined(
topics,
topicName,
() => new Set<WebSocket>()
);
topic.add(conn);
subscribedTopics.add(topicName);
}
});
break;
case "unsubscribe":
(message.topics || []).forEach((topicName: string) => {
const subs = topics.get(topicName);
if (subs) {
subs.delete(conn);
}
});
break;
case "publish":
if (message.topic) {
const receivers = topics.get(message.topic);
if (receivers) {
message.clients = receivers.size;
receivers.forEach((receiver) => send(receiver, message));
}
}
break;
case "ping":
send(conn, { type: "pong" } as WSMessage);
break;
}
}
});
};

wss.on("connection", onConnection);

server.on("upgrade", (request: UpgradeRequest, socket, head) => {
const url = request.url ?? "";
if (!url.startsWith("/socket/webrtc")) {
return;
}
request._wsHandled = true;
const handleAuth = (ws: WebSocket): void => {
wss.emit("connection", ws, request);
};
wss.handleUpgrade(request, socket, head, handleAuth);
});
};
Loading