Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
"express": "^5.2.1",
"jsonwebtoken": "^9.0.3",
"mongoose": "^9.1.5",
"morgan": "^1.10.1"
"morgan": "^1.10.1",
"socket.io": "^4.8.3"
},
"devDependencies": {
"@babel/eslint-parser": "^7.28.6",
Expand Down
153 changes: 152 additions & 1 deletion server.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,26 @@ import { fileURLToPath } from 'url';
import { dirname, join } from 'path';
import * as AuthControllers from './src/controllers/AuthControllers.js';
import { auth, authorizeRole } from './src/middleware/auth.js';
import { createServer } from 'http';
import { Server } from 'socket.io';

const app = express();
app.use(morgan('dev'));
const PORT = process.env.PORT || 3000;
const MONGODB_URI = process.env.MONGODB_URI;

// Create HTTP server and attach Socket.IO
const httpServer = createServer(app);
const io = new Server(httpServer, {
cors: {
origin: process.env.CLIENT_URL
? [process.env.CLIENT_URL]
: ['http://localhost:5173', 'http://localhost:3000'],
methods: ['GET', 'POST'],
credentials: true,
},
});

// Recreate __dirname since it is not available in ES Modules by default
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
Expand Down Expand Up @@ -132,16 +146,153 @@ app.get('/health', (req,res)=>{
});
});

// ============== WEBRTC SIGNALING SERVER ==============
// Track active streams: { roomId: { broadcaster: socketId, viewers: [socketId] } }
const activeStreams = new Map();

// API: Get list of active live streams
app.get('/api/live-streams', (req, res) => {
const streams = [];
activeStreams.forEach((value, key) => {
streams.push({ roomId: key, viewerCount: value.viewers.length });
});
res.json(streams);
});

/**
* Validate that a roomId exists in activeStreams and that the socket
* has joined the corresponding Socket.IO room.
* Returns the stream object on success, or null after emitting an error.
*/
function validateRoom(socket, roomId, eventName) {
if (!roomId || !activeStreams.has(roomId)) {
socket.emit('error', {
event: eventName,
message: `Room "${roomId}" does not exist in active streams`,
});
return null;
}

if (!socket.rooms.has(roomId)) {
socket.emit('error', {
event: eventName,
message: `Socket is not a member of room "${roomId}"`,
});
return null;
}

return activeStreams.get(roomId);
}

io.on('connection', (socket) => {
console.log(`[SOCKET] User connected: ${socket.id}`);

// Broadcaster starts a stream
socket.on('start-stream', (roomId) => {
if (activeStreams.has(roomId)) {
socket.emit('error', { message: 'Room already exists' });
return;
}
socket.join(roomId);
activeStreams.set(roomId, { broadcaster: socket.id, viewers: [] });
console.log(`[LIVE] Stream started: ${roomId} by ${socket.id}`);
socket.emit('stream-started', { roomId });
});

// Viewer joins a stream
socket.on('join-stream', (roomId) => {
const stream = activeStreams.get(roomId);
if (!stream) {
socket.emit('error', { message: 'Stream not found' });
return;
}
socket.join(roomId);
stream.viewers.push(socket.id);
console.log(`[VIEWER] ${socket.id} joined ${roomId}`);

// Notify broadcaster that a new viewer joined
io.to(stream.broadcaster).emit('viewer-joined', {
viewerId: socket.id,
});
});

// WebRTC signaling: Offer (broadcaster -> viewer)
socket.on('offer', ({ roomId, offer, viewerId }) => {
const stream = validateRoom(socket, roomId, 'offer');
if (!stream) return;

io.to(viewerId).emit('offer', { offer, broadcasterId: socket.id });
});

// WebRTC signaling: Answer (viewer -> broadcaster)
socket.on('answer', ({ roomId, answer, broadcasterId }) => {
const stream = validateRoom(socket, roomId, 'answer');
if (!stream) return;

io.to(broadcasterId).emit('answer', { answer, viewerId: socket.id });
});

// WebRTC signaling: ICE Candidate
socket.on('ice-candidate', ({ roomId, candidate, targetId }) => {
const stream = validateRoom(socket, roomId, 'ice-candidate');
if (!stream) return;

io.to(targetId).emit('ice-candidate', {
candidate,
senderId: socket.id,
});
});

// Stop stream
socket.on('stop-stream', (roomId) => {
if (activeStreams.has(roomId)) {
io.to(roomId).emit('stream-ended');
activeStreams.delete(roomId);
console.log(`[ENDED] Stream ended: ${roomId}`);
}
});

// Disconnect handling
socket.on('disconnect', () => {
console.log(`[SOCKET] User disconnected: ${socket.id}`);

// Check all streams for this socket
activeStreams.forEach((stream, roomId) => {
if (stream.broadcaster === socket.id) {
// Broadcaster disconnected - end the stream for all viewers
io.to(roomId).emit('stream-ended', {
reason: 'Broadcaster disconnected',
});
activeStreams.delete(roomId);
console.log(
`[ENDED] Stream ${roomId} ended (broadcaster disconnected)`
);
} else if (stream.viewers.includes(socket.id)) {
// Viewer disconnected - remove from list and notify broadcaster
stream.viewers = stream.viewers.filter((id) => id !== socket.id);
io.to(stream.broadcaster).emit('viewer-left', {
viewerId: socket.id,
viewerCount: stream.viewers.length,
});
console.log(
`[VIEWER] ${socket.id} left ${roomId}. Viewers remaining: ${stream.viewers.length}`
);
}
});
});
});

// 404 Not Found handler (must be after all routes)
app.use((req, res) => {
res.status(404).json({
error: 'Route not found',
});
});

app.listen(PORT, () => {
httpServer.listen(PORT, () => {
console.log(`\nServer successfully started!`);
console.log(`Home: http://localhost:${PORT}`);
console.log('\n✅ Server successfully started!');
console.log(`🏠 Home: http://localhost:${PORT}`);
console.log(`WebRTC Signaling: ws://localhost:${PORT}`);
});