363 lines
12 KiB
JavaScript
363 lines
12 KiB
JavaScript
import { Server } from 'socket.io'
|
|
import jwt from 'jsonwebtoken'
|
|
import mysql from 'mysql2/promise'
|
|
|
|
// In-memory per-event state keyed by eventId (for fast access).
// The dictionary has no prototype because its keys come from clients: ids such
// as "constructor" or "__proto__" must never resolve to inherited members.
const events = Object.create(null)

/**
 * Reads an integer setting from the environment.
 *
 * @param {string} name - Environment variable to read.
 * @param {number} fallback - Value used when the variable is unset, empty or
 *   does not start with a base-10 integer.
 * @returns {number} The parsed integer, or `fallback`.
 */
function readIntEnv(name, fallback) {
  const parsed = Number.parseInt(process.env[name] || '', 10)
  // A NaN limit would make every `<` / `>=` comparison against it false,
  // silently disabling the queue logic, so fall back to the default instead.
  return Number.isNaN(parsed) ? fallback : parsed
}

// Number of connected sockets at which queueing is switched on for an event.
const QUEUE_THRESHOLD = readIntEnv('QUEUE_THRESHOLD', 100)

// Maximum number of sockets that may hold an access token at the same time.
const CONCURRENT_ACTIVE = readIntEnv('CONCURRENT_ACTIVE', 50)

// Lifetime of an access token, in seconds (default: 15 minutes).
const TOKEN_TTL_SECONDS = readIntEnv('TOKEN_TTL_SECONDS', 15 * 60)
|
|
|
|
/**
 * Opens a MySQL connection; a brand-new connection is created on every call.
 *
 * Connection settings are read from the MYSQL_HOST, MYSQL_USER,
 * MYSQL_PASSWORD and MYSQL_DATABASE environment variables.
 *
 * @returns {Promise<object|null>} The open connection, or `null` when
 *   MYSQL_HOST is not set or the connection attempt fails (the failure is
 *   logged, not thrown).
 */
async function getDbConnection() {
  const { MYSQL_HOST, MYSQL_USER, MYSQL_PASSWORD, MYSQL_DATABASE } = process.env

  // Without a configured host the database is treated as disabled.
  if (!MYSQL_HOST) {
    return null
  }

  let opened = null
  try {
    const settings = {
      host: MYSQL_HOST,
      user: MYSQL_USER,
      password: MYSQL_PASSWORD,
      database: MYSQL_DATABASE,
    }
    opened = await mysql.createConnection(settings)
  } catch (error) {
    console.error('MySQL connection error:', error)
    opened = null
  }
  return opened
}
|
|
|
|
/**
 * Returns the in-memory state record for an event, creating it on first use.
 *
 * The record holds:
 *  - `sockets`: Set of connected socket ids,
 *  - `queue`:   array of waiting socket ids, in order,
 *  - `active`:  Set of socket ids that currently hold a token,
 *  - `queueOn`: whether queueing is switched on for the event.
 *
 * @param {string|number} eventId - Event identifier (supplied by clients).
 * @returns {{sockets: Set, queue: Array, active: Set, queueOn: boolean}}
 *   The state record stored under `eventId`.
 */
function ensureEvent(eventId) {
  // Only own properties count. A plain `events[eventId]` lookup would resolve
  // ids such as "constructor" to members inherited from Object.prototype and
  // hand callers something that is not a state record.
  if (!Object.hasOwn(events, eventId)) {
    // defineProperty creates a real own property even for "__proto__", which
    // a normal assignment would treat as a prototype change.
    Object.defineProperty(events, eventId, {
      value: {
        sockets: new Set(), // connected socket ids
        queue: [], // array of socket ids in order
        active: new Set(), // sockets that currently hold a token (allowed to buy)
        queueOn: false,
      },
      writable: true,
      enumerable: true,
      configurable: true,
    })
  }
  return events[eventId]
}
|
|
|
|
/**
 * Sends a "queue_update" message to every socket connected to an event.
 *
 * Each payload contains:
 *  - `activeCount`:   number of sockets connected to the event,
 *  - `position`:      1-based queue position, or `null` when not queued,
 *  - `estimatedWait`: `(token holders + 0-based position) * TOKEN_TTL_SECONDS`,
 *                     or `null` when not queued.
 *
 * Does nothing when no state exists for `eventId`.
 *
 * @param {string|number} eventId - Event whose sockets are notified.
 * @param {object} io - Socket.IO server used to emit (`io.to(id).emit(...)`).
 * @returns {void}
 */
function broadcastUpdate(eventId, io) {
  // Own-property lookup so that ids like "constructor" are treated as unknown.
  const ev = Object.hasOwn(events, eventId) ? events[eventId] : undefined
  if (!ev) return

  // Index queue positions once instead of scanning the queue for every socket.
  // The first occurrence wins, which matches Array.prototype.indexOf.
  const positions = new Map()
  ev.queue.forEach((sid, index) => {
    if (!positions.has(sid)) positions.set(sid, index)
  })

  const connectedCount = ev.sockets.size
  const tokenHolders = ev.active.size

  // notify all connected sockets of the event
  for (const sid of ev.sockets) {
    const pos = positions.get(sid)
    const queued = pos !== undefined
    io.to(sid).emit("queue_update", {
      activeCount: connectedCount,
      position: queued ? pos + 1 : null,
      estimatedWait: queued ? (tokenHolders + pos) * TOKEN_TTL_SECONDS : null,
    })
  }
}
|
|
|
|
/**
 * Returns the reference time used to stamp a queued grant.
 *
 * Uses the database clock (`SELECT NOW()`) when a connection is available and
 * falls back to the local clock when there is none or the query fails.
 *
 * @param {object|null} connection - Open MySQL connection, or `null`.
 * @returns {Promise<Date>} The reference time.
 */
async function readQueueClock(connection) {
  if (connection) {
    try {
      const [timeRows] = await connection.execute('SELECT NOW() as db_time')
      return new Date(timeRows[0].db_time)
    } catch (error) {
      console.error('DB error reading server time:', error)
    }
  }
  return new Date()
}

/**
 * Grants tokens to queued sockets, in queue order, while slots are free.
 */
async function grantQueuedSockets(ev, eventId, io, connection) {
  while (ev.active.size < CONCURRENT_ACTIVE && ev.queue.length > 0) {
    const next = ev.queue.shift()
    if (!next) break

    // Reserve the slot before the first await so that overlapping runs (timer,
    // join and disconnect all call evaluateQueue) cannot hand out more than
    // CONCURRENT_ACTIVE slots.
    ev.active.add(next)

    let token
    let expiresAt
    try {
      const dbTime = await readQueueClock(connection)
      expiresAt = new Date(dbTime.getTime() + TOKEN_TTL_SECONDS * 1000)

      // NOTE(review): "dev-secret" is a publicly known fallback; make sure
      // JWT_SECRET is always set outside local development.
      token = jwt.sign({ sid: next, eventId }, process.env.JWT_SECRET || "dev-secret", {
        expiresIn: TOKEN_TTL_SECONDS,
      })

      console.log(`Creating queued token for ${next.substring(0, 8)}: DB time ${dbTime.toISOString()}, expires at ${expiresAt.toISOString()}, TTL: ${TOKEN_TTL_SECONDS}s`)
    } catch (error) {
      // Do not keep a slot occupied for a socket that never received a token.
      ev.active.delete(next)
      throw error
    }

    // The slot was released while we were waiting (e.g. the socket
    // disconnected), so there is nobody left to grant access to.
    if (!ev.active.has(next)) continue

    // Save to database
    if (connection) {
      try {
        await connection.execute(
          'INSERT INTO active_sessions (event_id, socket_id, jwt_token, expires_at) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE jwt_token = VALUES(jwt_token), expires_at = VALUES(expires_at)',
          [eventId, next, token, expiresAt]
        )
      } catch (error) {
        console.error('DB error saving active session:', error)
      }
    }

    io.to(next).emit("granted", { token, expiresAt: expiresAt.toISOString() })
  }
}

/**
 * Notifies sockets whose stored session has expired and deletes those rows.
 * Database errors are logged and swallowed.
 */
async function expireStoredSessions(ev, eventId, io, connection) {
  try {
    // Debug: check current time vs stored times
    const [allSessions] = await connection.execute(
      'SELECT socket_id, expires_at, NOW() as server_time FROM active_sessions WHERE event_id = ?',
      [eventId]
    )

    console.log(`Event ${eventId} - Current sessions:`, allSessions.map(s => ({
      socket: s.socket_id.substring(0, 8),
      expires: s.expires_at,
      server_time: s.server_time,
      expired: new Date(s.expires_at) < new Date(s.server_time)
    })))

    const [expiredSessions] = await connection.execute(
      'SELECT socket_id FROM active_sessions WHERE event_id = ? AND expires_at < NOW()',
      [eventId]
    )

    if (expiredSessions.length === 0) return

    console.log(`Found ${expiredSessions.length} expired sessions for event ${eventId}`)

    for (const session of expiredSessions) {
      const sid = session.socket_id
      if (ev.active.has(sid)) {
        console.log(`Sending token_expired to ${sid.substring(0, 8)}`)
        ev.active.delete(sid)
        io.to(sid).emit("token_expired")
      }
    }

    // Clean up exactly the sessions selected above. Deleting by
    // `expires_at < NOW()` alone could also remove a session that expired
    // after the SELECT; its socket would never be notified and would keep
    // its slot in `ev.active`.
    console.log(`Cleaning up ${expiredSessions.length} expired sessions for event ${eventId}`);
    const expiredIds = expiredSessions.map((session) => session.socket_id)
    const placeholders = expiredIds.map(() => '?').join(', ')
    await connection.execute(
      `DELETE FROM active_sessions WHERE event_id = ? AND expires_at < NOW() AND socket_id IN (${placeholders})`,
      [eventId, ...expiredIds]
    )
  } catch (error) {
    console.error('DB error checking expired tokens:', error)
  }
}

/**
 * Revokes token holders beyond CONCURRENT_ACTIVE.
 *
 * The first CONCURRENT_ACTIVE entries of the set (insertion order, i.e. the
 * earliest grants) are kept; the later ones are revoked.
 */
async function revokeOverflowSockets(ev, eventId, io, connection) {
  if (ev.active.size <= CONCURRENT_ACTIVE) return

  const toRevoke = Array.from(ev.active).slice(CONCURRENT_ACTIVE)
  for (const sid of toRevoke) {
    ev.active.delete(sid)
    io.to(sid).emit("revoked")

    // Remove from database
    if (connection) {
      try {
        await connection.execute(
          'DELETE FROM active_sessions WHERE event_id = ? AND socket_id = ?',
          [eventId, sid]
        )
      } catch (error) {
        console.error('DB error removing active session:', error)
      }
    }
  }
}

/**
 * Re-evaluates the queue of one event.
 *
 * Steps, in order:
 *  1. grant tokens to queued sockets while fewer than CONCURRENT_ACTIVE
 *     sockets hold one (emits "granted"),
 *  2. when a database is available, notify sockets whose stored session has
 *     expired (emits "token_expired") and delete those rows,
 *  3. revoke token holders beyond CONCURRENT_ACTIVE (emits "revoked"),
 *  4. switch queueing off when fewer than QUEUE_THRESHOLD sockets are connected,
 *  5. broadcast the new state via `broadcastUpdate`.
 *
 * Works without a database: `getDbConnection()` may return `null`, in which
 * case the local clock is used and nothing is persisted.
 *
 * @param {string|number} eventId - Event to evaluate; unknown ids are ignored.
 * @param {object} io - Socket.IO server used to emit messages.
 * @returns {Promise<void>}
 * @throws Propagates errors raised while creating a token (for example from
 *   `jwt.sign`); the database connection is closed before they propagate.
 */
async function evaluateQueue(eventId, io) {
  // Own-property lookup so that ids like "constructor" are treated as unknown.
  const ev = Object.hasOwn(events, eventId) ? events[eventId] : undefined
  if (!ev) return

  const connection = await getDbConnection()

  try {
    await grantQueuedSockets(ev, eventId, io, connection)

    // Check for expired tokens in database and notify clients
    if (connection) {
      await expireStoredSessions(ev, eventId, io, connection)
    }

    // If too many active (rare), revoke the most recently granted ones
    await revokeOverflowSockets(ev, eventId, io, connection)

    // If queue no longer needed, switch it off.
    // NOTE(review): the join handler switches the queue on using the per-event
    // `max_concurrent_users` value, while this uses the global QUEUE_THRESHOLD;
    // confirm that the mismatch is intended.
    if (ev.sockets.size < QUEUE_THRESHOLD) ev.queueOn = false
  } finally {
    // Close database connection, including when a step above threw
    if (connection) {
      try {
        await connection.end()
      } catch (error) {
        console.error('Error closing DB connection:', error)
      }
    }
  }

  // broadcast
  broadcastUpdate(eventId, io)
}
|
|
|
|
/**
 * Closes a MySQL connection, logging (not throwing) when closing fails.
 * A `null` connection is ignored.
 */
async function closeSocketDbConnection(connection) {
  if (!connection) return
  try {
    await connection.end()
  } catch (error) {
    console.error('Error closing DB connection:', error)
  }
}

/**
 * Returns the reference time for an immediate grant: the database clock when
 * a connection is available, otherwise (or on query failure) the local clock.
 */
async function readJoinClock(connection) {
  if (connection) {
    try {
      const [timeRows] = await connection.execute('SELECT NOW() as db_time')
      return new Date(timeRows[0].db_time)
    } catch (error) {
      console.error('DB error reading server time:', error)
    }
  }
  return new Date()
}

/**
 * Handles a "join_event" message: registers the socket for the event, then
 * either queues it or grants access immediately, and sends it a first
 * "queue_update".
 */
async function handleJoinEvent(io, socket, payload) {
  // Clients may emit without a payload; destructuring `undefined` would throw.
  const { eventId } = payload ?? {}
  if (!eventId) return
  // The id is used as a dictionary key, room name and SQL parameter, so only
  // primitive ids are accepted from the client.
  if (typeof eventId !== 'string' && typeof eventId !== 'number') return

  const ev = ensureEvent(eventId)
  ev.sockets.add(socket.id)
  socket.join(eventId)

  const connection = await getDbConnection()

  try {
    // Get event threshold from database
    let eventThreshold = QUEUE_THRESHOLD
    if (connection) {
      try {
        const [rows] = await connection.execute(
          'SELECT max_concurrent_users FROM events WHERE id = ?',
          [eventId]
        )
        if (rows.length > 0) {
          const configured = rows[0].max_concurrent_users
          // A NULL column would compare as 0 and switch the queue on for
          // every join, so only numeric values override the default.
          if (configured != null && Number.isFinite(Number(configured))) {
            eventThreshold = Number(configured)
          }
        }
      } catch (error) {
        console.error('DB error getting event:', error)
      }
    }

    // The socket may have disconnected while the database was being queried;
    // its disconnect handler has already run, so do not queue or grant.
    if (!ev.sockets.has(socket.id)) return

    // compute counts
    const activeCount = ev.sockets.size

    console.log(`Event ${eventId}: ${activeCount} users, threshold: ${eventThreshold}`)

    // turn on queue if threshold reached
    if (activeCount >= eventThreshold) {
      ev.queueOn = true
      console.log(`Queue activated for event ${eventId}`)
    }

    // if queueOn and socket not active, add to queue
    if (ev.queueOn && !ev.active.has(socket.id) && !ev.queue.includes(socket.id)) {
      ev.queue.push(socket.id)
      console.log(`Added ${socket.id} to queue at position ${ev.queue.length}`)

      // Save queue position to database
      if (connection) {
        try {
          await connection.execute(
            'INSERT INTO queue_entries (event_id, socket_id, position) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE position = VALUES(position)',
            [eventId, socket.id, ev.queue.length]
          )
        } catch (error) {
          console.error('DB error saving queue entry:', error)
        }
      }
    }

    // If queue is NOT active and user doesn't have access, grant it immediately.
    // NOTE(review): this path does not look at the concurrent-access limit;
    // evaluateQueue (called below) revokes holders above that limit, so a
    // socket can be granted and revoked within the same join. Confirm intent.
    if (!ev.queueOn && !ev.active.has(socket.id)) {
      console.log(`Granting immediate access to ${socket.id.substring(0, 8)} (under threshold)`)

      // Prefer database time for consistency with the stored expiry; without a
      // database the local clock is used instead of crashing on `null`.
      const dbTime = await readJoinClock(connection)
      const expiresAt = new Date(dbTime.getTime() + TOKEN_TTL_SECONDS * 1000)

      console.log(`DB time: ${dbTime.toISOString()}, Token expires: ${expiresAt.toISOString()}, TTL: ${TOKEN_TTL_SECONDS}s`)

      // NOTE(review): "dev-secret" is a publicly known fallback; make sure
      // JWT_SECRET is always set outside local development.
      const token = jwt.sign({ sid: socket.id, eventId }, process.env.JWT_SECRET || "dev-secret", {
        expiresIn: TOKEN_TTL_SECONDS,
      })

      ev.active.add(socket.id)

      // Save to database
      if (connection) {
        try {
          await connection.execute(
            'INSERT INTO active_sessions (event_id, socket_id, jwt_token, expires_at) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE jwt_token = VALUES(jwt_token), expires_at = VALUES(expires_at)',
            [eventId, socket.id, token, expiresAt]
          )
        } catch (error) {
          console.error('DB error saving active session:', error)
        }
      }

      io.to(socket.id).emit("granted", { token, expiresAt: expiresAt.toISOString() })
    }

    // evaluate granting (for queue users)
    await evaluateQueue(eventId, io)

    // send initial update
    const pos = ev.queue.indexOf(socket.id)
    io.to(socket.id).emit("queue_update", {
      activeCount: ev.sockets.size,
      position: pos === -1 ? null : pos + 1,
      estimatedWait: pos === -1 ? null : (ev.active.size + pos) * TOKEN_TTL_SECONDS,
    })
  } finally {
    // Close database connection on every path, including errors
    await closeSocketDbConnection(connection)
  }
}

/**
 * Handles a socket disconnect: removes the socket from every event it was
 * part of, then re-evaluates and re-broadcasts those events.
 */
function handleDisconnect(io, socket) {
  console.log("Lecsatlakozott:", socket.id)

  for (const [eventId, ev] of Object.entries(events)) {
    const wasConnected = ev.sockets.delete(socket.id)
    const qi = ev.queue.indexOf(socket.id)
    if (qi !== -1) ev.queue.splice(qi, 1)
    const wasActive = ev.active.delete(socket.id)

    // Events the socket never touched are left alone; re-evaluating them would
    // open a database connection per event on every disconnect.
    if (!wasConnected && qi === -1 && !wasActive) continue

    // re-evaluate to grant next in line; runs in the background, so failures
    // are logged here instead of becoming unhandled rejections
    evaluateQueue(eventId, io).catch((error) => {
      console.error(`Queue evaluation failed for event ${eventId}:`, error)
    })

    // notify remaining sockets
    broadcastUpdate(eventId, io)
  }
}

/**
 * Route handler that lazily boots the Socket.IO queue server.
 *
 * On the first call it creates a standalone HTTP(S) server, attaches
 * Socket.IO to it, stores the instance in `global.io`, removes expired
 * sessions from the database, starts a 5-second timer that re-evaluates every
 * event queue, registers the "join_event" / "disconnect" handlers and starts
 * listening on SOCKET_PORT (default 4000). Later calls only report that the
 * server is already initialised.
 *
 * @param {object} req - Incoming request; only `req.nextUrl.protocol` is read.
 * @returns {Promise<Response>} JSON response: a status message (plus the port
 *   on first initialisation), or an error with HTTP status 500.
 */
export async function GET(req) {
  if (global.io) {
    console.log("Socket.IO már fut")
    return Response.json({ message: "Socket.IO már inicializálva" })
  }

  console.log("Socket.IO inicializálása...")

  try {
    // Create a standalone server for Socket.IO, matching the request protocol.
    // NOTE(review): the https server is created without key/cert options —
    // confirm TLS is terminated elsewhere.
    const server = req.nextUrl.protocol === 'https:'
      ? require('https').createServer()
      : require('http').createServer()

    const io = new Server(server, {
      cors: {
        origin: "*",
        methods: ["GET", "POST"]
      }
    })

    global.io = io

    // Clean up all expired sessions on server start
    const startupConnection = await getDbConnection()
    if (startupConnection) {
      try {
        const [result] = await startupConnection.execute('DELETE FROM active_sessions WHERE expires_at < NOW()')
        console.log(`Server startup: Cleaned ${result.affectedRows} expired sessions`)
      } catch (error) {
        console.error('Cleanup error on startup:', error)
      } finally {
        // Close the connection even when the cleanup query failed
        await closeSocketDbConnection(startupConnection)
      }
    }

    // Periodic token check every 5 seconds
    setInterval(async () => {
      for (const eventId of Object.keys(events)) {
        try {
          await evaluateQueue(eventId, io)
        } catch (error) {
          // One failing event must neither stop the sweep nor surface as an
          // unhandled rejection from the timer callback.
          console.error(`Periodic queue evaluation failed for event ${eventId}:`, error)
        }
      }
    }, 5000)

    io.on("connection", (socket) => {
      console.log("Csatlakozott:", socket.id)

      socket.on("join_event", (payload) =>
        handleJoinEvent(io, socket, payload).catch((error) => {
          console.error('join_event handler error:', error)
        })
      )

      socket.on("disconnect", () => handleDisconnect(io, socket))
    })

    // Start the Socket.IO server on a different port
    const SOCKET_PORT = process.env.SOCKET_PORT || 4000

    // Without a listener, a listen failure (e.g. port already in use) is
    // emitted as an unhandled 'error' event and terminates the process.
    server.on('error', (error) => {
      console.error('Socket.IO server error:', error)
    })

    server.listen(SOCKET_PORT, () => {
      console.log(`Socket.IO szerver fut a ${SOCKET_PORT} porton`)
    })

    return Response.json({
      message: "Socket.IO szerver inicializálva",
      port: SOCKET_PORT
    })

  } catch (error) {
    console.error("Socket.IO inicializálási hiba:", error)
    return Response.json({ error: "Nem sikerült inicializálni a Socket.IO szervert" }, { status: 500 })
  }
}
|