import http from 'http'
import https from 'https'
import { Server } from 'socket.io'
import jwt from 'jsonwebtoken'
import mysql from 'mysql2/promise'

/**
 * Read an integer from an environment variable.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is unset, empty, or not an integer.
 * @returns {number} The parsed integer or the fallback.
 */
function readIntEnv(name, fallback) {
  const parsed = Number.parseInt(process.env[name] || '', 10)
  // A NaN limit would make every `size < limit` comparison false and stall the queue.
  return Number.isNaN(parsed) ? fallback : parsed
}

// Simple in-memory structures keyed by String(eventId) (for fast access).
// A Map is used because the keys come from clients; a plain object would
// resolve keys such as "__proto__" to Object.prototype.
const events = new Map()

const QUEUE_THRESHOLD = readIntEnv('QUEUE_THRESHOLD', 100)
const CONCURRENT_ACTIVE = readIntEnv('CONCURRENT_ACTIVE', 50)
const TOKEN_TTL_SECONDS = readIntEnv('TOKEN_TTL_SECONDS', 15 * 60)

// MySQL connection (lazily created, shared by all handlers)
let db = null
// In-flight connection attempt, so concurrent callers share a single connection.
let dbConnecting = null

/**
 * Open the MySQL connection and cache it in `db`.
 * Connection failures are logged, not thrown.
 *
 * @returns {Promise<object|null>} The connection, or null when connecting failed.
 */
async function openDbConnection() {
  try {
    db = await mysql.createConnection({
      host: process.env.MYSQL_HOST,
      user: process.env.MYSQL_USER,
      password: process.env.MYSQL_PASSWORD,
      database: process.env.MYSQL_DATABASE,
    })
  } catch (error) {
    console.error('DB connection error:', error)
  }
  return db
}

/**
 * Return the shared MySQL connection, creating it on first use.
 *
 * Persistence is best-effort throughout this module, so this resolves to null
 * (instead of rejecting) when MYSQL_HOST is not set or the connection cannot
 * be opened; a failed attempt is retried on the next call.
 *
 * @returns {Promise<object|null>} The connection, or null when unavailable.
 */
async function getDbConnection() {
  if (db || !process.env.MYSQL_HOST) return db
  if (!dbConnecting) {
    dbConnecting = openDbConnection().finally(() => {
      dbConnecting = null
    })
  }
  return dbConnecting
}

/**
 * Get the in-memory state for an event, creating it when missing.
 *
 * @param {string|number} eventId - Event identifier.
 * @returns {{sockets: Set<string>, queue: string[], active: Set<string>, queueOn: boolean, threshold: number}}
 */
function ensureEvent(eventId) {
  const key = String(eventId)
  let ev = events.get(key)
  if (!ev) {
    ev = {
      sockets: new Set(), // connected socket ids
      queue: [], // array of socket ids in order
      active: new Set(), // sockets that currently hold a token (allowed to buy)
      queueOn: false,
      threshold: QUEUE_THRESHOLD, // connected-user count at which the queue turns on
    }
    events.set(key, ev)
  }
  return ev
}

/**
 * Build the "queue_update" payload for one socket.
 *
 * @param {object} ev - Event state as returned by ensureEvent.
 * @param {number} index - Zero-based index of the socket in ev.queue, or -1 when not queued.
 * @returns {{activeCount: number, position: number|null, estimatedWait: number|null}}
 */
function buildQueueUpdate(ev, index) {
  const queued = index !== -1
  return {
    // The field is called activeCount but carries the number of connected sockets.
    activeCount: ev.sockets.size,
    position: queued ? index + 1 : null,
    // NOTE(review): the estimate uses the total queue length, so every queued
    // socket receives the same value regardless of position — confirm intent.
    estimatedWait: queued ? ev.queue.length * 5 : null,
  }
}

/**
 * Send a "queue_update" to every socket connected to the event.
 *
 * @param {string|number} eventId - Event identifier.
 * @param {object} io - Socket.IO server instance.
 */
function broadcastUpdate(eventId, io) {
  const ev = events.get(String(eventId))
  if (!ev) return

  // Index the queue once instead of calling indexOf for every connected socket.
  const indexBySid = new Map()
  ev.queue.forEach((sid, index) => {
    if (!indexBySid.has(sid)) indexBySid.set(sid, index)
  })

  // notify all connected sockets in room
  for (const sid of ev.sockets) {
    const index = indexBySid.has(sid) ? indexBySid.get(sid) : -1
    io.to(sid).emit("queue_update", buildQueueUpdate(ev, index))
  }
}

/**
 * Sign a purchase token for a socket, mark it active, persist the session
 * (best-effort) and emit "granted" to the socket.
 *
 * @param {object} ev - Event state as returned by ensureEvent.
 * @param {string|number} eventId - Event identifier stored in the token and the DB row.
 * @param {string} sid - Socket id receiving access.
 * @param {object} io - Socket.IO server instance.
 * @param {object|null} connection - MySQL connection, or null to skip persistence.
 */
async function grantAccess(ev, eventId, sid, io, connection) {
  const expiresAt = new Date(Date.now() + TOKEN_TTL_SECONDS * 1000)
  // NOTE(review): falling back to a hard-coded secret makes tokens forgeable
  // whenever JWT_SECRET is unset — confirm this can never happen in production.
  const token = jwt.sign({ sid, eventId }, process.env.JWT_SECRET || "dev-secret", {
    expiresIn: TOKEN_TTL_SECONDS,
  })

  // Reserve the slot before awaiting so concurrent evaluations see it as taken.
  ev.active.add(sid)

  // Save to database
  if (connection) {
    try {
      await connection.execute(
        'INSERT INTO active_sessions (event_id, socket_id, jwt_token, expires_at) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE jwt_token = VALUES(jwt_token), expires_at = VALUES(expires_at)',
        [eventId, sid, token, expiresAt]
      )
    } catch (error) {
      console.error('DB error saving active session:', error)
    }
  }

  io.to(sid).emit("granted", { token, expiresAt: expiresAt.toISOString() })
}

/**
 * Promote queued sockets while active slots are free, revoke any excess
 * active sockets, switch the queue flag off when the event is back under its
 * threshold, and broadcast the new state.
 *
 * @param {string|number} eventId - Event identifier.
 * @param {object} io - Socket.IO server instance.
 */
async function evaluateQueue(eventId, io) {
  const ev = events.get(String(eventId))
  if (!ev) return

  const connection = await getDbConnection()

  // ensure active set size <= CONCURRENT_ACTIVE
  while (ev.active.size < CONCURRENT_ACTIVE && ev.queue.length > 0) {
    const next = ev.queue.shift()
    if (!next) break
    await grantAccess(ev, eventId, next, io, connection)
  }

  // If too many active (rare), revoke the excess. A Set iterates in insertion
  // order, so the entries past CONCURRENT_ACTIVE are the most recently granted.
  if (ev.active.size > CONCURRENT_ACTIVE) {
    const toRevoke = Array.from(ev.active).slice(CONCURRENT_ACTIVE)
    for (const sid of toRevoke) {
      ev.active.delete(sid)
      io.to(sid).emit("revoked")

      // Remove from database
      if (connection) {
        try {
          await connection.execute(
            'DELETE FROM active_sessions WHERE event_id = ? AND socket_id = ?',
            [eventId, sid]
          )
        } catch (error) {
          console.error('DB error removing active session:', error)
        }
      }
    }
  }

  // If queue no longer needed, clear the flag. The per-event threshold is used
  // so the flag is switched off by the same limit that switched it on.
  if (ev.sockets.size < ev.threshold) ev.queueOn = false

  // broadcast
  broadcastUpdate(eventId, io)
}

/**
 * Look up the queue threshold configured for an event.
 *
 * @param {object|null} connection - MySQL connection, or null when unavailable.
 * @param {string|number} eventId - Event identifier.
 * @returns {Promise<number>} events.max_concurrent_users when it is a positive
 *   number, otherwise the global QUEUE_THRESHOLD.
 */
async function loadEventThreshold(connection, eventId) {
  if (!connection) return QUEUE_THRESHOLD
  try {
    const [rows] = await connection.execute(
      'SELECT max_concurrent_users FROM events WHERE id = ?',
      [eventId]
    )
    if (rows.length > 0) {
      // A NULL column would otherwise compare as 0 and force the queue on for everyone.
      const configured = Number(rows[0].max_concurrent_users)
      if (Number.isFinite(configured) && configured > 0) return configured
    }
  } catch (error) {
    console.error('DB error getting event:', error)
  }
  return QUEUE_THRESHOLD
}

/**
 * Handle a validated "join_event" request: register the socket with the
 * event, then either grant access immediately or place the socket in the
 * queue, and finally send the socket its initial "queue_update".
 *
 * @param {object} socket - Socket.IO socket that sent the request.
 * @param {string|number} eventId - Event identifier supplied by the client.
 * @param {object} io - Socket.IO server instance.
 */
async function handleJoinEvent(socket, eventId, io) {
  const ev = ensureEvent(eventId)
  ev.sockets.add(socket.id)
  socket.join(eventId)

  const connection = await getDbConnection()

  // Get event threshold from database
  ev.threshold = await loadEventThreshold(connection, eventId)

  // The socket may have disconnected during the awaits above; its disconnect
  // handler has already run, so adding it now would leak a slot forever.
  if (!ev.sockets.has(socket.id)) return

  // compute counts
  const activeCount = ev.sockets.size
  console.log(`Event ${eventId}: ${activeCount} users, threshold: ${ev.threshold}`)

  // turn on queue if threshold reached
  if (activeCount >= ev.threshold) {
    ev.queueOn = true
    console.log(`Queue activated for event ${eventId}`)
  }

  if (!ev.active.has(socket.id)) {
    // Immediate access requires a free slot and nobody waiting; otherwise the
    // socket would be granted here and revoked by evaluateQueue right after,
    // or would jump ahead of sockets that are already queued.
    const canGrantNow =
      !ev.queueOn && ev.queue.length === 0 && ev.active.size < CONCURRENT_ACTIVE

    if (canGrantNow) {
      console.log(`Granting immediate access to ${socket.id} (under threshold)`)
      await grantAccess(ev, eventId, socket.id, io, connection)
    } else if (!ev.queue.includes(socket.id)) {
      ev.queue.push(socket.id)
      console.log(`Added ${socket.id} to queue at position ${ev.queue.length}`)

      // Save queue position to database
      if (connection) {
        try {
          await connection.execute(
            'INSERT INTO queue_entries (event_id, socket_id, position) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE position = VALUES(position)',
            [eventId, socket.id, ev.queue.length]
          )
        } catch (error) {
          console.error('DB error saving queue entry:', error)
        }
      }
    }
  }

  // evaluate granting (for queue users)
  await evaluateQueue(eventId, io)

  // send initial update
  io.to(socket.id).emit("queue_update", buildQueueUpdate(ev, ev.queue.indexOf(socket.id)))
}

/**
 * Remove a disconnected socket from every event it belonged to, free empty
 * events, and let the next queued sockets take over any released slot.
 *
 * @param {object} socket - Socket.IO socket that disconnected.
 * @param {object} io - Socket.IO server instance.
 */
function handleDisconnect(socket, io) {
  console.log("Lecsatlakozott:", socket.id)

  // remove from every event
  for (const [eventId, ev] of events) {
    const wasConnected = ev.sockets.delete(socket.id)
    const qi = ev.queue.indexOf(socket.id)
    if (qi !== -1) ev.queue.splice(qi, 1)
    const wasActive = ev.active.delete(socket.id)

    // Events this socket never joined need no re-evaluation or broadcast.
    if (!wasConnected && qi === -1 && !wasActive) continue

    // Drop the state of events nobody is connected to so the Map cannot grow forever.
    if (ev.sockets.size === 0) {
      events.delete(eventId)
      continue
    }

    // notify remaining sockets right away
    broadcastUpdate(eventId, io)

    // re-evaluate to grant next in line; runs in the background, so failures
    // are logged here instead of becoming unhandled rejections
    evaluateQueue(eventId, io).catch((error) => {
      console.error('Queue evaluation failed after disconnect:', error)
    })
  }
}

/**
 * Route handler that lazily starts a standalone Socket.IO server on
 * SOCKET_PORT (default 4000). Later calls report that it is already running.
 *
 * @param {object} req - Incoming Next.js request; only req.nextUrl.protocol is read.
 * @returns {Promise<Response>} JSON describing the outcome; status 500 when
 *   the server could not be created or could not start listening.
 */
export async function GET(req) {
  if (global.io) {
    console.log("Socket.IO már fut")
    return Response.json({ message: "Socket.IO már inicializálva" })
  }

  console.log("Socket.IO inicializálása...")

  try {
    // The Socket.IO server runs on its own HTTP(S) server, separate from Next.js.
    // NOTE(review): https.createServer() without key/cert options cannot
    // complete TLS handshakes — confirm how certificates are meant to be supplied.
    const server = req.nextUrl.protocol === 'https:'
      ? https.createServer()
      : http.createServer()

    // NOTE(review): origin "*" accepts connections from any site — confirm this is intended.
    const io = new Server(server, {
      cors: {
        origin: "*",
        methods: ["GET", "POST"]
      }
    })

    // Set before awaiting so a concurrent request does not start a second server.
    global.io = io

    io.on("connection", (socket) => {
      console.log("Csatlakozott:", socket.id)

      socket.on("join_event", async (payload) => {
        // The payload comes from the client and may be missing or malformed.
        const eventId = payload ? payload.eventId : undefined
        if (!eventId) return
        if (typeof eventId !== 'string' && typeof eventId !== 'number') return

        try {
          await handleJoinEvent(socket, eventId, io)
        } catch (error) {
          console.error('join_event failed:', error)
        }
      })

      socket.on("disconnect", () => handleDisconnect(socket, io))
    })

    // Start the Socket.IO server on a different port
    const SOCKET_PORT = process.env.SOCKET_PORT || 4000

    // Wait until the port is bound so failures such as EADDRINUSE are reported
    // to the caller instead of crashing the process as an unhandled 'error' event.
    await new Promise((resolve, reject) => {
      server.once('error', reject)
      server.listen(SOCKET_PORT, () => {
        server.off('error', reject)
        resolve()
      })
    })
    server.on('error', (error) => {
      console.error('Socket.IO server error:', error)
    })
    console.log(`Socket.IO szerver fut a ${SOCKET_PORT} porton`)

    return Response.json({
      message: "Socket.IO szerver inicializálva",
      port: SOCKET_PORT
    })
  } catch (error) {
    // Allow a later request to retry the initialisation.
    global.io = undefined
    console.error("Socket.IO inicializálási hiba:", error)
    return Response.json({ error: "Nem sikerült inicializálni a Socket.IO szervert" }, { status: 500 })
  }
}