feat: remove Socket.IO server implementation from socket.js
This commit is contained in:
@@ -1,148 +0,0 @@
|
||||
import { Server } from "socket.io";
|
||||
import jwt from "jsonwebtoken";
|
||||
import mysql from "mysql2/promise";
|
||||
|
||||
// Simple in-memory structures keyed by eventId
|
||||
const events = {};
|
||||
|
||||
const QUEUE_THRESHOLD = parseInt(process.env.QUEUE_THRESHOLD || "100", 10);
|
||||
const CONCURRENT_ACTIVE = parseInt(process.env.CONCURRENT_ACTIVE || "50", 10);
|
||||
const TOKEN_TTL_SECONDS = parseInt(process.env.TOKEN_TTL_SECONDS || `${15 * 60}`, 10);
|
||||
|
||||
function ensureEvent(eventId) {
|
||||
if (!events[eventId]) {
|
||||
events[eventId] = {
|
||||
sockets: new Set(), // connected socket ids
|
||||
queue: [], // array of socket ids in order
|
||||
active: new Set(), // sockets that currently hold a token (allowed to buy)
|
||||
queueOn: false,
|
||||
};
|
||||
}
|
||||
return events[eventId];
|
||||
}
|
||||
|
||||
async function createDbPool() {
|
||||
if (!process.env.MYSQL_HOST) return null;
|
||||
return mysql.createPool({
|
||||
host: process.env.MYSQL_HOST,
|
||||
user: process.env.MYSQL_USER,
|
||||
password: process.env.MYSQL_PASSWORD,
|
||||
database: process.env.MYSQL_DATABASE,
|
||||
waitForConnections: true,
|
||||
connectionLimit: 5,
|
||||
});
|
||||
}
|
||||
|
||||
function buildUpdate(ev) {
|
||||
const positionMap = {};
|
||||
ev.queue.forEach((sid, idx) => (positionMap[sid] = idx + 1));
|
||||
return {
|
||||
activeCount: ev.sockets.size,
|
||||
queueOn: ev.queueOn,
|
||||
estimatedWait: ev.queue.length * 5, // naive: 5s per person
|
||||
positions: positionMap,
|
||||
};
|
||||
}
|
||||
|
||||
function broadcastUpdate(eventId, io) {
|
||||
const ev = events[eventId];
|
||||
if (!ev) return;
|
||||
// notify all connected sockets in room
|
||||
for (const sid of ev.sockets) {
|
||||
const pos = ev.queue.indexOf(sid);
|
||||
io.to(sid).emit("queue_update", {
|
||||
activeCount: ev.sockets.size,
|
||||
position: pos === -1 ? null : pos + 1,
|
||||
estimatedWait: pos === -1 ? null : ev.queue.length * 5,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function evaluateQueue(eventId, io) {
|
||||
const ev = events[eventId];
|
||||
if (!ev) return;
|
||||
|
||||
// ensure active set size <= CONCURRENT_ACTIVE
|
||||
while (ev.active.size < CONCURRENT_ACTIVE && ev.queue.length > 0) {
|
||||
const next = ev.queue.shift();
|
||||
if (!next) break;
|
||||
// sign token
|
||||
const token = jwt.sign({ sid: next, eventId }, process.env.JWT_SECRET || "dev-secret", {
|
||||
expiresIn: TOKEN_TTL_SECONDS,
|
||||
});
|
||||
ev.active.add(next);
|
||||
io.to(next).emit("granted", { token, expiresAt: new Date(Date.now() + TOKEN_TTL_SECONDS * 1000).toISOString() });
|
||||
}
|
||||
|
||||
// If too many active (rare), revoke oldest
|
||||
if (ev.active.size > CONCURRENT_ACTIVE) {
|
||||
const toRevoke = Array.from(ev.active).slice(CONCURRENT_ACTIVE);
|
||||
toRevoke.forEach((sid) => {
|
||||
ev.active.delete(sid);
|
||||
io.to(sid).emit("revoked");
|
||||
});
|
||||
}
|
||||
|
||||
// If queue no longer needed, clear it
|
||||
if (ev.sockets.size < QUEUE_THRESHOLD) ev.queueOn = false;
|
||||
|
||||
// broadcast
|
||||
broadcastUpdate(eventId, io);
|
||||
}
|
||||
|
||||
export default async function handler(req, res) {
|
||||
if (res.socket.server.io) {
|
||||
console.log("Socket is already running");
|
||||
} else {
|
||||
console.log("Socket is initializing");
|
||||
const io = new Server(res.socket.server, {
|
||||
path: "/api/socket",
|
||||
cors: { origin: true },
|
||||
});
|
||||
res.socket.server.io = io;
|
||||
|
||||
const db = await createDbPool();
|
||||
|
||||
io.on("connection", (socket) => {
|
||||
console.log("connect", socket.id);
|
||||
|
||||
socket.on("join_event", async ({ eventId }) => {
|
||||
if (!eventId) return;
|
||||
const ev = ensureEvent(eventId);
|
||||
ev.sockets.add(socket.id);
|
||||
socket.join(eventId);
|
||||
|
||||
// compute counts
|
||||
const activeCount = ev.sockets.size;
|
||||
// turn on queue if threshold reached
|
||||
if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true;
|
||||
|
||||
// if queueOn and socket not active, add to queue
|
||||
if (ev.queueOn && !ev.active.has(socket.id)) {
|
||||
if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id);
|
||||
}
|
||||
|
||||
// evaluate granting
|
||||
evaluateQueue(eventId, io);
|
||||
|
||||
// send initial update
|
||||
io.to(socket.id).emit("queue_update", buildUpdate(ev));
|
||||
});
|
||||
|
||||
socket.on("disconnect", () => {
|
||||
// remove from every event
|
||||
for (const [eventId, ev] of Object.entries(events)) {
|
||||
if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id);
|
||||
const qi = ev.queue.indexOf(socket.id);
|
||||
if (qi !== -1) ev.queue.splice(qi, 1);
|
||||
if (ev.active.has(socket.id)) ev.active.delete(socket.id);
|
||||
// re-evaluate to grant next in line
|
||||
evaluateQueue(eventId, io);
|
||||
// notify remaining sockets
|
||||
broadcastUpdate(eventId, io);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
res.end();
|
||||
}
|
||||
Reference in New Issue
Block a user