Files
funcode2.0/server/index.js
devbeni e60c801f4e feat: add initial Socket.IO queue server implementation
- Created a basic Socket.IO server that manages user connections and queues for events.
- Implemented queue logic to handle concurrent user limits and JWT token issuance.
- Added MySQL configuration for potential persistence of queue positions.
- Introduced environment variables for configuration through a .env.example file.
2025-09-19 18:49:12 +02:00

159 lines
5.0 KiB
JavaScript

/*
Simple Socket.IO queue server for demo purposes.
- Tracks users per-event
- When active concurrent users for an event reach QUEUE_THRESHOLD, turn on queueing
- Allow up to CONCURRENT_ACTIVE users on the site simultaneously (have access token)
- Issue JWT tokens valid for TOKEN_TTL_SECONDS when a user reaches front of queue
- Uses MySQL to persist queue positions (lightweight stub here)
NOTE: This is a minimal, synchronous-in-memory demo. For production you must persist state
in Redis or a DB, add authentication, rate-limiting, and proper error handling.
*/
require("dotenv").config();
const http = require("http");
const { Server } = require("socket.io");
const jwt = require("jsonwebtoken");
const mysql = require("mysql2/promise");
const PORT = process.env.SOCKET_PORT || 4000;
const QUEUE_THRESHOLD = parseInt(process.env.QUEUE_THRESHOLD || "100", 10);
const CONCURRENT_ACTIVE = parseInt(process.env.CONCURRENT_ACTIVE || "50", 10);
const TOKEN_TTL_SECONDS = parseInt(process.env.TOKEN_TTL_SECONDS || `${15 * 60}`, 10);
// Simple in-memory structures keyed by eventId
const events = {};
function ensureEvent(eventId) {
if (!events[eventId]) {
events[eventId] = {
sockets: new Set(), // connected socket ids
queue: [], // array of socket ids in order
active: new Set(), // sockets that currently hold a token (allowed to buy)
queueOn: false,
};
}
return events[eventId];
}
async function createDbPool() {
if (!process.env.MYSQL_HOST) return null;
return mysql.createPool({
host: process.env.MYSQL_HOST,
user: process.env.MYSQL_USER,
password: process.env.MYSQL_PASSWORD,
database: process.env.MYSQL_DATABASE,
waitForConnections: true,
connectionLimit: 5,
});
}
;(async () => {
const db = await createDbPool();
const server = http.createServer();
const io = new Server(server, {
cors: { origin: true },
});
io.on("connection", (socket) => {
console.log("connect", socket.id);
socket.on("join_event", async ({ eventId }) => {
if (!eventId) return;
const ev = ensureEvent(eventId);
ev.sockets.add(socket.id);
socket.join(eventId);
// compute counts
const activeCount = ev.sockets.size;
// turn on queue if threshold reached
if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true;
// if queueOn and socket not active, add to queue
if (ev.queueOn && !ev.active.has(socket.id)) {
if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id);
}
// evaluate granting
evaluateQueue(eventId, io);
// send initial update
io.to(socket.id).emit("queue_update", buildUpdate(ev));
});
socket.on("disconnect", () => {
// remove from every event
for (const [eventId, ev] of Object.entries(events)) {
if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id);
const qi = ev.queue.indexOf(socket.id);
if (qi !== -1) ev.queue.splice(qi, 1);
if (ev.active.has(socket.id)) ev.active.delete(socket.id);
// re-evaluate to grant next in line
evaluateQueue(eventId, io);
// notify remaining sockets
broadcastUpdate(eventId, io);
}
});
});
server.listen(PORT, () => console.log(`Socket server listening on ${PORT}`));
function buildUpdate(ev) {
const positionMap = {};
ev.queue.forEach((sid, idx) => (positionMap[sid] = idx + 1));
return {
activeCount: ev.sockets.size,
queueOn: ev.queueOn,
estimatedWait: ev.queue.length * 5, // naive: 5s per person
positions: positionMap,
};
}
function broadcastUpdate(eventId, io) {
const ev = events[eventId];
if (!ev) return;
// notify all connected sockets in room
for (const sid of ev.sockets) {
const pos = ev.queue.indexOf(sid);
io.to(sid).emit("queue_update", {
activeCount: ev.sockets.size,
position: pos === -1 ? null : pos + 1,
estimatedWait: pos === -1 ? null : ev.queue.length * 5,
});
}
}
function evaluateQueue(eventId, io) {
const ev = events[eventId];
if (!ev) return;
// ensure active set size <= CONCURRENT_ACTIVE
while (ev.active.size < CONCURRENT_ACTIVE && ev.queue.length > 0) {
const next = ev.queue.shift();
if (!next) break;
// sign token
const token = jwt.sign({ sid: next, eventId }, process.env.JWT_SECRET || "dev-secret", {
expiresIn: TOKEN_TTL_SECONDS,
});
ev.active.add(next);
io.to(next).emit("granted", { token, expiresAt: new Date(Date.now() + TOKEN_TTL_SECONDS * 1000).toISOString() });
}
// If too many active (rare), revoke oldest
if (ev.active.size > CONCURRENT_ACTIVE) {
const toRevoke = Array.from(ev.active).slice(CONCURRENT_ACTIVE);
toRevoke.forEach((sid) => {
ev.active.delete(sid);
io.to(sid).emit("revoked");
});
}
// If queue no longer needed, clear it
if (ev.sockets.size < QUEUE_THRESHOLD) ev.queueOn = false;
// broadcast
broadcastUpdate(eventId, io);
}
})();