Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions prisma/migrations/20260222071209_add_notifications/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- CreateTable
CREATE TABLE "Notification" (
"id" SERIAL NOT NULL,
"userId" INTEGER NOT NULL,
"title" TEXT NOT NULL,
"message" TEXT NOT NULL,
"type" TEXT NOT NULL,
"read" BOOLEAN NOT NULL DEFAULT false,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,

CONSTRAINT "Notification_pkey" PRIMARY KEY ("id")
);

-- CreateIndex
CREATE INDEX "Notification_userId_createdAt_idx" ON "Notification"("userId", "createdAt");

-- CreateIndex
CREATE INDEX "Notification_userId_read_idx" ON "Notification"("userId", "read");

-- AddForeignKey
ALTER TABLE "Notification" ADD CONSTRAINT "Notification_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;
19 changes: 19 additions & 0 deletions prisma/models/notification.prisma
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
enum NotificationType {
INFO
WARNING
}

model Notification {
id Int @id @default(autoincrement())
userId Int
title String
message String
type NotificationType
read Boolean @default(false)
createdAt DateTime @default(now())

user User @relation(fields: [userId], references: [id], onDelete: Cascade)

@@index([userId, createdAt])
@@index([userId, read])
}
1 change: 1 addition & 0 deletions prisma/models/user.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ model User {
hostingSession WorkSession[] @relation("WorkSessionHost")

refreshTokens RefreshToken[]
notifications Notification[]
}

model Role {
Expand Down
52 changes: 27 additions & 25 deletions src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { S3Module } from "@s3/s3.module";
import { UserModule } from "@user/user.module";
import { ProjectModule } from "@projects/project.module";
import { WorkSessionModule } from "./routes/work-session/work-session.module";
import { PrismaModule } from "@prisma/prisma.module";
import { AuthModule } from "@auth/auth.module";
import { ScheduleModule } from "@nestjs/schedule";
import { TasksModule } from "./tasks/tasks.module";

@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
ScheduleModule.forRoot(),
PrismaModule,
AuthModule,
S3Module,
UserModule,
ProjectModule,
WorkSessionModule,
TasksModule
]
})
export class AppModule {}
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { S3Module } from "@s3/s3.module";
import { UserModule } from "@user/user.module";
import { ProjectModule } from "@projects/project.module";
import { WorkSessionModule } from "./routes/work-session/work-session.module";
import { PrismaModule } from "@prisma/prisma.module";
import { AuthModule } from "@auth/auth.module";
import { ScheduleModule } from "@nestjs/schedule";
import { TasksModule } from "./tasks/tasks.module";
import { NotificationsModule } from "./notifications/notifications.module";

@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
ScheduleModule.forRoot(),
PrismaModule,
AuthModule,
S3Module,
UserModule,
ProjectModule,
WorkSessionModule,
TasksModule,
NotificationsModule,
]
})
export class AppModule {}
272 changes: 139 additions & 133 deletions src/collab/signaling/signal.ts
Original file line number Diff line number Diff line change
@@ -1,133 +1,139 @@
import { WebSocket, WebSocketServer } from "ws";
import http from "http";
import * as map from "lib0/map";

const WS_READY_STATE_CONNECTING = 0;
const WS_READY_STATE_OPEN = 1;
const PING_TIMEOUT = 30000;

const topics = new Map<string, Set<WebSocket>>();

interface WSMessage {
type: "subscribe" | "unsubscribe" | "publish" | "ping" | "pong";
topics?: string[];
topic?: string;
clients?: number;
}

export let wss: WebSocketServer;

export const setupWebSocketServer = (server: http.Server): void => {
wss = new WebSocketServer({ noServer: true });

const send = (conn: WebSocket, message: WSMessage): void => {
if (
conn.readyState !== WS_READY_STATE_CONNECTING &&
conn.readyState !== WS_READY_STATE_OPEN
) {
conn.close();
}
try {
conn.send(JSON.stringify(message));
} catch {
conn.close();
return;
}
};

const onConnection = (conn: WebSocket): void => {
const subscribedTopics = new Set<string>();
let closed = false;
let pongReceived = true;
const pingInterval = setInterval(() => {
if (!pongReceived) {
conn.close();
clearInterval(pingInterval);
} else {
pongReceived = false;
try {
conn.ping();
} catch {
conn.close();
}
}
}, PING_TIMEOUT);

conn.on("pong", () => {
pongReceived = true;
});

conn.on("close", () => {
subscribedTopics.forEach((topicName) => {
const subs = topics.get(topicName) || new Set();
subs.delete(conn);
if (subs.size === 0) {
topics.delete(topicName);
}
});
subscribedTopics.clear();
closed = true;
});

conn.on("message", (raw: import("ws").RawData): void => {
let message: WSMessage;

try {
const parsed = JSON.parse(
typeof raw === "string" ? raw : raw.toString()
);
message = parsed as WSMessage;
} catch {
conn.close();
return;
}

if (message && message.type && !closed) {
switch (message.type) {
case "subscribe":
(message.topics || []).forEach((topicName: string) => {
if (typeof topicName === "string") {
const topic = map.setIfUndefined(
topics,
topicName,
() => new Set<WebSocket>()
);
topic.add(conn);
subscribedTopics.add(topicName);
}
});
break;
case "unsubscribe":
(message.topics || []).forEach((topicName: string) => {
const subs = topics.get(topicName);
if (subs) {
subs.delete(conn);
}
});
break;
case "publish":
if (message.topic) {
const receivers = topics.get(message.topic);
if (receivers) {
message.clients = receivers.size;
receivers.forEach((receiver) => send(receiver, message));
}
}
break;
case "ping":
send(conn, { type: "pong" } as WSMessage);
break;
}
}
});
};

wss.on("connection", onConnection);

server.on("upgrade", (request, socket, head) => {
const handleAuth = (ws: WebSocket): void => {
wss.emit("connection", ws, request);
};
wss.handleUpgrade(request, socket, head, handleAuth);
});
};
import { WebSocket, WebSocketServer } from "ws";
import http from "http";
import * as map from "lib0/map";

const WS_READY_STATE_CONNECTING = 0;
const WS_READY_STATE_OPEN = 1;
const PING_TIMEOUT = 30000;

const topics = new Map<string, Set<WebSocket>>();

interface WSMessage {
type: "subscribe" | "unsubscribe" | "publish" | "ping" | "pong";
topics?: string[];
topic?: string;
clients?: number;
}

export let wss: WebSocketServer;
type UpgradeRequest = http.IncomingMessage & { _wsHandled?: boolean };

export const setupCollaborationSocket = (server: http.Server): void => {
wss = new WebSocketServer({ noServer: true });

const send = (conn: WebSocket, message: WSMessage): void => {
if (
conn.readyState !== WS_READY_STATE_CONNECTING &&
conn.readyState !== WS_READY_STATE_OPEN
) {
conn.close();
}
try {
conn.send(JSON.stringify(message));
} catch {
conn.close();
return;
}
};

const onConnection = (conn: WebSocket): void => {
const subscribedTopics = new Set<string>();
let closed = false;
let pongReceived = true;
const pingInterval = setInterval(() => {
if (!pongReceived) {
conn.close();
clearInterval(pingInterval);
} else {
pongReceived = false;
try {
conn.ping();
} catch {
conn.close();
}
}
}, PING_TIMEOUT);

conn.on("pong", () => {
pongReceived = true;
});

conn.on("close", () => {
subscribedTopics.forEach((topicName) => {
const subs = topics.get(topicName) || new Set();
subs.delete(conn);
if (subs.size === 0) {
topics.delete(topicName);
}
});
subscribedTopics.clear();
closed = true;
});

conn.on("message", (raw: import("ws").RawData): void => {
let message: WSMessage;

try {
const parsed = JSON.parse(
typeof raw === "string" ? raw : raw.toString()
);
message = parsed as WSMessage;
} catch {
conn.close();
return;
}

if (message && message.type && !closed) {
switch (message.type) {
case "subscribe":
(message.topics || []).forEach((topicName: string) => {
if (typeof topicName === "string") {
const topic = map.setIfUndefined(
topics,
topicName,
() => new Set<WebSocket>()
);
topic.add(conn);
subscribedTopics.add(topicName);
}
});
break;
case "unsubscribe":
(message.topics || []).forEach((topicName: string) => {
const subs = topics.get(topicName);
if (subs) {
subs.delete(conn);
}
});
break;
case "publish":
if (message.topic) {
const receivers = topics.get(message.topic);
if (receivers) {
message.clients = receivers.size;
receivers.forEach((receiver) => send(receiver, message));
}
}
break;
case "ping":
send(conn, { type: "pong" } as WSMessage);
break;
}
}
});
};

wss.on("connection", onConnection);

server.on("upgrade", (request: UpgradeRequest, socket, head) => {
const url = request.url ?? "";
if (!url.startsWith("/socket/webrtc")) {
return;
}
request._wsHandled = true;
const handleAuth = (ws: WebSocket): void => {
wss.emit("connection", ws, request);
};
wss.handleUpgrade(request, socket, head, handleAuth);
});
};
Loading