feat: refactor Socket.IO server implementation and update README for queue system
This commit is contained in:
@@ -1,7 +1,3 @@
|
|||||||
# Socket server
|
|
||||||
SOCKET_PORT=4000
|
|
||||||
NEXT_PUBLIC_SOCKET_URL=http://localhost:4000
|
|
||||||
|
|
||||||
# Queue settings
|
# Queue settings
|
||||||
QUEUE_THRESHOLD=100
|
QUEUE_THRESHOLD=100
|
||||||
CONCURRENT_ACTIVE=50
|
CONCURRENT_ACTIVE=50
|
||||||
|
|||||||
17
README.md
17
README.md
@@ -47,8 +47,8 @@ This repository contains a minimal demo of a queue system for high-concurrency t
|
|||||||
|
|
||||||
Files added in this demo:
|
Files added in this demo:
|
||||||
|
|
||||||
- `server/index.js` — simple Socket.IO server with in-memory queue logic and JWT issuance (demo only).
|
- `pages/api/socket.js` — Next.js API route with Socket.IO server, in-memory queue logic and JWT issuance (demo only).
|
||||||
- `.env.example` — example environment variables for running the socket server.
|
- `.env.example` — example environment variables for the queue system.
|
||||||
- `app/page.tsx` — client UI (Next.js) that connects via Socket.IO and displays queue position, estimated wait, and purchase button.
|
- `app/page.tsx` — client UI (Next.js) that connects via Socket.IO and displays queue position, estimated wait, and purchase button.
|
||||||
|
|
||||||
Important notes and limitations:
|
Important notes and limitations:
|
||||||
@@ -59,26 +59,19 @@ Important notes and limitations:
|
|||||||
Running locally
|
Running locally
|
||||||
|
|
||||||
1. Copy `.env.example` to `.env` and adjust values (especially `JWT_SECRET`).
|
1. Copy `.env.example` to `.env` and adjust values (especially `JWT_SECRET`).
|
||||||
2. Install dependencies for the server and client:
|
2. Install dependencies (already included in package.json):
|
||||||
|
|
||||||
```powershell
|
```powershell
|
||||||
pnpm install
|
pnpm install
|
||||||
pnpm --filter . add socket.io socket.io-client jsonwebtoken dotenv mysql2
|
|
||||||
```
|
```
|
||||||
|
|
||||||
3. Start the Socket.IO server:
|
3. Start the Next.js dev server (includes the Socket.IO API route):
|
||||||
|
|
||||||
```powershell
|
|
||||||
node server/index.js
|
|
||||||
```
|
|
||||||
|
|
||||||
4. Start the Next.js dev server:
|
|
||||||
|
|
||||||
```powershell
|
```powershell
|
||||||
pnpm dev
|
pnpm dev
|
||||||
```
|
```
|
||||||
|
|
||||||
5. Open `http://localhost:3000` and the client will connect to the socket server (default `http://localhost:4000`).
|
4. Open `http://localhost:3000` and the client will connect to the Socket.IO API route at `/api/socket`.
|
||||||
|
|
||||||
Next steps / improvements
|
Next steps / improvements
|
||||||
|
|
||||||
|
|||||||
148
app/api/socket.js
Normal file
148
app/api/socket.js
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
import { Server } from "socket.io";
|
||||||
|
import jwt from "jsonwebtoken";
|
||||||
|
import mysql from "mysql2/promise";
|
||||||
|
|
||||||
|
// Simple in-memory structures keyed by eventId
|
||||||
|
const events = {};
|
||||||
|
|
||||||
|
const QUEUE_THRESHOLD = parseInt(process.env.QUEUE_THRESHOLD || "100", 10);
|
||||||
|
const CONCURRENT_ACTIVE = parseInt(process.env.CONCURRENT_ACTIVE || "50", 10);
|
||||||
|
const TOKEN_TTL_SECONDS = parseInt(process.env.TOKEN_TTL_SECONDS || `${15 * 60}`, 10);
|
||||||
|
|
||||||
|
function ensureEvent(eventId) {
|
||||||
|
if (!events[eventId]) {
|
||||||
|
events[eventId] = {
|
||||||
|
sockets: new Set(), // connected socket ids
|
||||||
|
queue: [], // array of socket ids in order
|
||||||
|
active: new Set(), // sockets that currently hold a token (allowed to buy)
|
||||||
|
queueOn: false,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return events[eventId];
|
||||||
|
}
|
||||||
|
|
||||||
|
async function createDbPool() {
|
||||||
|
if (!process.env.MYSQL_HOST) return null;
|
||||||
|
return mysql.createPool({
|
||||||
|
host: process.env.MYSQL_HOST,
|
||||||
|
user: process.env.MYSQL_USER,
|
||||||
|
password: process.env.MYSQL_PASSWORD,
|
||||||
|
database: process.env.MYSQL_DATABASE,
|
||||||
|
waitForConnections: true,
|
||||||
|
connectionLimit: 5,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function buildUpdate(ev) {
|
||||||
|
const positionMap = {};
|
||||||
|
ev.queue.forEach((sid, idx) => (positionMap[sid] = idx + 1));
|
||||||
|
return {
|
||||||
|
activeCount: ev.sockets.size,
|
||||||
|
queueOn: ev.queueOn,
|
||||||
|
estimatedWait: ev.queue.length * 5, // naive: 5s per person
|
||||||
|
positions: positionMap,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function broadcastUpdate(eventId, io) {
|
||||||
|
const ev = events[eventId];
|
||||||
|
if (!ev) return;
|
||||||
|
// notify all connected sockets in room
|
||||||
|
for (const sid of ev.sockets) {
|
||||||
|
const pos = ev.queue.indexOf(sid);
|
||||||
|
io.to(sid).emit("queue_update", {
|
||||||
|
activeCount: ev.sockets.size,
|
||||||
|
position: pos === -1 ? null : pos + 1,
|
||||||
|
estimatedWait: pos === -1 ? null : ev.queue.length * 5,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function evaluateQueue(eventId, io) {
|
||||||
|
const ev = events[eventId];
|
||||||
|
if (!ev) return;
|
||||||
|
|
||||||
|
// ensure active set size <= CONCURRENT_ACTIVE
|
||||||
|
while (ev.active.size < CONCURRENT_ACTIVE && ev.queue.length > 0) {
|
||||||
|
const next = ev.queue.shift();
|
||||||
|
if (!next) break;
|
||||||
|
// sign token
|
||||||
|
const token = jwt.sign({ sid: next, eventId }, process.env.JWT_SECRET || "dev-secret", {
|
||||||
|
expiresIn: TOKEN_TTL_SECONDS,
|
||||||
|
});
|
||||||
|
ev.active.add(next);
|
||||||
|
io.to(next).emit("granted", { token, expiresAt: new Date(Date.now() + TOKEN_TTL_SECONDS * 1000).toISOString() });
|
||||||
|
}
|
||||||
|
|
||||||
|
// If too many active (rare), revoke oldest
|
||||||
|
if (ev.active.size > CONCURRENT_ACTIVE) {
|
||||||
|
const toRevoke = Array.from(ev.active).slice(CONCURRENT_ACTIVE);
|
||||||
|
toRevoke.forEach((sid) => {
|
||||||
|
ev.active.delete(sid);
|
||||||
|
io.to(sid).emit("revoked");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// If queue no longer needed, clear it
|
||||||
|
if (ev.sockets.size < QUEUE_THRESHOLD) ev.queueOn = false;
|
||||||
|
|
||||||
|
// broadcast
|
||||||
|
broadcastUpdate(eventId, io);
|
||||||
|
}
|
||||||
|
|
||||||
|
export default async function handler(req, res) {
|
||||||
|
if (res.socket.server.io) {
|
||||||
|
console.log("Socket is already running");
|
||||||
|
} else {
|
||||||
|
console.log("Socket is initializing");
|
||||||
|
const io = new Server(res.socket.server, {
|
||||||
|
path: "/api/socket",
|
||||||
|
cors: { origin: true },
|
||||||
|
});
|
||||||
|
res.socket.server.io = io;
|
||||||
|
|
||||||
|
const db = await createDbPool();
|
||||||
|
|
||||||
|
io.on("connection", (socket) => {
|
||||||
|
console.log("connect", socket.id);
|
||||||
|
|
||||||
|
socket.on("join_event", async ({ eventId }) => {
|
||||||
|
if (!eventId) return;
|
||||||
|
const ev = ensureEvent(eventId);
|
||||||
|
ev.sockets.add(socket.id);
|
||||||
|
socket.join(eventId);
|
||||||
|
|
||||||
|
// compute counts
|
||||||
|
const activeCount = ev.sockets.size;
|
||||||
|
// turn on queue if threshold reached
|
||||||
|
if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true;
|
||||||
|
|
||||||
|
// if queueOn and socket not active, add to queue
|
||||||
|
if (ev.queueOn && !ev.active.has(socket.id)) {
|
||||||
|
if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// evaluate granting
|
||||||
|
evaluateQueue(eventId, io);
|
||||||
|
|
||||||
|
// send initial update
|
||||||
|
io.to(socket.id).emit("queue_update", buildUpdate(ev));
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.on("disconnect", () => {
|
||||||
|
// remove from every event
|
||||||
|
for (const [eventId, ev] of Object.entries(events)) {
|
||||||
|
if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id);
|
||||||
|
const qi = ev.queue.indexOf(socket.id);
|
||||||
|
if (qi !== -1) ev.queue.splice(qi, 1);
|
||||||
|
if (ev.active.has(socket.id)) ev.active.delete(socket.id);
|
||||||
|
// re-evaluate to grant next in line
|
||||||
|
evaluateQueue(eventId, io);
|
||||||
|
// notify remaining sockets
|
||||||
|
broadcastUpdate(eventId, io);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
11
app/page.tsx
11
app/page.tsx
@@ -15,11 +15,12 @@ export default function Home() {
|
|||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
let mounted = true;
|
let mounted = true;
|
||||||
// dynamic import to avoid bundling issues on server
|
// dynamic import to avoid bundling issues on server
|
||||||
import("socket.io-client").then(({ io }) => {
|
import("socket.io-client").then(({ io }) => {
|
||||||
if (!mounted) return;
|
if (!mounted) return;
|
||||||
// default to the same origin and connect to the Next.js API route at /api/socket
|
const socket = io({
|
||||||
const serverUrl = process.env.NEXT_PUBLIC_SOCKET_URL || window.location.origin;
|
path: "/api/socket",
|
||||||
const socket = io(serverUrl, { autoConnect: true, path: "/api/socket" });
|
autoConnect: true
|
||||||
|
});
|
||||||
socketRef.current = socket;
|
socketRef.current = socket;
|
||||||
|
|
||||||
socket.on("connect", () => {
|
socket.on("connect", () => {
|
||||||
|
|||||||
148
pages/api/socket.js
Normal file
148
pages/api/socket.js
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
import { Server } from "socket.io";
|
||||||
|
import jwt from "jsonwebtoken";
|
||||||
|
import mysql from "mysql2/promise";
|
||||||
|
|
||||||
|
// Simple in-memory structures keyed by eventId
|
||||||
|
const events = {};
|
||||||
|
|
||||||
|
const QUEUE_THRESHOLD = parseInt(process.env.QUEUE_THRESHOLD || "100", 10);
|
||||||
|
const CONCURRENT_ACTIVE = parseInt(process.env.CONCURRENT_ACTIVE || "50", 10);
|
||||||
|
const TOKEN_TTL_SECONDS = parseInt(process.env.TOKEN_TTL_SECONDS || `${15 * 60}`, 10);
|
||||||
|
|
||||||
|
function ensureEvent(eventId) {
|
||||||
|
if (!events[eventId]) {
|
||||||
|
events[eventId] = {
|
||||||
|
sockets: new Set(), // connected socket ids
|
||||||
|
queue: [], // array of socket ids in order
|
||||||
|
active: new Set(), // sockets that currently hold a token (allowed to buy)
|
||||||
|
queueOn: false,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return events[eventId];
|
||||||
|
}
|
||||||
|
|
||||||
|
async function createDbPool() {
|
||||||
|
if (!process.env.MYSQL_HOST) return null;
|
||||||
|
return mysql.createPool({
|
||||||
|
host: process.env.MYSQL_HOST,
|
||||||
|
user: process.env.MYSQL_USER,
|
||||||
|
password: process.env.MYSQL_PASSWORD,
|
||||||
|
database: process.env.MYSQL_DATABASE,
|
||||||
|
waitForConnections: true,
|
||||||
|
connectionLimit: 5,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function buildUpdate(ev) {
|
||||||
|
const positionMap = {};
|
||||||
|
ev.queue.forEach((sid, idx) => (positionMap[sid] = idx + 1));
|
||||||
|
return {
|
||||||
|
activeCount: ev.sockets.size,
|
||||||
|
queueOn: ev.queueOn,
|
||||||
|
estimatedWait: ev.queue.length * 5, // naive: 5s per person
|
||||||
|
positions: positionMap,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function broadcastUpdate(eventId, io) {
|
||||||
|
const ev = events[eventId];
|
||||||
|
if (!ev) return;
|
||||||
|
// notify all connected sockets in room
|
||||||
|
for (const sid of ev.sockets) {
|
||||||
|
const pos = ev.queue.indexOf(sid);
|
||||||
|
io.to(sid).emit("queue_update", {
|
||||||
|
activeCount: ev.sockets.size,
|
||||||
|
position: pos === -1 ? null : pos + 1,
|
||||||
|
estimatedWait: pos === -1 ? null : ev.queue.length * 5,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function evaluateQueue(eventId, io) {
|
||||||
|
const ev = events[eventId];
|
||||||
|
if (!ev) return;
|
||||||
|
|
||||||
|
// ensure active set size <= CONCURRENT_ACTIVE
|
||||||
|
while (ev.active.size < CONCURRENT_ACTIVE && ev.queue.length > 0) {
|
||||||
|
const next = ev.queue.shift();
|
||||||
|
if (!next) break;
|
||||||
|
// sign token
|
||||||
|
const token = jwt.sign({ sid: next, eventId }, process.env.JWT_SECRET || "dev-secret", {
|
||||||
|
expiresIn: TOKEN_TTL_SECONDS,
|
||||||
|
});
|
||||||
|
ev.active.add(next);
|
||||||
|
io.to(next).emit("granted", { token, expiresAt: new Date(Date.now() + TOKEN_TTL_SECONDS * 1000).toISOString() });
|
||||||
|
}
|
||||||
|
|
||||||
|
// If too many active (rare), revoke oldest
|
||||||
|
if (ev.active.size > CONCURRENT_ACTIVE) {
|
||||||
|
const toRevoke = Array.from(ev.active).slice(CONCURRENT_ACTIVE);
|
||||||
|
toRevoke.forEach((sid) => {
|
||||||
|
ev.active.delete(sid);
|
||||||
|
io.to(sid).emit("revoked");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// If queue no longer needed, clear it
|
||||||
|
if (ev.sockets.size < QUEUE_THRESHOLD) ev.queueOn = false;
|
||||||
|
|
||||||
|
// broadcast
|
||||||
|
broadcastUpdate(eventId, io);
|
||||||
|
}
|
||||||
|
|
||||||
|
export default async function handler(req, res) {
|
||||||
|
if (res.socket.server.io) {
|
||||||
|
console.log("Socket is already running");
|
||||||
|
} else {
|
||||||
|
console.log("Socket is initializing");
|
||||||
|
const io = new Server(res.socket.server, {
|
||||||
|
path: "/api/socket",
|
||||||
|
cors: { origin: true },
|
||||||
|
});
|
||||||
|
res.socket.server.io = io;
|
||||||
|
|
||||||
|
const db = await createDbPool();
|
||||||
|
|
||||||
|
io.on("connection", (socket) => {
|
||||||
|
console.log("connect", socket.id);
|
||||||
|
|
||||||
|
socket.on("join_event", async ({ eventId }) => {
|
||||||
|
if (!eventId) return;
|
||||||
|
const ev = ensureEvent(eventId);
|
||||||
|
ev.sockets.add(socket.id);
|
||||||
|
socket.join(eventId);
|
||||||
|
|
||||||
|
// compute counts
|
||||||
|
const activeCount = ev.sockets.size;
|
||||||
|
// turn on queue if threshold reached
|
||||||
|
if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true;
|
||||||
|
|
||||||
|
// if queueOn and socket not active, add to queue
|
||||||
|
if (ev.queueOn && !ev.active.has(socket.id)) {
|
||||||
|
if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// evaluate granting
|
||||||
|
evaluateQueue(eventId, io);
|
||||||
|
|
||||||
|
// send initial update
|
||||||
|
io.to(socket.id).emit("queue_update", buildUpdate(ev));
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.on("disconnect", () => {
|
||||||
|
// remove from every event
|
||||||
|
for (const [eventId, ev] of Object.entries(events)) {
|
||||||
|
if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id);
|
||||||
|
const qi = ev.queue.indexOf(socket.id);
|
||||||
|
if (qi !== -1) ev.queue.splice(qi, 1);
|
||||||
|
if (ev.active.has(socket.id)) ev.active.delete(socket.id);
|
||||||
|
// re-evaluate to grant next in line
|
||||||
|
evaluateQueue(eventId, io);
|
||||||
|
// notify remaining sockets
|
||||||
|
broadcastUpdate(eventId, io);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
158
server/index.js
158
server/index.js
@@ -1,158 +0,0 @@
|
|||||||
/*
|
|
||||||
Simple Socket.IO queue server for demo purposes.
|
|
||||||
- Tracks users per-event
|
|
||||||
- When active concurrent users for an event reach QUEUE_THRESHOLD, turn on queueing
|
|
||||||
- Allow up to CONCURRENT_ACTIVE users on the site simultaneously (have access token)
|
|
||||||
- Issue JWT tokens valid for TOKEN_TTL_SECONDS when a user reaches front of queue
|
|
||||||
- Uses MySQL to persist queue positions (lightweight stub here)
|
|
||||||
|
|
||||||
NOTE: This is a minimal, synchronous-in-memory demo. For production you must persist state
|
|
||||||
in Redis or a DB, add authentication, rate-limiting, and proper error handling.
|
|
||||||
*/
|
|
||||||
|
|
||||||
require("dotenv").config();
|
|
||||||
const http = require("http");
|
|
||||||
const { Server } = require("socket.io");
|
|
||||||
const jwt = require("jsonwebtoken");
|
|
||||||
const mysql = require("mysql2/promise");
|
|
||||||
|
|
||||||
const PORT = process.env.SOCKET_PORT || 4000;
|
|
||||||
const QUEUE_THRESHOLD = parseInt(process.env.QUEUE_THRESHOLD || "100", 10);
|
|
||||||
const CONCURRENT_ACTIVE = parseInt(process.env.CONCURRENT_ACTIVE || "50", 10);
|
|
||||||
const TOKEN_TTL_SECONDS = parseInt(process.env.TOKEN_TTL_SECONDS || `${15 * 60}`, 10);
|
|
||||||
|
|
||||||
// Simple in-memory structures keyed by eventId
|
|
||||||
const events = {};
|
|
||||||
|
|
||||||
function ensureEvent(eventId) {
|
|
||||||
if (!events[eventId]) {
|
|
||||||
events[eventId] = {
|
|
||||||
sockets: new Set(), // connected socket ids
|
|
||||||
queue: [], // array of socket ids in order
|
|
||||||
active: new Set(), // sockets that currently hold a token (allowed to buy)
|
|
||||||
queueOn: false,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
return events[eventId];
|
|
||||||
}
|
|
||||||
|
|
||||||
async function createDbPool() {
|
|
||||||
if (!process.env.MYSQL_HOST) return null;
|
|
||||||
return mysql.createPool({
|
|
||||||
host: process.env.MYSQL_HOST,
|
|
||||||
user: process.env.MYSQL_USER,
|
|
||||||
password: process.env.MYSQL_PASSWORD,
|
|
||||||
database: process.env.MYSQL_DATABASE,
|
|
||||||
waitForConnections: true,
|
|
||||||
connectionLimit: 5,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
;(async () => {
|
|
||||||
const db = await createDbPool();
|
|
||||||
|
|
||||||
const server = http.createServer();
|
|
||||||
const io = new Server(server, {
|
|
||||||
cors: { origin: true },
|
|
||||||
});
|
|
||||||
|
|
||||||
io.on("connection", (socket) => {
|
|
||||||
console.log("connect", socket.id);
|
|
||||||
|
|
||||||
socket.on("join_event", async ({ eventId }) => {
|
|
||||||
if (!eventId) return;
|
|
||||||
const ev = ensureEvent(eventId);
|
|
||||||
ev.sockets.add(socket.id);
|
|
||||||
socket.join(eventId);
|
|
||||||
|
|
||||||
// compute counts
|
|
||||||
const activeCount = ev.sockets.size;
|
|
||||||
// turn on queue if threshold reached
|
|
||||||
if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true;
|
|
||||||
|
|
||||||
// if queueOn and socket not active, add to queue
|
|
||||||
if (ev.queueOn && !ev.active.has(socket.id)) {
|
|
||||||
if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id);
|
|
||||||
}
|
|
||||||
|
|
||||||
// evaluate granting
|
|
||||||
evaluateQueue(eventId, io);
|
|
||||||
|
|
||||||
// send initial update
|
|
||||||
io.to(socket.id).emit("queue_update", buildUpdate(ev));
|
|
||||||
});
|
|
||||||
|
|
||||||
socket.on("disconnect", () => {
|
|
||||||
// remove from every event
|
|
||||||
for (const [eventId, ev] of Object.entries(events)) {
|
|
||||||
if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id);
|
|
||||||
const qi = ev.queue.indexOf(socket.id);
|
|
||||||
if (qi !== -1) ev.queue.splice(qi, 1);
|
|
||||||
if (ev.active.has(socket.id)) ev.active.delete(socket.id);
|
|
||||||
// re-evaluate to grant next in line
|
|
||||||
evaluateQueue(eventId, io);
|
|
||||||
// notify remaining sockets
|
|
||||||
broadcastUpdate(eventId, io);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
server.listen(PORT, () => console.log(`Socket server listening on ${PORT}`));
|
|
||||||
|
|
||||||
function buildUpdate(ev) {
|
|
||||||
const positionMap = {};
|
|
||||||
ev.queue.forEach((sid, idx) => (positionMap[sid] = idx + 1));
|
|
||||||
return {
|
|
||||||
activeCount: ev.sockets.size,
|
|
||||||
queueOn: ev.queueOn,
|
|
||||||
estimatedWait: ev.queue.length * 5, // naive: 5s per person
|
|
||||||
positions: positionMap,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
function broadcastUpdate(eventId, io) {
|
|
||||||
const ev = events[eventId];
|
|
||||||
if (!ev) return;
|
|
||||||
// notify all connected sockets in room
|
|
||||||
for (const sid of ev.sockets) {
|
|
||||||
const pos = ev.queue.indexOf(sid);
|
|
||||||
io.to(sid).emit("queue_update", {
|
|
||||||
activeCount: ev.sockets.size,
|
|
||||||
position: pos === -1 ? null : pos + 1,
|
|
||||||
estimatedWait: pos === -1 ? null : ev.queue.length * 5,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function evaluateQueue(eventId, io) {
|
|
||||||
const ev = events[eventId];
|
|
||||||
if (!ev) return;
|
|
||||||
|
|
||||||
// ensure active set size <= CONCURRENT_ACTIVE
|
|
||||||
while (ev.active.size < CONCURRENT_ACTIVE && ev.queue.length > 0) {
|
|
||||||
const next = ev.queue.shift();
|
|
||||||
if (!next) break;
|
|
||||||
// sign token
|
|
||||||
const token = jwt.sign({ sid: next, eventId }, process.env.JWT_SECRET || "dev-secret", {
|
|
||||||
expiresIn: TOKEN_TTL_SECONDS,
|
|
||||||
});
|
|
||||||
ev.active.add(next);
|
|
||||||
io.to(next).emit("granted", { token, expiresAt: new Date(Date.now() + TOKEN_TTL_SECONDS * 1000).toISOString() });
|
|
||||||
}
|
|
||||||
|
|
||||||
// If too many active (rare), revoke oldest
|
|
||||||
if (ev.active.size > CONCURRENT_ACTIVE) {
|
|
||||||
const toRevoke = Array.from(ev.active).slice(CONCURRENT_ACTIVE);
|
|
||||||
toRevoke.forEach((sid) => {
|
|
||||||
ev.active.delete(sid);
|
|
||||||
io.to(sid).emit("revoked");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// If queue no longer needed, clear it
|
|
||||||
if (ev.sockets.size < QUEUE_THRESHOLD) ev.queueOn = false;
|
|
||||||
|
|
||||||
// broadcast
|
|
||||||
broadcastUpdate(eventId, io);
|
|
||||||
}
|
|
||||||
})();
|
|
||||||
Reference in New Issue
Block a user