feat: implement Socket.IO server with queue management and update environment variables
This commit is contained in:
@@ -1,3 +1,7 @@
|
||||
# Socket server
|
||||
SOCKET_PORT=4000
|
||||
NEXT_PUBLIC_SOCKET_PORT=4000
|
||||
|
||||
# Queue settings
|
||||
QUEUE_THRESHOLD=100
|
||||
CONCURRENT_ACTIVE=50
|
||||
@@ -10,4 +14,4 @@ JWT_SECRET=your_jwt_secret_here
|
||||
MYSQL_HOST=localhost
|
||||
MYSQL_USER=root
|
||||
MYSQL_PASSWORD=yourpassword
|
||||
MYSQL_DATABASE=queue_demo
|
||||
MYSQL_DATABASE=queue_demo
|
||||
@@ -47,7 +47,7 @@ This repository contains a minimal demo of a queue system for high-concurrency t
|
||||
|
||||
Files added in this demo:
|
||||
|
||||
- `pages/api/socket.js` — Next.js API route with Socket.IO server, in-memory queue logic and JWT issuance (demo only).
|
||||
- `app/api/socketio/route.js` — Next.js App Router API route that initializes the Socket.IO server, in-memory queue logic and JWT issuance (demo only).
|
||||
- `.env.example` — example environment variables for the queue system.
|
||||
- `app/page.tsx` — client UI (Next.js) that connects via Socket.IO and displays queue position, estimated wait, and purchase button.
|
||||
|
||||
@@ -65,13 +65,13 @@ Running locally
|
||||
pnpm install
|
||||
```
|
||||
|
||||
3. Start the Next.js dev server (includes the Socket.IO API route):
|
||||
3. Start the Next.js dev server:
|
||||
|
||||
```powershell
|
||||
pnpm dev
|
||||
```
|
||||
|
||||
4. Open `http://localhost:3000` and the client will connect to the Socket.IO API route at `/api/socket`.
|
||||
4. Open `http://localhost:3000` - the page will automatically initialize the Socket.IO server via API call to `/api/socketio`, then connect to it on port 4000.
|
||||
|
||||
Next steps / improvements
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { Server } from "socket.io";
|
||||
import jwt from "jsonwebtoken";
|
||||
import mysql from "mysql2/promise";
|
||||
import { NextRequest, NextResponse } from "next/server";
|
||||
|
||||
// Simple in-memory structures keyed by eventId
|
||||
const events = {};
|
||||
@@ -90,59 +91,67 @@ function evaluateQueue(eventId, io) {
|
||||
broadcastUpdate(eventId, io);
|
||||
}
|
||||
|
||||
export default async function handler(req, res) {
|
||||
if (res.socket.server.io) {
|
||||
export async function GET(req) {
|
||||
const res = NextResponse.next();
|
||||
|
||||
if (global.io) {
|
||||
console.log("Socket is already running");
|
||||
} else {
|
||||
console.log("Socket is initializing");
|
||||
const io = new Server(res.socket.server, {
|
||||
path: "/api/socket",
|
||||
cors: { origin: true },
|
||||
});
|
||||
res.socket.server.io = io;
|
||||
|
||||
const db = await createDbPool();
|
||||
|
||||
io.on("connection", (socket) => {
|
||||
console.log("connect", socket.id);
|
||||
|
||||
socket.on("join_event", async ({ eventId }) => {
|
||||
if (!eventId) return;
|
||||
const ev = ensureEvent(eventId);
|
||||
ev.sockets.add(socket.id);
|
||||
socket.join(eventId);
|
||||
|
||||
// compute counts
|
||||
const activeCount = ev.sockets.size;
|
||||
// turn on queue if threshold reached
|
||||
if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true;
|
||||
|
||||
// if queueOn and socket not active, add to queue
|
||||
if (ev.queueOn && !ev.active.has(socket.id)) {
|
||||
if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id);
|
||||
}
|
||||
|
||||
// evaluate granting
|
||||
evaluateQueue(eventId, io);
|
||||
|
||||
// send initial update
|
||||
io.to(socket.id).emit("queue_update", buildUpdate(ev));
|
||||
const httpServer = req.socket?.server;
|
||||
|
||||
if (httpServer) {
|
||||
const io = new Server(httpServer, {
|
||||
path: "/api/socket",
|
||||
cors: { origin: true },
|
||||
});
|
||||
|
||||
global.io = io;
|
||||
|
||||
socket.on("disconnect", () => {
|
||||
// remove from every event
|
||||
for (const [eventId, ev] of Object.entries(events)) {
|
||||
if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id);
|
||||
const qi = ev.queue.indexOf(socket.id);
|
||||
if (qi !== -1) ev.queue.splice(qi, 1);
|
||||
if (ev.active.has(socket.id)) ev.active.delete(socket.id);
|
||||
// re-evaluate to grant next in line
|
||||
const db = await createDbPool();
|
||||
|
||||
io.on("connection", (socket) => {
|
||||
console.log("connect", socket.id);
|
||||
|
||||
socket.on("join_event", async ({ eventId }) => {
|
||||
if (!eventId) return;
|
||||
const ev = ensureEvent(eventId);
|
||||
ev.sockets.add(socket.id);
|
||||
socket.join(eventId);
|
||||
|
||||
// compute counts
|
||||
const activeCount = ev.sockets.size;
|
||||
// turn on queue if threshold reached
|
||||
if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true;
|
||||
|
||||
// if queueOn and socket not active, add to queue
|
||||
if (ev.queueOn && !ev.active.has(socket.id)) {
|
||||
if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id);
|
||||
}
|
||||
|
||||
// evaluate granting
|
||||
evaluateQueue(eventId, io);
|
||||
// notify remaining sockets
|
||||
broadcastUpdate(eventId, io);
|
||||
}
|
||||
|
||||
// send initial update
|
||||
io.to(socket.id).emit("queue_update", buildUpdate(ev));
|
||||
});
|
||||
|
||||
socket.on("disconnect", () => {
|
||||
// remove from every event
|
||||
for (const [eventId, ev] of Object.entries(events)) {
|
||||
if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id);
|
||||
const qi = ev.queue.indexOf(socket.id);
|
||||
if (qi !== -1) ev.queue.splice(qi, 1);
|
||||
if (ev.active.has(socket.id)) ev.active.delete(socket.id);
|
||||
// re-evaluate to grant next in line
|
||||
evaluateQueue(eventId, io);
|
||||
// notify remaining sockets
|
||||
broadcastUpdate(eventId, io);
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
res.end();
|
||||
|
||||
return new Response("Socket.IO server initialized", { status: 200 });
|
||||
}
|
||||
154
app/api/socketio/route.js
Normal file
154
app/api/socketio/route.js
Normal file
@@ -0,0 +1,154 @@
|
||||
import { Server } from 'socket.io'
|
||||
import jwt from 'jsonwebtoken'
|
||||
|
||||
// Simple in-memory structures keyed by eventId
|
||||
const events = {}
|
||||
|
||||
const QUEUE_THRESHOLD = parseInt(process.env.QUEUE_THRESHOLD || "100", 10)
|
||||
const CONCURRENT_ACTIVE = parseInt(process.env.CONCURRENT_ACTIVE || "50", 10)
|
||||
const TOKEN_TTL_SECONDS = parseInt(process.env.TOKEN_TTL_SECONDS || `${15 * 60}`, 10)
|
||||
|
||||
function ensureEvent(eventId) {
|
||||
if (!events[eventId]) {
|
||||
events[eventId] = {
|
||||
sockets: new Set(), // connected socket ids
|
||||
queue: [], // array of socket ids in order
|
||||
active: new Set(), // sockets that currently hold a token (allowed to buy)
|
||||
queueOn: false,
|
||||
}
|
||||
}
|
||||
return events[eventId]
|
||||
}
|
||||
|
||||
function broadcastUpdate(eventId, io) {
|
||||
const ev = events[eventId]
|
||||
if (!ev) return
|
||||
// notify all connected sockets in room
|
||||
for (const sid of ev.sockets) {
|
||||
const pos = ev.queue.indexOf(sid)
|
||||
io.to(sid).emit("queue_update", {
|
||||
activeCount: ev.sockets.size,
|
||||
position: pos === -1 ? null : pos + 1,
|
||||
estimatedWait: pos === -1 ? null : ev.queue.length * 5,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
function evaluateQueue(eventId, io) {
|
||||
const ev = events[eventId]
|
||||
if (!ev) return
|
||||
|
||||
// ensure active set size <= CONCURRENT_ACTIVE
|
||||
while (ev.active.size < CONCURRENT_ACTIVE && ev.queue.length > 0) {
|
||||
const next = ev.queue.shift()
|
||||
if (!next) break
|
||||
// sign token
|
||||
const token = jwt.sign({ sid: next, eventId }, process.env.JWT_SECRET || "dev-secret", {
|
||||
expiresIn: TOKEN_TTL_SECONDS,
|
||||
})
|
||||
ev.active.add(next)
|
||||
io.to(next).emit("granted", { token, expiresAt: new Date(Date.now() + TOKEN_TTL_SECONDS * 1000).toISOString() })
|
||||
}
|
||||
|
||||
// If too many active (rare), revoke oldest
|
||||
if (ev.active.size > CONCURRENT_ACTIVE) {
|
||||
const toRevoke = Array.from(ev.active).slice(CONCURRENT_ACTIVE)
|
||||
toRevoke.forEach((sid) => {
|
||||
ev.active.delete(sid)
|
||||
io.to(sid).emit("revoked")
|
||||
})
|
||||
}
|
||||
|
||||
// If queue no longer needed, clear it
|
||||
if (ev.sockets.size < QUEUE_THRESHOLD) ev.queueOn = false
|
||||
|
||||
// broadcast
|
||||
broadcastUpdate(eventId, io)
|
||||
}
|
||||
|
||||
export async function GET(req) {
|
||||
if (global.io) {
|
||||
console.log("Socket.IO már fut")
|
||||
return Response.json({ message: "Socket.IO már inicializálva" })
|
||||
}
|
||||
|
||||
console.log("Socket.IO inicializálása...")
|
||||
|
||||
try {
|
||||
// Get the HTTP server instance from Next.js
|
||||
const server = req.nextUrl.protocol === 'https:'
|
||||
? require('https').createServer()
|
||||
: require('http').createServer()
|
||||
|
||||
const io = new Server(server, {
|
||||
cors: {
|
||||
origin: "*",
|
||||
methods: ["GET", "POST"]
|
||||
}
|
||||
})
|
||||
|
||||
global.io = io
|
||||
|
||||
io.on("connection", (socket) => {
|
||||
console.log("Csatlakozott:", socket.id)
|
||||
|
||||
socket.on("join_event", async ({ eventId }) => {
|
||||
if (!eventId) return
|
||||
const ev = ensureEvent(eventId)
|
||||
ev.sockets.add(socket.id)
|
||||
socket.join(eventId)
|
||||
|
||||
// compute counts
|
||||
const activeCount = ev.sockets.size
|
||||
// turn on queue if threshold reached
|
||||
if (activeCount >= QUEUE_THRESHOLD) ev.queueOn = true
|
||||
|
||||
// if queueOn and socket not active, add to queue
|
||||
if (ev.queueOn && !ev.active.has(socket.id)) {
|
||||
if (!ev.queue.includes(socket.id)) ev.queue.push(socket.id)
|
||||
}
|
||||
|
||||
// evaluate granting
|
||||
evaluateQueue(eventId, io)
|
||||
|
||||
// send initial update
|
||||
const pos = ev.queue.indexOf(socket.id)
|
||||
io.to(socket.id).emit("queue_update", {
|
||||
activeCount: ev.sockets.size,
|
||||
position: pos === -1 ? null : pos + 1,
|
||||
estimatedWait: pos === -1 ? null : ev.queue.length * 5,
|
||||
})
|
||||
})
|
||||
|
||||
socket.on("disconnect", () => {
|
||||
console.log("Lecsatlakozott:", socket.id)
|
||||
// remove from every event
|
||||
for (const [eventId, ev] of Object.entries(events)) {
|
||||
if (ev.sockets.has(socket.id)) ev.sockets.delete(socket.id)
|
||||
const qi = ev.queue.indexOf(socket.id)
|
||||
if (qi !== -1) ev.queue.splice(qi, 1)
|
||||
if (ev.active.has(socket.id)) ev.active.delete(socket.id)
|
||||
// re-evaluate to grant next in line
|
||||
evaluateQueue(eventId, io)
|
||||
// notify remaining sockets
|
||||
broadcastUpdate(eventId, io)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
// Start the Socket.IO server on a different port
|
||||
const SOCKET_PORT = process.env.SOCKET_PORT || 4000
|
||||
server.listen(SOCKET_PORT, () => {
|
||||
console.log(`Socket.IO szerver fut a ${SOCKET_PORT} porton`)
|
||||
})
|
||||
|
||||
return Response.json({
|
||||
message: "Socket.IO szerver inicializálva",
|
||||
port: SOCKET_PORT
|
||||
})
|
||||
|
||||
} catch (error) {
|
||||
console.error("Socket.IO inicializálási hiba:", error)
|
||||
return Response.json({ error: "Nem sikerült inicializálni a Socket.IO szervert" }, { status: 500 })
|
||||
}
|
||||
}
|
||||
85
app/page.tsx
85
app/page.tsx
@@ -14,50 +14,59 @@ export default function Home() {
|
||||
|
||||
useEffect(() => {
|
||||
let mounted = true;
|
||||
// dynamic import to avoid bundling issues on server
|
||||
import("socket.io-client").then(({ io }) => {
|
||||
if (!mounted) return;
|
||||
const socket = io({
|
||||
path: "/api/socket",
|
||||
autoConnect: true
|
||||
});
|
||||
socketRef.current = socket;
|
||||
|
||||
// Először inicializáljuk a Socket.IO szervert
|
||||
fetch('/api/socketio')
|
||||
.then(() => {
|
||||
// dynamic import to avoid bundling issues on server
|
||||
return import("socket.io-client");
|
||||
})
|
||||
.then(({ io }) => {
|
||||
if (!mounted) return;
|
||||
const socketPort = process.env.NEXT_PUBLIC_SOCKET_PORT || "4000";
|
||||
const socket = io(`http://localhost:${socketPort}`, {
|
||||
autoConnect: true
|
||||
});
|
||||
socketRef.current = socket;
|
||||
|
||||
socket.on("connect", () => {
|
||||
setConnected(true);
|
||||
// join a named event (example eventId: pamkutya)
|
||||
socket.emit("join_event", { eventId: "pamkutya" });
|
||||
});
|
||||
socket.on("connect", () => {
|
||||
setConnected(true);
|
||||
// join a named event (example eventId: pamkutya)
|
||||
socket.emit("join_event", { eventId: "pamkutya" });
|
||||
});
|
||||
|
||||
socket.on("disconnect", () => {
|
||||
setConnected(false);
|
||||
setPosition(null);
|
||||
setHasAccess(false);
|
||||
setTokenExpiry(null);
|
||||
});
|
||||
socket.on("disconnect", () => {
|
||||
setConnected(false);
|
||||
setPosition(null);
|
||||
setHasAccess(false);
|
||||
setTokenExpiry(null);
|
||||
});
|
||||
|
||||
socket.on("queue_update", (data: any) => {
|
||||
setPosition(data.position ?? null);
|
||||
setEstimatedWait(data.estimatedWait ?? null);
|
||||
setActiveUsers(data.activeCount ?? 0);
|
||||
});
|
||||
socket.on("queue_update", (data: any) => {
|
||||
setPosition(data.position ?? null);
|
||||
setEstimatedWait(data.estimatedWait ?? null);
|
||||
setActiveUsers(data.activeCount ?? 0);
|
||||
});
|
||||
|
||||
socket.on("granted", (data: any) => {
|
||||
// { token, expiresAt }
|
||||
setHasAccess(true);
|
||||
setTokenExpiry(data.expiresAt ? Date.parse(data.expiresAt) : Date.now() + 15 * 60 * 1000);
|
||||
// store token locally for API calls
|
||||
try {
|
||||
localStorage.setItem("event_token", data.token);
|
||||
} catch (e) {}
|
||||
});
|
||||
socket.on("granted", (data: any) => {
|
||||
// { token, expiresAt }
|
||||
setHasAccess(true);
|
||||
setTokenExpiry(data.expiresAt ? Date.parse(data.expiresAt) : Date.now() + 15 * 60 * 1000);
|
||||
// store token locally for API calls
|
||||
try {
|
||||
localStorage.setItem("event_token", data.token);
|
||||
} catch (e) {}
|
||||
});
|
||||
|
||||
socket.on("revoked", () => {
|
||||
setHasAccess(false);
|
||||
setTokenExpiry(null);
|
||||
localStorage.removeItem("event_token");
|
||||
socket.on("revoked", () => {
|
||||
setHasAccess(false);
|
||||
setTokenExpiry(null);
|
||||
localStorage.removeItem("event_token");
|
||||
});
|
||||
})
|
||||
.catch(error => {
|
||||
console.error("Socket inicializálási hiba:", error);
|
||||
});
|
||||
});
|
||||
|
||||
return () => {
|
||||
mounted = false;
|
||||
|
||||
Reference in New Issue
Block a user