From ff81967a59e6c8ed2758ddf90f3de80904d43034 Mon Sep 17 00:00:00 2001 From: devbeni Date: Fri, 19 Sep 2025 18:51:28 +0200 Subject: [PATCH] feat: refactor Socket.IO server implementation and update README for queue system --- .env.example | 4 -- README.md | 17 ++--- app/api/socket.js | 148 +++++++++++++++++++++++++++++++++++++++++ app/page.tsx | 11 +-- pages/api/socket.js | 148 +++++++++++++++++++++++++++++++++++++++++ server/index.js | 158 -------------------------------------------- 6 files changed, 307 insertions(+), 179 deletions(-) create mode 100644 app/api/socket.js create mode 100644 pages/api/socket.js delete mode 100644 server/index.js diff --git a/.env.example b/.env.example index c8a15a2..da87527 100644 --- a/.env.example +++ b/.env.example @@ -1,7 +1,3 @@ -# Socket server -SOCKET_PORT=4000 -NEXT_PUBLIC_SOCKET_URL=http://localhost:4000 - # Queue settings QUEUE_THRESHOLD=100 CONCURRENT_ACTIVE=50 diff --git a/README.md b/README.md index a1cf9bd..0699230 100644 --- a/README.md +++ b/README.md @@ -47,8 +47,8 @@ This repository contains a minimal demo of a queue system for high-concurrency t Files added in this demo: -- `server/index.js` — simple Socket.IO server with in-memory queue logic and JWT issuance (demo only). -- `.env.example` — example environment variables for running the socket server. +- `pages/api/socket.js` — Next.js API route with Socket.IO server, in-memory queue logic and JWT issuance (demo only). +- `.env.example` — example environment variables for the queue system. - `app/page.tsx` — client UI (Next.js) that connects via Socket.IO and displays queue position, estimated wait, and purchase button. Important notes and limitations: @@ -59,26 +59,19 @@ Important notes and limitations: Running locally 1. Copy `.env.example` to `.env` and adjust values (especially `JWT_SECRET`). -2. Install dependencies for the server and client: +2. Install dependencies (already included in package.json): ```powershell pnpm install -pnpm --filter . add socket.io socket.io-client jsonwebtoken dotenv mysql2 ``` -3. Start the Socket.IO server: - -```powershell -node server/index.js -``` - -4. Start the Next.js dev server: +3. Start the Next.js dev server (includes the Socket.IO API route): ```powershell pnpm dev ``` -5. Open `http://localhost:3000` and the client will connect to the socket server (default `http://localhost:4000`). +4. Open `http://localhost:3000` and the client will connect to the Socket.IO API route at `/api/socket`. Next steps / improvements diff --git a/app/api/socket.js b/app/api/socket.js new file mode 100644 index 0000000..64656f7 --- /dev/null +++ b/app/api/socket.js @@ -0,0 +1,148 @@ +import { Server } from "socket.io"; +import jwt from "jsonwebtoken"; +import mysql from "mysql2/promise"; + +// Simple in-memory structures keyed by eventId +const events = {}; + +const QUEUE_THRESHOLD = parseInt(process.env.QUEUE_THRESHOLD || "100", 10); +const CONCURRENT_ACTIVE = parseInt(process.env.CONCURRENT_ACTIVE || "50", 10); +const TOKEN_TTL_SECONDS = parseInt(process.env.TOKEN_TTL_SECONDS || `${15 * 60}`, 10); + +function ensureEvent(eventId) { + if (!events[eventId]) { + events[eventId] = { + sockets: new Set(), // connected socket ids + queue: [], // array of socket ids in order + active: new Set(), // sockets that currently hold a token (allowed to buy) + queueOn: false, + }; + } + return events[eventId]; +} + +async function createDbPool() { + if (!process.env.MYSQL_HOST) return null; + return mysql.createPool({ + host: process.env.MYSQL_HOST, + user: process.env.MYSQL_USER, + password: process.env.MYSQL_PASSWORD, + database: process.env.MYSQL_DATABASE, + waitForConnections: true, + connectionLimit: 5, + }); +} + +function buildUpdate(ev) { + const positionMap = {}; + ev.queue.forEach((sid, idx) => (positionMap[sid] = idx + 1)); + return { + activeCount: ev.sockets.size, + queueOn: ev.queueOn, + estimatedWait: ev.queue.length * 5, // naive: 5s per person + positions: positionMap, + }; +} + +function broadcastUpdate(eventId, io) { + const ev = events[eventId]; + if (!ev) return; + // notify all connected sockets in room + for (const sid of ev.sockets) { + const pos = ev.queue.indexOf(sid); + io.to(sid).emit("queue_update", { + activeCount: ev.sockets.size, + position: pos === -1 ? null : pos + 1, + estimatedWait: pos === -1 ? null : ev.queue.length * 5, + }); + } +} + +function evaluateQueue(eventId, io) { + const ev = events[eventId]; + if (!ev) return; + + // ensure active set size <= CONCURRENT_ACTIVE + while (ev.active.size < CONCURRENT_ACTIVE && ev.queue.length > 0) { + const next = ev.queue.shift(); + if (!next) break; + // sign token + const token = jwt.sign({ sid: next, eventId }, process.env.JWT_SECRET || "dev-secret", { + expiresIn: TOKEN_TTL_SECONDS, + }); + ev.active.add(next); + io.to(next).emit("granted", { token, expiresAt: new Date(Date.now() + TOKEN_TTL_SECONDS * 1000).toISOString() }); + } + + // If too many active (rare), revoke oldest + if (ev.active.size > CONCURRENT_ACTIVE) { + const toRevoke = Array.from(ev.active).slice(CONCURRENT_ACTIVE); + toRevoke.forEach((sid) => { + ev.active.delete(sid); + io.to(sid).emit("revoked"); + }); + } + + // If queue no longer needed, clear it + if (ev.sockets.size < QUEUE_THRESHOLD) ev.queueOn = false; + + // broadcast + broadcastUpdate(eventId, io); +} + +export default async function handler(req, res) { + if (res.socket.server.io) { + console.log("Socket is already running"); + } else { + console.log("Socket is initializing"); + const io = new Server(res.socket.server, { + path: "/api/socket", + cors: { origin: true }, + }); + res.socket.server.io = io; + + const db = await createDbPool(); + + io.on("connection", (socket) => { + console.log("connect", socket.id); + + socket.on("join_event", async ({ eventId }) => { + if (!eventId) return; + const ev = ensureEvent(eventId); + ev.sockets.add(socket.id); + socket.join(eventId); + + // compute counts + const activeCount = ev.sockets.size; + // turn on queue if threshold reached + if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true; + + // if queueOn and socket not active, add to queue + if (ev.queueOn && !ev.active.has(socket.id)) { + if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id); + } + + // evaluate granting + evaluateQueue(eventId, io); + + // send initial update + io.to(socket.id).emit("queue_update", buildUpdate(ev)); + }); + + socket.on("disconnect", () => { + // remove from every event + for (const [eventId, ev] of Object.entries(events)) { + if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id); + const qi = ev.queue.indexOf(socket.id); + if (qi !== -1) ev.queue.splice(qi, 1); + if (ev.active.has(socket.id)) ev.active.delete(socket.id); + // re-evaluate to grant next in line + evaluateQueue(eventId, io); + // notify remaining sockets + broadcastUpdate(eventId, io); + } + }); + }); + } + res.end(); +} diff --git a/app/page.tsx b/app/page.tsx index d80556b..2759e4c 100644 --- a/app/page.tsx +++ b/app/page.tsx @@ -15,11 +15,12 @@ export default function Home() { useEffect(() => { let mounted = true; // dynamic import to avoid bundling issues on server - import("socket.io-client").then(({ io }) => { - if (!mounted) return; - // default to the same origin and connect to the Next.js API route at /api/socket - const serverUrl = process.env.NEXT_PUBLIC_SOCKET_URL || window.location.origin; - const socket = io(serverUrl, { autoConnect: true, path: "/api/socket" }); + import("socket.io-client").then(({ io }) => { + if (!mounted) return; + const socket = io({ + path: "/api/socket", + autoConnect: true + }); socketRef.current = socket; socket.on("connect", () => { diff --git a/pages/api/socket.js b/pages/api/socket.js new file mode 100644 index 0000000..64656f7 --- /dev/null +++ b/pages/api/socket.js @@ -0,0 +1,148 @@ +import { Server } from "socket.io"; +import jwt from "jsonwebtoken"; +import mysql from "mysql2/promise"; + +// Simple in-memory structures keyed by eventId +const events = {}; + +const QUEUE_THRESHOLD = parseInt(process.env.QUEUE_THRESHOLD || "100", 10); +const CONCURRENT_ACTIVE = parseInt(process.env.CONCURRENT_ACTIVE || "50", 10); +const TOKEN_TTL_SECONDS = parseInt(process.env.TOKEN_TTL_SECONDS || `${15 * 60}`, 10); + +function ensureEvent(eventId) { + if (!events[eventId]) { + events[eventId] = { + sockets: new Set(), // connected socket ids + queue: [], // array of socket ids in order + active: new Set(), // sockets that currently hold a token (allowed to buy) + queueOn: false, + }; + } + return events[eventId]; +} + +async function createDbPool() { + if (!process.env.MYSQL_HOST) return null; + return mysql.createPool({ + host: process.env.MYSQL_HOST, + user: process.env.MYSQL_USER, + password: process.env.MYSQL_PASSWORD, + database: process.env.MYSQL_DATABASE, + waitForConnections: true, + connectionLimit: 5, + }); +} + +function buildUpdate(ev) { + const positionMap = {}; + ev.queue.forEach((sid, idx) => (positionMap[sid] = idx + 1)); + return { + activeCount: ev.sockets.size, + queueOn: ev.queueOn, + estimatedWait: ev.queue.length * 5, // naive: 5s per person + positions: positionMap, + }; +} + +function broadcastUpdate(eventId, io) { + const ev = events[eventId]; + if (!ev) return; + // notify all connected sockets in room + for (const sid of ev.sockets) { + const pos = ev.queue.indexOf(sid); + io.to(sid).emit("queue_update", { + activeCount: ev.sockets.size, + position: pos === -1 ? null : pos + 1, + estimatedWait: pos === -1 ? null : ev.queue.length * 5, + }); + } +} + +function evaluateQueue(eventId, io) { + const ev = events[eventId]; + if (!ev) return; + + // ensure active set size <= CONCURRENT_ACTIVE + while (ev.active.size < CONCURRENT_ACTIVE && ev.queue.length > 0) { + const next = ev.queue.shift(); + if (!next) break; + // sign token + const token = jwt.sign({ sid: next, eventId }, process.env.JWT_SECRET || "dev-secret", { + expiresIn: TOKEN_TTL_SECONDS, + }); + ev.active.add(next); + io.to(next).emit("granted", { token, expiresAt: new Date(Date.now() + TOKEN_TTL_SECONDS * 1000).toISOString() }); + } + + // If too many active (rare), revoke oldest + if (ev.active.size > CONCURRENT_ACTIVE) { + const toRevoke = Array.from(ev.active).slice(CONCURRENT_ACTIVE); + toRevoke.forEach((sid) => { + ev.active.delete(sid); + io.to(sid).emit("revoked"); + }); + } + + // If queue no longer needed, clear it + if (ev.sockets.size < QUEUE_THRESHOLD) ev.queueOn = false; + + // broadcast + broadcastUpdate(eventId, io); +} + +export default async function handler(req, res) { + if (res.socket.server.io) { + console.log("Socket is already running"); + } else { + console.log("Socket is initializing"); + const io = new Server(res.socket.server, { + path: "/api/socket", + cors: { origin: true }, + }); + res.socket.server.io = io; + + const db = await createDbPool(); + + io.on("connection", (socket) => { + console.log("connect", socket.id); + + socket.on("join_event", async ({ eventId }) => { + if (!eventId) return; + const ev = ensureEvent(eventId); + ev.sockets.add(socket.id); + socket.join(eventId); + + // compute counts + const activeCount = ev.sockets.size; + // turn on queue if threshold reached + if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true; + + // if queueOn and socket not active, add to queue + if (ev.queueOn && !ev.active.has(socket.id)) { + if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id); + } + + // evaluate granting + evaluateQueue(eventId, io); + + // send initial update + io.to(socket.id).emit("queue_update", buildUpdate(ev)); + }); + + socket.on("disconnect", () => { + // remove from every event + for (const [eventId, ev] of Object.entries(events)) { + if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id); + const qi = ev.queue.indexOf(socket.id); + if (qi !== -1) ev.queue.splice(qi, 1); + if (ev.active.has(socket.id)) ev.active.delete(socket.id); + // re-evaluate to grant next in line + evaluateQueue(eventId, io); + // notify remaining sockets + broadcastUpdate(eventId, io); + } + }); + }); + } + res.end(); +} diff --git a/server/index.js b/server/index.js deleted file mode 100644 index 55c045f..0000000 --- a/server/index.js +++ /dev/null @@ -1,158 +0,0 @@ -/* - Simple Socket.IO queue server for demo purposes. - - Tracks users per-event - - When active concurrent users for an event reach QUEUE_THRESHOLD, turn on queueing - - Allow up to CONCURRENT_ACTIVE users on the site simultaneously (have access token) - - Issue JWT tokens valid for TOKEN_TTL_SECONDS when a user reaches front of queue - - Uses MySQL to persist queue positions (lightweight stub here) - - NOTE: This is a minimal, synchronous-in-memory demo. For production you must persist state - in Redis or a DB, add authentication, rate-limiting, and proper error handling. -*/ - -require("dotenv").config(); -const http = require("http"); -const { Server } = require("socket.io"); -const jwt = require("jsonwebtoken"); -const mysql = require("mysql2/promise"); - -const PORT = process.env.SOCKET_PORT || 4000; -const QUEUE_THRESHOLD = parseInt(process.env.QUEUE_THRESHOLD || "100", 10); -const CONCURRENT_ACTIVE = parseInt(process.env.CONCURRENT_ACTIVE || "50", 10); -const TOKEN_TTL_SECONDS = parseInt(process.env.TOKEN_TTL_SECONDS || `${15 * 60}`, 10); - -// Simple in-memory structures keyed by eventId -const events = {}; - -function ensureEvent(eventId) { - if (!events[eventId]) { - events[eventId] = { - sockets: new Set(), // connected socket ids - queue: [], // array of socket ids in order - active: new Set(), // sockets that currently hold a token (allowed to buy) - queueOn: false, - }; - } - return events[eventId]; -} - -async function createDbPool() { - if (!process.env.MYSQL_HOST) return null; - return mysql.createPool({ - host: process.env.MYSQL_HOST, - user: process.env.MYSQL_USER, - password: process.env.MYSQL_PASSWORD, - database: process.env.MYSQL_DATABASE, - waitForConnections: true, - connectionLimit: 5, - }); -} - -;(async () => { - const db = await createDbPool(); - - const server = http.createServer(); - const io = new Server(server, { - cors: { origin: true }, - }); - - io.on("connection", (socket) => { - console.log("connect", socket.id); - - socket.on("join_event", async ({ eventId }) => { - if (!eventId) return; - const ev = ensureEvent(eventId); - ev.sockets.add(socket.id); - socket.join(eventId); - - // compute counts - const activeCount = ev.sockets.size; - // turn on queue if threshold reached - if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true; - - // if queueOn and socket not active, add to queue - if (ev.queueOn && !ev.active.has(socket.id)) { - if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id); - } - - // evaluate granting - evaluateQueue(eventId, io); - - // send initial update - io.to(socket.id).emit("queue_update", buildUpdate(ev)); - }); - - socket.on("disconnect", () => { - // remove from every event - for (const [eventId, ev] of Object.entries(events)) { - if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id); - const qi = ev.queue.indexOf(socket.id); - if (qi !== -1) ev.queue.splice(qi, 1); - if (ev.active.has(socket.id)) ev.active.delete(socket.id); - // re-evaluate to grant next in line - evaluateQueue(eventId, io); - // notify remaining sockets - broadcastUpdate(eventId, io); - } - }); - }); - - server.listen(PORT, () => console.log(`Socket server listening on ${PORT}`)); - - function buildUpdate(ev) { - const positionMap = {}; - ev.queue.forEach((sid, idx) => (positionMap[sid] = idx + 1)); - return { - activeCount: ev.sockets.size, - queueOn: ev.queueOn, - estimatedWait: ev.queue.length * 5, // naive: 5s per person - positions: positionMap, - }; - } - - function broadcastUpdate(eventId, io) { - const ev = events[eventId]; - if (!ev) return; - // notify all connected sockets in room - for (const sid of ev.sockets) { - const pos = ev.queue.indexOf(sid); - io.to(sid).emit("queue_update", { - activeCount: ev.sockets.size, - position: pos === -1 ? null : pos + 1, - estimatedWait: pos === -1 ? null : ev.queue.length * 5, - }); - } - } - - function evaluateQueue(eventId, io) { - const ev = events[eventId]; - if (!ev) return; - - // ensure active set size <= CONCURRENT_ACTIVE - while (ev.active.size < CONCURRENT_ACTIVE && ev.queue.length > 0) { - const next = ev.queue.shift(); - if (!next) break; - // sign token - const token = jwt.sign({ sid: next, eventId }, process.env.JWT_SECRET || "dev-secret", { - expiresIn: TOKEN_TTL_SECONDS, - }); - ev.active.add(next); - io.to(next).emit("granted", { token, expiresAt: new Date(Date.now() + TOKEN_TTL_SECONDS * 1000).toISOString() }); - } - - // If too many active (rare), revoke oldest - if (ev.active.size > CONCURRENT_ACTIVE) { - const toRevoke = Array.from(ev.active).slice(CONCURRENT_ACTIVE); - toRevoke.forEach((sid) => { - ev.active.delete(sid); - io.to(sid).emit("revoked"); - }); - } - - // If queue no longer needed, clear it - if (ev.sockets.size < QUEUE_THRESHOLD) ev.queueOn = false; - - // broadcast - broadcastUpdate(eventId, io); - } -})();