This commit is contained in:
2025-06-16 15:14:23 +02:00
commit 074e590073
3174 changed files with 428263 additions and 0 deletions

View File

@ -0,0 +1,756 @@
// threaded transport to handle all the magic.
// implementations are automatically elevated to the worker thread
// by simply overwriting all the thread functions
//
// note that ThreadLog.cs is required for Debug.Log from threads to work in builds.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Net;
using System.Threading;
using UnityEngine;
using Debug = UnityEngine.Debug;
namespace Mirror
{
// buffered events for main thread
enum ClientMainEventType
{
OnClientConnected,
OnClientSent,
OnClientReceived,
OnClientError,
OnClientDisconnected,
}
enum ServerMainEventType
{
OnServerConnected,
OnServerSent,
OnServerReceived,
OnServerError,
OnServerDisconnected,
}
// buffered events for worker thread
enum ThreadEventType
{
DoServerStart,
DoServerSend,
DoServerDisconnect,
DoServerStop,
DoClientConnect,
DoClientSend,
DoClientDisconnect,
Sleep,
Wake,
DoShutdown
}
struct ClientMainEvent
{
public ClientMainEventType type;
public object param;
// some events have value type parameters: connectionId, error.
// store them explicitly to avoid boxing allocations to 'object param'.
public int? channelId; // connect/disconnect don't have a channel
public TransportError? error;
public ClientMainEvent(
ClientMainEventType type,
object param,
int? channelId = null,
TransportError? error = null)
{
this.type = type;
this.channelId = channelId;
this.error = error;
this.param = param;
}
}
struct ServerMainEvent
{
public ServerMainEventType type;
public object param;
// some events have value type parameters: connectionId, error.
// store them explicitly to avoid boxing allocations to 'object param'.
public int? connectionId; // only server needs connectionId
public int? channelId; // connect/disconnect don't have a channel
public TransportError? error;
public ServerMainEvent(
ServerMainEventType type,
object param,
int? connectionId,
int? channelId = null,
TransportError? error = null)
{
this.type = type;
this.channelId = channelId;
this.connectionId = connectionId;
this.error = error;
this.param = param;
}
}
struct ThreadEvent
{
public ThreadEventType type;
public object param;
// some events have value type parameters: connectionId.
// store them explicitly to avoid boxing allocations to 'object param'.
public int? connectionId;
public int? channelId;
public ThreadEvent(
ThreadEventType type,
object param,
int? connectionId = null,
int? channelId = null)
{
this.type = type;
this.connectionId = connectionId;
this.channelId = channelId;
this.param = param;
}
}
public abstract class ThreadedTransport : Transport
{
WorkerThread thread;
// main thread's event queue.
// worker thread puts events in, main thread processes them.
// client & server separate because EarlyUpdate is separate too.
// TODO nonalloc
readonly ConcurrentQueue<ClientMainEvent> clientMainQueue = new ConcurrentQueue<ClientMainEvent>();
readonly ConcurrentQueue<ServerMainEvent> serverMainQueue = new ConcurrentQueue<ServerMainEvent>();
// worker thread's event queue
// main thread puts events in, worker thread processes them.
// TODO nonalloc
readonly ConcurrentQueue<ThreadEvent> threadQueue = new ConcurrentQueue<ThreadEvent>();
// active flags, since we can't access server/client from main thread
volatile bool serverActive;
volatile bool clientConnected;
// max number of thread messages to process per tick in main thread.
// very large limit to prevent deadlocks.
const int MaxProcessingPerTick = 10_000_000;
[Tooltip("Detect device sleep mode and automatically disconnect + hibernate the thread after 'sleepTimeout' seconds.\nFor example: on mobile / VR, we don't want to drain the battery after putting down the device.")]
public bool sleepDetection = true;
public float sleepTimeoutInSeconds = 30;
// communication between main & worker thread //////////////////////////
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void EnqueueClientMain(
ClientMainEventType type,
object param,
int? channelId,
TransportError? error) =>
clientMainQueue.Enqueue(new ClientMainEvent(type, param, channelId, error));
// add an event for main thread
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void EnqueueServerMain(
ServerMainEventType type,
object param,
int? connectionId,
int? channelId,
TransportError? error) =>
serverMainQueue.Enqueue(new ServerMainEvent(type, param, connectionId, channelId, error));
void EnqueueThread(
ThreadEventType type,
object param,
int? channelId,
int? connectionId) =>
threadQueue.Enqueue(new ThreadEvent(type, param, connectionId, channelId));
// Unity callbacks /////////////////////////////////////////////////////
protected virtual void Awake()
{
// start the thread.
// if main application terminates, this thread needs to terminate too.
EnsureThread();
}
// starts the thread if not created or not active yet.
void EnsureThread()
{
if (thread != null && thread.IsAlive) return;
thread = new WorkerThread(ToString());
thread.Tick = ThreadTick;
thread.Cleanup = ThreadedShutdown;
thread.Start();
Debug.Log($"ThreadedTransport: started worker thread!");
}
protected virtual void OnDestroy()
{
// stop thread fully
Shutdown();
// TODO recycle writers.
}
// worker thread ///////////////////////////////////////////////////////
// sleep timeout to automatically end if the device was put to sleep.
Stopwatch sleepTimer = null; // NOT THREAD SAFE: ONLY USE THIS IN WORKER THREAD!
void ProcessThreadQueue()
{
// TODO deadlock protection. worker thread may be to slow to process all.
while (threadQueue.TryDequeue(out ThreadEvent elem))
{
switch (elem.type)
{
// SERVER EVENTS ///////////////////////////////////////////
case ThreadEventType.DoServerStart: // start listening
{
// call the threaded function
ThreadedServerStart();
break;
}
case ThreadEventType.DoServerSend:
{
// call the threaded function
ConcurrentNetworkWriterPooled writer = (ConcurrentNetworkWriterPooled)elem.param;
ThreadedServerSend(elem.connectionId.Value, writer, elem.channelId.Value);
// recycle writer to thread safe pool for reuse
ConcurrentNetworkWriterPool.Return(writer);
break;
}
case ThreadEventType.DoServerDisconnect:
{
// call the threaded function
ThreadedServerDisconnect(elem.connectionId.Value);
break;
}
case ThreadEventType.DoServerStop: // stop listening
{
// call the threaded function
ThreadedServerStop();
break;
}
// CLIENT EVENTS ///////////////////////////////////////////
case ThreadEventType.DoClientConnect:
{
// call the threaded function
if (elem.param is string address)
ThreadedClientConnect(address);
else if (elem.param is Uri uri)
ThreadedClientConnect(uri);
break;
}
case ThreadEventType.DoClientSend:
{
// call the threaded function
ConcurrentNetworkWriterPooled writer = (ConcurrentNetworkWriterPooled)elem.param;
ThreadedClientSend(writer, elem.channelId.Value);
// recycle writer to thread safe pool for reuse
ConcurrentNetworkWriterPool.Return(writer);
break;
}
case ThreadEventType.DoClientDisconnect:
{
// call the threaded function
ThreadedClientDisconnect();
break;
}
// SLEEP ////////////////////////////////////////////////
case ThreadEventType.Sleep:
{
// start the sleep timer if not started yet
if (sleepTimer == null)
{
Debug.Log($"ThreadedTransport: sleep detected, sleeping in {sleepTimeoutInSeconds:F0}s!");
sleepTimer = Stopwatch.StartNew();
}
break;
}
case ThreadEventType.Wake:
{
// stop the sleep timer (if any)
if (sleepTimer != null)
{
Debug.Log($"ThreadedTransport: Woke up, interrupting sleep timer!");
sleepTimer = null;
}
break;
}
// SHUTDOWN ////////////////////////////////////////////////
case ThreadEventType.DoShutdown:
{
// call the threaded function
ThreadedShutdown();
break;
}
}
}
}
// Tick() returns a bool so it can easily stop the thread
// without needing to throw InterruptExceptions or similar.
bool ThreadTick()
{
// was the device put to sleep?
if (sleepTimer != null &&
sleepTimer.Elapsed.TotalSeconds >= sleepTimeoutInSeconds)
{
Debug.Log("ThreadedTransport: entering sleep mode and stopping/disconnecting.");
ThreadedServerStop();
ThreadedClientDisconnect();
sleepTimer = null;
// if the device was put to sleep, end the thread gracefully.
// all threads must end, otherwise putting down the device would
// slowly drain the battery after a day or more.
return false;
}
// early update the implementation first
ThreadedClientEarlyUpdate();
ThreadedServerEarlyUpdate();
// process queued user requests
ProcessThreadQueue();
// late update the implementation at the end
ThreadedClientLateUpdate();
ThreadedServerLateUpdate();
// save some cpu power.
Thread.Sleep(1);
return true;
}
// threaded callbacks to call from transport thread.
// they will be queued up for main thread automatically.
protected void OnThreadedClientConnected()
{
EnqueueClientMain(ClientMainEventType.OnClientConnected, null, null, null);
}
protected void OnThreadedClientSend(ArraySegment<byte> message, int channelId)
{
// ArraySegment is only valid until returning.
// copy to a writer until main thread processes it.
// make sure to recycle the writer in main thread.
ConcurrentNetworkWriterPooled writer = ConcurrentNetworkWriterPool.Get();
writer.WriteBytes(message.Array, message.Offset, message.Count);
EnqueueClientMain(ClientMainEventType.OnClientSent, writer, channelId, null);
}
protected void OnThreadedClientReceive(ArraySegment<byte> message, int channelId)
{
// ArraySegment is only valid until returning.
// copy to a writer until main thread processes it.
// make sure to recycle the writer in main thread.
ConcurrentNetworkWriterPooled writer = ConcurrentNetworkWriterPool.Get();
writer.WriteBytes(message.Array, message.Offset, message.Count);
EnqueueClientMain(ClientMainEventType.OnClientReceived, writer, channelId, null);
}
protected void OnThreadedClientError(TransportError error, string reason)
{
EnqueueClientMain(ClientMainEventType.OnClientError, reason, null, error);
}
protected void OnThreadedClientDisconnected()
{
EnqueueClientMain(ClientMainEventType.OnClientDisconnected, null, null, null);
}
protected void OnThreadedServerConnected(int connectionId, IPEndPoint endPoint)
{
// create string copy of address immediately before sending to another thread
string address = endPoint.PrettyAddress();
EnqueueServerMain(ServerMainEventType.OnServerConnected, address, connectionId, null, null);
}
protected void OnThreadedServerSend(int connectionId, ArraySegment<byte> message, int channelId)
{
// ArraySegment is only valid until returning.
// copy to a writer until main thread processes it.
// make sure to recycle the writer in main thread.
ConcurrentNetworkWriterPooled writer = ConcurrentNetworkWriterPool.Get();
writer.WriteBytes(message.Array, message.Offset, message.Count);
EnqueueServerMain(ServerMainEventType.OnServerSent, writer, connectionId, channelId, null);
}
protected void OnThreadedServerReceive(int connectionId, ArraySegment<byte> message, int channelId)
{
// ArraySegment is only valid until returning.
// copy to a writer until main thread processes it.
// make sure to recycle the writer in main thread.
ConcurrentNetworkWriterPooled writer = ConcurrentNetworkWriterPool.Get();
writer.WriteBytes(message.Array, message.Offset, message.Count);
EnqueueServerMain(ServerMainEventType.OnServerReceived, writer, connectionId, channelId, null);
}
protected void OnThreadedServerError(int connectionId, TransportError error, string reason)
{
EnqueueServerMain(ServerMainEventType.OnServerError, reason, connectionId, null, error);
}
protected void OnThreadedServerDisconnected(int connectionId)
{
EnqueueServerMain(ServerMainEventType.OnServerDisconnected, null, connectionId, null, null);
}
protected abstract void ThreadedClientConnect(string address);
protected abstract void ThreadedClientConnect(Uri address);
protected abstract void ThreadedClientSend(ArraySegment<byte> message, int channelId);
protected abstract void ThreadedClientDisconnect();
protected abstract void ThreadedServerStart();
protected abstract void ThreadedServerStop();
protected abstract void ThreadedServerSend(int connectionId, ArraySegment<byte> message, int channelId);
protected abstract void ThreadedServerDisconnect(int connectionId);
// threaded update functions.
// make sure not to call main thread OnReceived etc. events.
// queue everything.
protected abstract void ThreadedClientEarlyUpdate();
protected abstract void ThreadedClientLateUpdate();
protected abstract void ThreadedServerEarlyUpdate();
protected abstract void ThreadedServerLateUpdate();
protected abstract void ThreadedShutdown();
// client //////////////////////////////////////////////////////////////
// implementations need to use ThreadedEarlyUpdate
public override void ClientEarlyUpdate()
{
// regular transports process OnReceive etc. from early update.
// need to process the worker thread's queued events here too.
//
// process only up to N messages per tick here.
// if main thread becomes too slow, we don't want to deadlock.
int processed = 0;
while (clientMainQueue.TryDequeue(out ClientMainEvent elem))
{
switch (elem.type)
{
// CLIENT EVENTS ///////////////////////////////////////////
case ClientMainEventType.OnClientConnected:
{
// call original transport event
OnClientConnected?.Invoke();
break;
}
case ClientMainEventType.OnClientSent:
{
// call original transport event
ConcurrentNetworkWriterPooled writer = (ConcurrentNetworkWriterPooled)elem.param;
OnClientDataSent?.Invoke(writer, elem.channelId.Value);
// recycle writer to thread safe pool for reuse
ConcurrentNetworkWriterPool.Return(writer);
break;
}
case ClientMainEventType.OnClientReceived:
{
// immediately stop processing Data messages after ClientDisconnect() was called.
// ClientDisconnect() sets clientConnected=false, so we can simply check that here.
// fixes: https://github.com/MirrorNetworking/Mirror/issues/3787
if (clientConnected)
{
// call original transport event
ConcurrentNetworkWriterPooled writer = (ConcurrentNetworkWriterPooled)elem.param;
OnClientDataReceived?.Invoke(writer, elem.channelId.Value);
// recycle writer to thread safe pool for reuse
ConcurrentNetworkWriterPool.Return(writer);
}
break;
}
case ClientMainEventType.OnClientError:
{
// call original transport event
OnClientError?.Invoke(elem.error.Value, (string)elem.param);
break;
}
case ClientMainEventType.OnClientDisconnected:
{
// call original transport event
OnClientDisconnected?.Invoke();
break;
}
}
// process only up to N messages per tick here.
// if main thread becomes too slow, we don't want to deadlock.
if (++processed >= MaxProcessingPerTick)
{
Debug.LogWarning($"ThreadedTransport processed the limit of {MaxProcessingPerTick} messages this tick. Continuing next tick to prevent deadlock.");
break;
}
}
}
// manual state flag because implementations can't access their
// threaded .server/.client state from main thread.
public override bool ClientConnected() => clientConnected;
public override void ClientConnect(string address)
{
// don't connect the thread twice
if (ClientConnected())
{
Debug.LogWarning($"Threaded transport: client already connected!");
return;
}
// start worker thread if not started yet
EnsureThread();
// enqueue to process in worker thread
EnqueueThread(ThreadEventType.DoClientConnect, address, null, null);
// manual state flag because implementations can't access their
// threaded .server/.client state from main thread.
clientConnected = true;
}
public override void ClientConnect(Uri uri)
{
// don't connect the thread twice
if (ClientConnected())
{
Debug.LogWarning($"Threaded transport: client already connected!");
return;
}
// start worker thread if not started yet
EnsureThread();
// enqueue to process in worker thread
EnqueueThread(ThreadEventType.DoClientConnect, uri, null, null);
// manual state flag because implementations can't access their
// threaded .server/.client state from main thread.
clientConnected = true;
}
public override void ClientSend(ArraySegment<byte> segment, int channelId = Channels.Reliable)
{
if (!ClientConnected()) return;
// segment is only valid until returning.
// copy it to a writer.
// make sure to recycle it from worker thread.
ConcurrentNetworkWriterPooled writer = ConcurrentNetworkWriterPool.Get();
writer.WriteBytes(segment.Array, segment.Offset, segment.Count);
// enqueue to process in worker thread
EnqueueThread(ThreadEventType.DoClientSend, writer, channelId, null);
}
public override void ClientDisconnect()
{
EnqueueThread(ThreadEventType.DoClientDisconnect, null, null, null);
// manual state flag because implementations can't access their
// threaded .server/.client state from main thread.
clientConnected = false;
}
// server //////////////////////////////////////////////////////////////
// implementations need to use ThreadedEarlyUpdate
public override void ServerEarlyUpdate()
{
// regular transports process OnReceive etc. from early update.
// need to process the worker thread's queued events here too.
//
// process only up to N messages per tick here.
// if main thread becomes too slow, we don't want to deadlock.
int processed = 0;
while (serverMainQueue.TryDequeue(out ServerMainEvent elem))
{
switch (elem.type)
{
// SERVER EVENTS ///////////////////////////////////////////
case ServerMainEventType.OnServerConnected:
{
// call original transport event with connectionId, address
string address = (string)elem.param;
OnServerConnectedWithAddress?.Invoke(elem.connectionId.Value, address);
break;
}
case ServerMainEventType.OnServerSent:
{
// call original transport event
ConcurrentNetworkWriterPooled writer = (ConcurrentNetworkWriterPooled)elem.param;
OnServerDataSent?.Invoke(elem.connectionId.Value, writer, elem.channelId.Value);
// recycle writer to thread safe pool for reuse
ConcurrentNetworkWriterPool.Return(writer);
break;
}
case ServerMainEventType.OnServerReceived:
{
// call original transport event
ConcurrentNetworkWriterPooled writer = (ConcurrentNetworkWriterPooled)elem.param;
OnServerDataReceived?.Invoke(elem.connectionId.Value, writer, elem.channelId.Value);
// recycle writer to thread safe pool for reuse
ConcurrentNetworkWriterPool.Return(writer);
break;
}
case ServerMainEventType.OnServerError:
{
// call original transport event
OnServerError?.Invoke(elem.connectionId.Value, elem.error.Value, (string)elem.param);
break;
}
case ServerMainEventType.OnServerDisconnected:
{
// call original transport event
OnServerDisconnected?.Invoke(elem.connectionId.Value);
break;
}
}
// process only up to N messages per tick here.
// if main thread becomes too slow, we don't want to deadlock.
if (++processed >= MaxProcessingPerTick)
{
Debug.LogWarning($"ThreadedTransport processed the limit of {MaxProcessingPerTick} messages this tick. Continuing next tick to prevent deadlock.");
break;
}
}
}
// implementations need to use ThreadedLateUpdate
public override void ServerLateUpdate() {}
// manual state flag because implementations can't access their
// threaded .server/.client state from main thread.
public override bool ServerActive() => serverActive;
public override void ServerStart()
{
// don't start the thread twice
if (ServerActive())
{
Debug.LogWarning($"Threaded transport: server already started!");
return;
}
// start worker thread if not started yet
EnsureThread();
// enqueue to process in worker thread
EnqueueThread(ThreadEventType.DoServerStart, null, null, null);
// manual state flag because implementations can't access their
// threaded .server/.client state from main thread.
serverActive = true;
}
public override void ServerSend(int connectionId, ArraySegment<byte> segment, int channelId = Channels.Reliable)
{
if (!ServerActive()) return;
// segment is only valid until returning.
// copy it to a writer.
// make sure to recycle it from worker thread.
ConcurrentNetworkWriterPooled writer = ConcurrentNetworkWriterPool.Get();
writer.WriteBytes(segment.Array, segment.Offset, segment.Count);
// enqueue to process in worker thread
EnqueueThread(ThreadEventType.DoServerSend, writer, channelId, connectionId);
}
public override void ServerDisconnect(int connectionId)
{
// enqueue to process in worker thread
EnqueueThread(ThreadEventType.DoServerDisconnect, null, null, connectionId);
}
// TODO pass address in OnConnected.
// querying this at runtime won't work for threaded transports.
public override string ServerGetClientAddress(int connectionId)
{
throw new NotImplementedException("ThreadedTransport passes each connection's address in OnServerConnectedThreaded. Don't use ServerGetClientAddress.");
}
public override void ServerStop()
{
// enqueue to process in worker thread
EnqueueThread(ThreadEventType.DoServerStop, null, null, null);
// manual state flag because implementations can't access their
// threaded .server/.client state from main thread.
serverActive = false;
}
// sleep ///////////////////////////////////////////////////////////////
// when a device goes to sleep, we must end the worker thread after a while.
// otherwise putting down the device would slowly drain the battery after a day or more.
void OnApplicationPause(bool pauseStatus)
{
Debug.Log($"{GetType()}: OnApplicationPause={pauseStatus}");
// is sleep detection feature enabled?
if (!sleepDetection) return;
// pause thread if application pauses
if (pauseStatus)
{
// enqueue to process in worker thread
EnqueueThread(ThreadEventType.Sleep, null, null, null);
}
// resume thread if application resumes
else
{
// enqueue to process in worker thread
EnqueueThread(ThreadEventType.Wake, null, null, null);
}
}
// shutdown ////////////////////////////////////////////////////////////
public override void Shutdown()
{
// enqueue to process in worker thread
EnqueueThread(ThreadEventType.DoShutdown, null, null, null);
// need to wait a little for worker thread to process the enqueued
// Shutdown event and do proper cleanup.
//
// otherwise if a server with a connected client is stopped,
// and then started, a warning would be shown when starting again
// about an old connection not being found because it wasn't cleared
// in KCP
// TODO cleaner
Thread.Sleep(100);
// stop thread fully, with timeout
// ?.: 'thread' might be null after script reload -> stop play
thread?.StopBlocking(1);
// clear queues so we don't process old messages
// when listening again later
clientMainQueue.Clear();
serverMainQueue.Clear();
threadQueue.Clear();
}
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: af1f2befa05c14b2ea463791ae56081e
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {fileID: 2800000, guid: 7453abfe9e8b2c04a8a47eb536fe21eb, type: 3}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Threaded/ThreadedTransport.cs
uploadId: 736421