From cb326f7190b3583bf9396054d5749906a92b69e4 Mon Sep 17 00:00:00 2001 From: devbeni Date: Fri, 19 Sep 2025 18:58:22 +0200 Subject: [PATCH] feat: implement Socket.IO server with queue management and update environment variables --- .env.example | 6 +- README.md | 6 +- app/api/{socket.js => socket/route.js} | 103 +++++++++-------- app/api/socketio/route.js | 154 +++++++++++++++++++++++++ app/page.tsx | 85 ++++++++------ 5 files changed, 265 insertions(+), 89 deletions(-) rename app/api/{socket.js => socket/route.js} (61%) create mode 100644 app/api/socketio/route.js diff --git a/.env.example b/.env.example index da87527..ea7c875 100644 --- a/.env.example +++ b/.env.example @@ -1,3 +1,7 @@ +# Socket server +SOCKET_PORT=4000 +NEXT_PUBLIC_SOCKET_PORT=4000 + # Queue settings QUEUE_THRESHOLD=100 CONCURRENT_ACTIVE=50 @@ -10,4 +14,4 @@ JWT_SECRET=your_jwt_secret_here MYSQL_HOST=localhost MYSQL_USER=root MYSQL_PASSWORD=yourpassword -MYSQL_DATABASE=queue_demo +MYSQL_DATABASE=queue_demo \ No newline at end of file diff --git a/README.md b/README.md index 0699230..a455f1d 100644 --- a/README.md +++ b/README.md @@ -47,7 +47,7 @@ This repository contains a minimal demo of a queue system for high-concurrency t Files added in this demo: -- `pages/api/socket.js` — Next.js API route with Socket.IO server, in-memory queue logic and JWT issuance (demo only). +- `app/api/socketio/route.js` — Next.js App Router API route that initializes the Socket.IO server, in-memory queue logic and JWT issuance (demo only). - `.env.example` — example environment variables for the queue system. - `app/page.tsx` — client UI (Next.js) that connects via Socket.IO and displays queue position, estimated wait, and purchase button. @@ -65,13 +65,13 @@ Running locally pnpm install ``` -3. Start the Next.js dev server (includes the Socket.IO API route): +3. Start the Next.js dev server: ```powershell pnpm dev ``` -4. Open `http://localhost:3000` and the client will connect to the Socket.IO API route at `/api/socket`. +4. Open `http://localhost:3000` - the page will automatically initialize the Socket.IO server via API call to `/api/socketio`, then connect to it on port 4000. Next steps / improvements diff --git a/app/api/socket.js b/app/api/socket/route.js similarity index 61% rename from app/api/socket.js rename to app/api/socket/route.js index 64656f7..27a47da 100644 --- a/app/api/socket.js +++ b/app/api/socket/route.js @@ -1,6 +1,7 @@ import { Server } from "socket.io"; import jwt from "jsonwebtoken"; import mysql from "mysql2/promise"; +import { NextRequest, NextResponse } from "next/server"; // Simple in-memory structures keyed by eventId const events = {}; @@ -90,59 +91,67 @@ function evaluateQueue(eventId, io) { broadcastUpdate(eventId, io); } -export default async function handler(req, res) { - if (res.socket.server.io) { +export async function GET(req) { + const res = NextResponse.next(); + + if (global.io) { console.log("Socket is already running"); } else { console.log("Socket is initializing"); - const io = new Server(res.socket.server, { - path: "/api/socket", - cors: { origin: true }, - }); - res.socket.server.io = io; - - const db = await createDbPool(); - - io.on("connection", (socket) => { - console.log("connect", socket.id); - - socket.on("join_event", async ({ eventId }) => { - if (!eventId) return; - const ev = ensureEvent(eventId); - ev.sockets.add(socket.id); - socket.join(eventId); - - // compute counts - const activeCount = ev.sockets.size; - // turn on queue if threshold reached - if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true; - - // if queueOn and socket not active, add to queue - if (ev.queueOn && !ev.active.has(socket.id)) { - if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id); - } - - // evaluate granting - evaluateQueue(eventId, io); - - // send initial update - io.to(socket.id).emit("queue_update", buildUpdate(ev)); + const httpServer = req.socket?.server; + + if (httpServer) { + const io = new Server(httpServer, { + path: "/api/socket", + cors: { origin: true }, }); + + global.io = io; - socket.on("disconnect", () => { - // remove from every event - for (const [eventId, ev] of Object.entries(events)) { - if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id); - const qi = ev.queue.indexOf(socket.id); - if (qi !== -1) ev.queue.splice(qi, 1); - if (ev.active.has(socket.id)) ev.active.delete(socket.id); - // re-evaluate to grant next in line + const db = await createDbPool(); + + io.on("connection", (socket) => { + console.log("connect", socket.id); + + socket.on("join_event", async ({ eventId }) => { + if (!eventId) return; + const ev = ensureEvent(eventId); + ev.sockets.add(socket.id); + socket.join(eventId); + + // compute counts + const activeCount = ev.sockets.size; + // turn on queue if threshold reached + if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true; + + // if queueOn and socket not active, add to queue + if (ev.queueOn && !ev.active.has(socket.id)) { + if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id); + } + + // evaluate granting evaluateQueue(eventId, io); - // notify remaining sockets - broadcastUpdate(eventId, io); - } + + // send initial update + io.to(socket.id).emit("queue_update", buildUpdate(ev)); + }); + + socket.on("disconnect", () => { + // remove from every event + for (const [eventId, ev] of Object.entries(events)) { + if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id); + const qi = ev.queue.indexOf(socket.id); + if (qi !== -1) ev.queue.splice(qi, 1); + if (ev.active.has(socket.id)) ev.active.delete(socket.id); + // re-evaluate to grant next in line + evaluateQueue(eventId, io); + // notify remaining sockets + broadcastUpdate(eventId, io); + } + }); }); - }); + } } - res.end(); + + return new Response("Socket.IO server initialized", { status: 200 }); } diff --git a/app/api/socketio/route.js b/app/api/socketio/route.js new file mode 100644 index 0000000..f22bf4f --- /dev/null +++ b/app/api/socketio/route.js @@ -0,0 +1,154 @@ +import { Server } from 'socket.io' +import jwt from 'jsonwebtoken' + +// Simple in-memory structures keyed by eventId +const events = {} + +const QUEUE_THRESHOLD = parseInt(process.env.QUEUE_THRESHOLD || "100", 10) +const CONCURRENT_ACTIVE = parseInt(process.env.CONCURRENT_ACTIVE || "50", 10) +const TOKEN_TTL_SECONDS = parseInt(process.env.TOKEN_TTL_SECONDS || `${15 * 60}`, 10) + +function ensureEvent(eventId) { + if (!events[eventId]) { + events[eventId] = { + sockets: new Set(), // connected socket ids + queue: [], // array of socket ids in order + active: new Set(), // sockets that currently hold a token (allowed to buy) + queueOn: false, + } + } + return events[eventId] +} + +function broadcastUpdate(eventId, io) { + const ev = events[eventId] + if (!ev) return + // notify all connected sockets in room + for (const sid of ev.sockets) { + const pos = ev.queue.indexOf(sid) + io.to(sid).emit("queue_update", { + activeCount: ev.sockets.size, + position: pos === -1 ? null : pos + 1, + estimatedWait: pos === -1 ? null : ev.queue.length * 5, + }) + } +} + +function evaluateQueue(eventId, io) { + const ev = events[eventId] + if (!ev) return + + // ensure active set size <= CONCURRENT_ACTIVE + while (ev.active.size < CONCURRENT_ACTIVE && ev.queue.length > 0) { + const next = ev.queue.shift() + if (!next) break + // sign token + const token = jwt.sign({ sid: next, eventId }, process.env.JWT_SECRET || "dev-secret", { + expiresIn: TOKEN_TTL_SECONDS, + }) + ev.active.add(next) + io.to(next).emit("granted", { token, expiresAt: new Date(Date.now() + TOKEN_TTL_SECONDS * 1000).toISOString() }) + } + + // If too many active (rare), revoke oldest + if (ev.active.size > CONCURRENT_ACTIVE) { + const toRevoke = Array.from(ev.active).slice(CONCURRENT_ACTIVE) + toRevoke.forEach((sid) => { + ev.active.delete(sid) + io.to(sid).emit("revoked") + }) + } + + // If queue no longer needed, clear it + if (ev.sockets.size < QUEUE_THRESHOLD) ev.queueOn = false + + // broadcast + broadcastUpdate(eventId, io) +} + +export async function GET(req) { + if (global.io) { + console.log("Socket.IO már fut") + return Response.json({ message: "Socket.IO már inicializálva" }) + } + + console.log("Socket.IO inicializálása...") + + try { + // Get the HTTP server instance from Next.js + const server = req.nextUrl.protocol === 'https:' + ? require('https').createServer() + : require('http').createServer() + + const io = new Server(server, { + cors: { + origin: "*", + methods: ["GET", "POST"] + } + }) + + global.io = io + + io.on("connection", (socket) => { + console.log("Csatlakozott:", socket.id) + + socket.on("join_event", async ({ eventId }) => { + if (!eventId) return + const ev = ensureEvent(eventId) + ev.sockets.add(socket.id) + socket.join(eventId) + + // compute counts + const activeCount = ev.sockets.size + // turn on queue if threshold reached + if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true + + // if queueOn and socket not active, add to queue + if (ev.queueOn && !ev.active.has(socket.id)) { + if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id) + } + + // evaluate granting + evaluateQueue(eventId, io) + + // send initial update + const pos = ev.queue.indexOf(socket.id) + io.to(socket.id).emit("queue_update", { + activeCount: ev.sockets.size, + position: pos === -1 ? null : pos + 1, + estimatedWait: pos === -1 ? null : ev.queue.length * 5, + }) + }) + + socket.on("disconnect", () => { + console.log("Lecsatlakozott:", socket.id) + // remove from every event + for (const [eventId, ev] of Object.entries(events)) { + if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id) + const qi = ev.queue.indexOf(socket.id) + if (qi !== -1) ev.queue.splice(qi, 1) + if (ev.active.has(socket.id)) ev.active.delete(socket.id) + // re-evaluate to grant next in line + evaluateQueue(eventId, io) + // notify remaining sockets + broadcastUpdate(eventId, io) + } + }) + }) + + // Start the Socket.IO server on a different port + const SOCKET_PORT = process.env.SOCKET_PORT || 4000 + server.listen(SOCKET_PORT, () => { + console.log(`Socket.IO szerver fut a ${SOCKET_PORT} porton`) + }) + + return Response.json({ + message: "Socket.IO szerver inicializálva", + port: SOCKET_PORT + }) + + } catch (error) { + console.error("Socket.IO inicializálási hiba:", error) + return Response.json({ error: "Nem sikerült inicializálni a Socket.IO szervert" }, { status: 500 }) + } +} diff --git a/app/page.tsx b/app/page.tsx index 2759e4c..a94c702 100644 --- a/app/page.tsx +++ b/app/page.tsx @@ -14,50 +14,59 @@ export default function Home() { useEffect(() => { let mounted = true; - // dynamic import to avoid bundling issues on server - import("socket.io-client").then(({ io }) => { - if (!mounted) return; - const socket = io({ - path: "/api/socket", - autoConnect: true - }); - socketRef.current = socket; + + // Először inicializáljuk a Socket.IO szervert + fetch('/api/socketio') + .then(() => { + // dynamic import to avoid bundling issues on server + return import("socket.io-client"); + }) + .then(({ io }) => { + if (!mounted) return; + const socketPort = process.env.NEXT_PUBLIC_SOCKET_PORT || "4000"; + const socket = io(`http://localhost:${socketPort}`, { + autoConnect: true + }); + socketRef.current = socket; - socket.on("connect", () => { - setConnected(true); - // join a named event (example eventId: pamkutya) - socket.emit("join_event", { eventId: "pamkutya" }); - }); + socket.on("connect", () => { + setConnected(true); + // join a named event (example eventId: pamkutya) + socket.emit("join_event", { eventId: "pamkutya" }); + }); - socket.on("disconnect", () => { - setConnected(false); - setPosition(null); - setHasAccess(false); - setTokenExpiry(null); - }); + socket.on("disconnect", () => { + setConnected(false); + setPosition(null); + setHasAccess(false); + setTokenExpiry(null); + }); - socket.on("queue_update", (data: any) => { - setPosition(data.position ?? null); - setEstimatedWait(data.estimatedWait ?? null); - setActiveUsers(data.activeCount ?? 0); - }); + socket.on("queue_update", (data: any) => { + setPosition(data.position ?? null); + setEstimatedWait(data.estimatedWait ?? null); + setActiveUsers(data.activeCount ?? 0); + }); - socket.on("granted", (data: any) => { - // { token, expiresAt } - setHasAccess(true); - setTokenExpiry(data.expiresAt ? Date.parse(data.expiresAt) : Date.now() + 15 * 60 * 1000); - // store token locally for API calls - try { - localStorage.setItem("event_token", data.token); - } catch (e) {} - }); + socket.on("granted", (data: any) => { + // { token, expiresAt } + setHasAccess(true); + setTokenExpiry(data.expiresAt ? Date.parse(data.expiresAt) : Date.now() + 15 * 60 * 1000); + // store token locally for API calls + try { + localStorage.setItem("event_token", data.token); + } catch (e) {} + }); - socket.on("revoked", () => { - setHasAccess(false); - setTokenExpiry(null); - localStorage.removeItem("event_token"); + socket.on("revoked", () => { + setHasAccess(false); + setTokenExpiry(null); + localStorage.removeItem("event_token"); + }); + }) + .catch(error => { + console.error("Socket inicializálási hiba:", error); }); - }); return () => { mounted = false;