This commit is contained in:
2025-06-16 15:14:23 +02:00
committed by devbeni
parent 60fe4620ff
commit 4ff561284f
3174 changed files with 428263 additions and 0 deletions

View File

@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: a1233408bc4b145fb8f6f5a8e95790e0
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,361 @@
using System;
using System.Net.Sockets;
using System.Threading;
namespace Telepathy
{
// ClientState OBJECT that can be handed to the ReceiveThread safely.
// => allows us to create a NEW OBJECT every time we connect and start a
// receive thread.
// => perfectly protects us against data races. fixes all the flaky tests
// where .Connecting or .client would still be used by a dieing thread
// while attempting to use it for a new connection attempt etc.
// => creating a fresh client state each time is the best solution against
// data races here!
class ClientConnectionState : ConnectionState
{
public Thread receiveThread;
// TcpClient.Connected doesn't check if socket != null, which
// results in NullReferenceExceptions if connection was closed.
// -> let's check it manually instead
public bool Connected => client != null &&
client.Client != null &&
client.Client.Connected;
// TcpClient has no 'connecting' state to check. We need to keep track
// of it manually.
// -> checking 'thread.IsAlive && !Connected' is not enough because the
// thread is alive and connected is false for a short moment after
// disconnecting, so this would cause race conditions.
// -> we use a threadsafe bool wrapper so that ThreadFunction can remain
// static (it needs a common lock)
// => Connecting is true from first Connect() call in here, through the
// thread start, until TcpClient.Connect() returns. Simple and clear.
// => bools are atomic according to
// https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/language-specification/variables
// made volatile so the compiler does not reorder access to it
public volatile bool Connecting;
// thread safe pipe for received messages
// => inside client connection state so that we can create a new state
// each time we connect
// (unlike server which has one receive pipe for all connections)
public readonly MagnificentReceivePipe receivePipe;
// constructor always creates new TcpClient for client connection!
public ClientConnectionState(int MaxMessageSize) : base(new TcpClient(), MaxMessageSize)
{
// create receive pipe with max message size for pooling
receivePipe = new MagnificentReceivePipe(MaxMessageSize);
}
// dispose all the state safely
public void Dispose()
{
// close client
client.Close();
// wait until thread finished. this is the only way to guarantee
// that we can call Connect() again immediately after Disconnect
// -> calling .Join would sometimes wait forever, e.g. when
// calling Disconnect while trying to connect to a dead end
receiveThread?.Interrupt();
// we interrupted the receive Thread, so we can't guarantee that
// connecting was reset. let's do it manually.
Connecting = false;
// clear send pipe. no need to hold on to elements.
// (unlike receiveQueue, which is still needed to process the
// latest Disconnected message, etc.)
sendPipe.Clear();
// IMPORTANT: DO NOT CLEAR RECEIVE PIPE.
// we still want to process disconnect messages in Tick()!
// let go of this client completely. the thread ended, no one uses
// it anymore and this way Connected is false again immediately.
client = null;
}
}
public class Client : Common
{
// events to hook into
// => OnData uses ArraySegment for allocation free receives later
public Action OnConnected;
public Action<ArraySegment<byte>> OnData;
public Action OnDisconnected;
// disconnect if send queue gets too big.
// -> avoids ever growing queue memory if network is slower than input
// -> disconnecting is great for load balancing. better to disconnect
// one connection than risking every connection / the whole server
// -> huge queue would introduce multiple seconds of latency anyway
//
// Mirror/DOTSNET use MaxMessageSize batching, so for a 16kb max size:
// limit = 1,000 means 16 MB of memory/connection
// limit = 10,000 means 160 MB of memory/connection
public int SendQueueLimit = 10000;
public int ReceiveQueueLimit = 10000;
// all client state wrapped into an object that is passed to ReceiveThread
// => we create a new one each time we connect to avoid data races with
// old dieing threads still using the previous object!
ClientConnectionState state;
// Connected & Connecting
public bool Connected => state != null && state.Connected;
public bool Connecting => state != null && state.Connecting;
// pipe count, useful for debugging / benchmarks
public int ReceivePipeCount => state != null ? state.receivePipe.TotalCount : 0;
// constructor
public Client(int MaxMessageSize) : base(MaxMessageSize) {}
// the thread function
// STATIC to avoid sharing state!
// => pass ClientState object. a new one is created for each new thread!
// => avoids data races where an old dieing thread might still modify
// the current thread's state :/
static void ReceiveThreadFunction(ClientConnectionState state, string ip, int port, int MaxMessageSize, bool NoDelay, int SendTimeout, int ReceiveTimeout, int ReceiveQueueLimit)
{
Thread sendThread = null;
// absolutely must wrap with try/catch, otherwise thread
// exceptions are silent
try
{
// connect (blocking)
state.client.Connect(ip, port);
state.Connecting = false; // volatile!
// set socket options after the socket was created in Connect()
// (not after the constructor because we clear the socket there)
state.client.NoDelay = NoDelay;
state.client.SendTimeout = SendTimeout;
state.client.ReceiveTimeout = ReceiveTimeout;
// start send thread only after connected
// IMPORTANT: DO NOT SHARE STATE ACROSS MULTIPLE THREADS!
sendThread = new Thread(() => { ThreadFunctions.SendLoop(0, state.client, state.sendPipe, state.sendPending); });
sendThread.IsBackground = true;
sendThread.Start();
// run the receive loop
// (receive pipe is shared across all loops)
ThreadFunctions.ReceiveLoop(0, state.client, MaxMessageSize, state.receivePipe, ReceiveQueueLimit);
}
catch (SocketException exception)
{
// this happens if (for example) the ip address is correct
// but there is no server running on that ip/port
Log.Info("Client Recv: failed to connect to ip=" + ip + " port=" + port + " reason=" + exception);
}
catch (ThreadInterruptedException)
{
// expected if Disconnect() aborts it
}
catch (ThreadAbortException)
{
// expected if Disconnect() aborts it
}
catch (ObjectDisposedException)
{
// expected if Disconnect() aborts it and disposed the client
// while ReceiveThread is in a blocking Connect() call
}
catch (Exception exception)
{
// something went wrong. probably important.
Log.Error("Client Recv Exception: " + exception);
}
// add 'Disconnected' event to receive pipe so that the caller
// knows that the Connect failed. otherwise they will never know
state.receivePipe.Enqueue(0, EventType.Disconnected, default);
// sendthread might be waiting on ManualResetEvent,
// so let's make sure to end it if the connection
// closed.
// otherwise the send thread would only end if it's
// actually sending data while the connection is
// closed.
sendThread?.Interrupt();
// Connect might have failed. thread might have been closed.
// let's reset connecting state no matter what.
state.Connecting = false;
// if we got here then we are done. ReceiveLoop cleans up already,
// but we may never get there if connect fails. so let's clean up
// here too.
state.client?.Close();
}
public void Connect(string ip, int port)
{
// not if already started
if (Connecting || Connected)
{
Log.Warning("Telepathy Client can not create connection because an existing connection is connecting or connected");
return;
}
// overwrite old thread's state object. create a new one to avoid
// data races where an old dieing thread might still modify the
// current state! fixes all the flaky tests!
state = new ClientConnectionState(MaxMessageSize);
// We are connecting from now until Connect succeeds or fails
state.Connecting = true;
// create a TcpClient with perfect IPv4, IPv6 and hostname resolving
// support.
//
// * TcpClient(hostname, port): works but would connect (and block)
// already
// * TcpClient(AddressFamily.InterNetworkV6): takes Ipv4 and IPv6
// addresses but only connects to IPv6 servers (e.g. Telepathy).
// does NOT connect to IPv4 servers (e.g. Mirror Booster), even
// with DualMode enabled.
// * TcpClient(): creates IPv4 socket internally, which would force
// Connect() to only use IPv4 sockets.
//
// => the trick is to clear the internal IPv4 socket so that Connect
// resolves the hostname and creates either an IPv4 or an IPv6
// socket as needed (see TcpClient source)
state.client.Client = null; // clear internal IPv4 socket until Connect()
// client.Connect(ip, port) is blocking. let's call it in the thread
// and return immediately.
// -> this way the application doesn't hang for 30s if connect takes
// too long, which is especially good in games
// -> this way we don't async client.BeginConnect, which seems to
// fail sometimes if we connect too many clients too fast
state.receiveThread = new Thread(() => {
ReceiveThreadFunction(state, ip, port, MaxMessageSize, NoDelay, SendTimeout, ReceiveTimeout, ReceiveQueueLimit);
});
state.receiveThread.IsBackground = true;
state.receiveThread.Start();
}
public void Disconnect()
{
// only if started
if (Connecting || Connected)
{
// dispose all the state safely
state.Dispose();
// IMPORTANT: DO NOT set state = null!
// we still want to process the pipe's disconnect message etc.!
}
}
// send message to server using socket connection.
// arraysegment for allocation free sends later.
// -> the segment's array is only used until Send() returns!
public bool Send(ArraySegment<byte> message)
{
if (Connected)
{
// respect max message size to avoid allocation attacks.
if (message.Count <= MaxMessageSize)
{
// check send pipe limit
if (state.sendPipe.Count < SendQueueLimit)
{
// add to thread safe send pipe and return immediately.
// calling Send here would be blocking (sometimes for long
// times if other side lags or wire was disconnected)
state.sendPipe.Enqueue(message);
state.sendPending.Set(); // interrupt SendThread WaitOne()
return true;
}
// disconnect if send queue gets too big.
// -> avoids ever growing queue memory if network is slower
// than input
// -> avoids ever growing latency as well
//
// note: while SendThread always grabs the WHOLE send queue
// immediately, it's still possible that the sending
// blocks for so long that the send queue just gets
// way too big. have a limit - better safe than sorry.
else
{
// log the reason
Log.Warning($"Client.Send: sendPipe reached limit of {SendQueueLimit}. This can happen if we call send faster than the network can process messages. Disconnecting to avoid ever growing memory & latency.");
// just close it. send thread will take care of the rest.
state.client.Close();
return false;
}
}
Log.Error("Client.Send: message too big: " + message.Count + ". Limit: " + MaxMessageSize);
return false;
}
Log.Warning("Client.Send: not connected!");
return false;
}
// tick: processes up to 'limit' messages
// => limit parameter to avoid deadlocks / too long freezes if server or
// client is too slow to process network load
// => Mirror & DOTSNET need to have a process limit anyway.
// might as well do it here and make life easier.
// => returns amount of remaining messages to process, so the caller
// can call tick again as many times as needed (or up to a limit)
//
// Tick() may process multiple messages, but Mirror needs a way to stop
// processing immediately if a scene change messages arrives. Mirror
// can't process any other messages during a scene change.
// (could be useful for others too)
// => make sure to allocate the lambda only once in transports
public int Tick(int processLimit, Func<bool> checkEnabled = null)
{
// only if state was created yet (after connect())
// note: we don't check 'only if connected' because we want to still
// process Disconnect messages afterwards too!
if (state == null)
return 0;
// process up to 'processLimit' messages
for (int i = 0; i < processLimit; ++i)
{
// check enabled in case a Mirror scene message arrived
if (checkEnabled != null && !checkEnabled())
break;
// peek first. allows us to process the first queued entry while
// still keeping the pooled byte[] alive by not removing anything.
if (state.receivePipe.TryPeek(out int _, out EventType eventType, out ArraySegment<byte> message))
{
switch (eventType)
{
case EventType.Connected:
OnConnected?.Invoke();
break;
case EventType.Data:
OnData?.Invoke(message);
break;
case EventType.Disconnected:
OnDisconnected?.Invoke();
break;
}
// IMPORTANT: now dequeue and return it to pool AFTER we are
// done processing the event.
state.receivePipe.TryDequeue();
}
// no more messages. stop the loop.
else break;
}
// return what's left to process for next time
return state.receivePipe.TotalCount;
}
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: a5b95294cc4ec4b15aacba57531c7985
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/Client.cs
uploadId: 736421

View File

@ -0,0 +1,39 @@
// common code used by server and client
namespace Telepathy
{
public abstract class Common
{
// IMPORTANT: DO NOT SHARE STATE ACROSS SEND/RECV LOOPS (DATA RACES)
// (except receive pipe which is used for all threads)
// NoDelay disables nagle algorithm. lowers CPU% and latency but
// increases bandwidth
public bool NoDelay = true;
// Prevent allocation attacks. Each packet is prefixed with a length
// header, so an attacker could send a fake packet with length=2GB,
// causing the server to allocate 2GB and run out of memory quickly.
// -> simply increase max packet size if you want to send around bigger
// files!
// -> 16KB per message should be more than enough.
public readonly int MaxMessageSize;
// Send would stall forever if the network is cut off during a send, so
// we need a timeout (in milliseconds)
public int SendTimeout = 5000;
// Default TCP receive time out can be huge (minutes).
// That's way too much for games, let's make it configurable.
// we need a timeout (in milliseconds)
// => '0' means disabled
// => disabled by default because some people might use Telepathy
// without Mirror and without sending pings, so timeouts are likely
public int ReceiveTimeout = 0;
// constructor
protected Common(int MaxMessageSize)
{
this.MaxMessageSize = MaxMessageSize;
}
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: c4d56322cf0e248a89103c002a505dab
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/Common.cs
uploadId: 736421

View File

@ -0,0 +1,35 @@
// both server and client need a connection state object.
// -> server needs it to keep track of multiple connections
// -> client needs it to safely create a new connection state on every new
// connect in order to avoid data races where a dieing thread might still
// modify the current state. can't happen if we create a new state each time!
// (fixes all the flaky tests)
//
// ... besides, it also allows us to share code!
using System.Net.Sockets;
using System.Threading;
namespace Telepathy
{
public class ConnectionState
{
public TcpClient client;
// thread safe pipe to send messages from main thread to send thread
public readonly MagnificentSendPipe sendPipe;
// ManualResetEvent to wake up the send thread. better than Thread.Sleep
// -> call Set() if everything was sent
// -> call Reset() if there is something to send again
// -> call WaitOne() to block until Reset was called
public ManualResetEvent sendPending = new ManualResetEvent(false);
public ConnectionState(TcpClient client, int MaxMessageSize)
{
this.client = client;
// create send pipe with max message size for pooling
sendPipe = new MagnificentSendPipe(MaxMessageSize);
}
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: af95e2b6f6343411aa8bdf871abd7b1b
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/ConnectionState.cs
uploadId: 736421

View File

@ -0,0 +1,9 @@
namespace Telepathy
{
public enum EventType
{
Connected,
Data,
Disconnected
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: 49f1a330755814803be5f27f493e1910
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/EventType.cs
uploadId: 736421

View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2018, vis2k
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -0,0 +1,14 @@
fileFormatVersion: 2
guid: 0ba11103b95fd4721bffbb08440d5b8e
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/LICENSE
uploadId: 736421

View File

@ -0,0 +1,15 @@
// A simple logger class that uses Console.WriteLine by default.
// Can also do Logger.LogMethod = Debug.Log for Unity etc.
// (this way we don't have to depend on UnityEngine.DLL and don't need a
// different version for every UnityEngine version here)
using System;
namespace Telepathy
{
public static class Log
{
public static Action<string> Info = Console.WriteLine;
public static Action<string> Warning = Console.WriteLine;
public static Action<string> Error = Console.Error.WriteLine;
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: 0a123d054bef34d059057ac2ce936605
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/Log.cs
uploadId: 736421

View File

@ -0,0 +1,222 @@
// a magnificent receive pipe to shield us from all of life's complexities.
// safely sends messages from receive thread to main thread.
// -> thread safety built in
// -> byte[] pooling coming in the future
//
// => hides all the complexity from telepathy
// => easy to switch between stack/queue/concurrentqueue/etc.
// => easy to test
using System;
using System.Collections.Generic;
namespace Telepathy
{
public class MagnificentReceivePipe
{
// queue entry message. only used in here.
// -> byte arrays are always of 4 + MaxMessageSize
// -> ArraySegment indicates the actual message content
struct Entry
{
public int connectionId;
public EventType eventType;
public ArraySegment<byte> data;
public Entry(int connectionId, EventType eventType, ArraySegment<byte> data)
{
this.connectionId = connectionId;
this.eventType = eventType;
this.data = data;
}
}
// message queue
// ConcurrentQueue allocates. lock{} instead.
//
// IMPORTANT: lock{} all usages!
readonly Queue<Entry> queue = new Queue<Entry>();
// byte[] pool to avoid allocations
// Take & Return is beautifully encapsulated in the pipe.
// the outside does not need to worry about anything.
// and it can be tested easily.
//
// IMPORTANT: lock{} all usages!
Pool<byte[]> pool;
// unfortunately having one receive pipe per connetionId is way slower
// in CCU tests. right now we have one pipe for all connections.
// => we still need to limit queued messages per connection to avoid one
// spamming connection being able to slow down everyone else since
// the queue would be full of just this connection's messages forever
// => let's use a simpler per-connectionId counter for now
Dictionary<int, int> queueCounter = new Dictionary<int, int>();
// constructor
public MagnificentReceivePipe(int MaxMessageSize)
{
// initialize pool to create max message sized byte[]s each time
pool = new Pool<byte[]>(() => new byte[MaxMessageSize]);
}
// return amount of queued messages for this connectionId.
// for statistics. don't call Count and assume that it's the same after
// the call.
public int Count(int connectionId)
{
lock (this)
{
return queueCounter.TryGetValue(connectionId, out int count)
? count
: 0;
}
}
// total count
public int TotalCount
{
get { lock (this) { return queue.Count; } }
}
// pool count for testing
public int PoolCount
{
get { lock (this) { return pool.Count(); } }
}
// enqueue a message
// -> ArraySegment to avoid allocations later
// -> parameters passed directly so it's more obvious that we don't just
// queue a passed 'Message', instead we copy the ArraySegment into
// a byte[] and store it internally, etc.)
public void Enqueue(int connectionId, EventType eventType, ArraySegment<byte> message)
{
// pool & queue usage always needs to be locked
lock (this)
{
// does this message have a data array content?
ArraySegment<byte> segment = default;
if (message != default)
{
// ArraySegment is only valid until returning.
// copy it into a byte[] that we can store.
// ArraySegment array is only valid until returning, so copy
// it into a byte[] that we can queue safely.
// get one from the pool first to avoid allocations
byte[] bytes = pool.Take();
// copy into it
Buffer.BlockCopy(message.Array, message.Offset, bytes, 0, message.Count);
// indicate which part is the message
segment = new ArraySegment<byte>(bytes, 0, message.Count);
}
// enqueue it
// IMPORTANT: pass the segment around pool byte[],
// NOT the 'message' that is only valid until returning!
Entry entry = new Entry(connectionId, eventType, segment);
queue.Enqueue(entry);
// increase counter for this connectionId
int oldCount = Count(connectionId);
queueCounter[connectionId] = oldCount + 1;
}
}
// peek the next message
// -> allows the caller to process it while pipe still holds on to the
// byte[]
// -> TryDequeue should be called after processing, so that the message
// is actually dequeued and the byte[] is returned to pool!
// => see TryDequeue comments!
//
// IMPORTANT: TryPeek & Dequeue need to be called from the SAME THREAD!
public bool TryPeek(out int connectionId, out EventType eventType, out ArraySegment<byte> data)
{
connectionId = 0;
eventType = EventType.Disconnected;
data = default;
// pool & queue usage always needs to be locked
lock (this)
{
if (queue.Count > 0)
{
Entry entry = queue.Peek();
connectionId = entry.connectionId;
eventType = entry.eventType;
data = entry.data;
return true;
}
return false;
}
}
// dequeue the next message
// -> simply dequeues and returns the byte[] to pool (if any)
// -> use Peek to actually process the first element while the pipe
// still holds on to the byte[]
// -> doesn't return the element because the byte[] needs to be returned
// to the pool in dequeue. caller can't be allowed to work with a
// byte[] that is already returned to pool.
// => Peek & Dequeue is the most simple, clean solution for receive
// pipe pooling to avoid allocations!
//
// IMPORTANT: TryPeek & Dequeue need to be called from the SAME THREAD!
public bool TryDequeue()
{
// pool & queue usage always needs to be locked
lock (this)
{
if (queue.Count > 0)
{
// dequeue from queue
Entry entry = queue.Dequeue();
// return byte[] to pool (if any).
// not all message types have byte[] contents.
if (entry.data != default)
{
pool.Return(entry.data.Array);
}
// decrease counter for this connectionId
queueCounter[entry.connectionId]--;
// remove if zero. don't want to keep old connectionIds in
// there forever, it would cause slowly growing memory.
if (queueCounter[entry.connectionId] == 0)
queueCounter.Remove(entry.connectionId);
return true;
}
return false;
}
}
public void Clear()
{
// pool & queue usage always needs to be locked
lock (this)
{
// clear queue, but via dequeue to return each byte[] to pool
while (queue.Count > 0)
{
// dequeue
Entry entry = queue.Dequeue();
// return byte[] to pool (if any).
// not all message types have byte[] contents.
if (entry.data != default)
{
pool.Return(entry.data.Array);
}
}
// clear counter too
queueCounter.Clear();
}
}
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: 010a208972a9a4e0cb0e7c18a60b4494
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/MagnificentReceivePipe.cs
uploadId: 736421

View File

@ -0,0 +1,165 @@
// a magnificent send pipe to shield us from all of life's complexities.
// safely sends messages from main thread to send thread.
// -> thread safety built in
// -> byte[] pooling coming in the future
//
// => hides all the complexity from telepathy
// => easy to switch between stack/queue/concurrentqueue/etc.
// => easy to test
using System;
using System.Collections.Generic;
namespace Telepathy
{
public class MagnificentSendPipe
{
// message queue
// ConcurrentQueue allocates. lock{} instead.
// -> byte arrays are always of MaxMessageSize
// -> ArraySegment indicates the actual message content
//
// IMPORTANT: lock{} all usages!
readonly Queue<ArraySegment<byte>> queue = new Queue<ArraySegment<byte>>();
// byte[] pool to avoid allocations
// Take & Return is beautifully encapsulated in the pipe.
// the outside does not need to worry about anything.
// and it can be tested easily.
//
// IMPORTANT: lock{} all usages!
Pool<byte[]> pool;
// constructor
public MagnificentSendPipe(int MaxMessageSize)
{
// initialize pool to create max message sized byte[]s each time
pool = new Pool<byte[]>(() => new byte[MaxMessageSize]);
}
// for statistics. don't call Count and assume that it's the same after
// the call.
public int Count
{
get { lock (this) { return queue.Count; } }
}
// pool count for testing
public int PoolCount
{
get { lock (this) { return pool.Count(); } }
}
// enqueue a message
// arraysegment for allocation free sends later.
// -> the segment's array is only used until Enqueue() returns!
public void Enqueue(ArraySegment<byte> message)
{
// pool & queue usage always needs to be locked
lock (this)
{
// ArraySegment array is only valid until returning, so copy
// it into a byte[] that we can queue safely.
// get one from the pool first to avoid allocations
byte[] bytes = pool.Take();
// copy into it
Buffer.BlockCopy(message.Array, message.Offset, bytes, 0, message.Count);
// indicate which part is the message
ArraySegment<byte> segment = new ArraySegment<byte>(bytes, 0, message.Count);
// now enqueue it
queue.Enqueue(segment);
}
}
// send threads need to dequeue each byte[] and write it into the socket
// -> dequeueing one byte[] after another works, but it's WAY slower
// than dequeueing all immediately (locks only once)
// lock{} & DequeueAll is WAY faster than ConcurrentQueue & dequeue
// one after another:
//
// uMMORPG 450 CCU
// SafeQueue: 900-1440ms latency
// ConcurrentQueue: 2000ms latency
//
// -> the most obvious solution is to just return a list with all byte[]
// (which allocates) and then write each one into the socket
// -> a faster solution is to serialize each one into one payload buffer
// and pass that to the socket only once. fewer socket calls always
// give WAY better CPU performance(!)
// -> to avoid allocating a new list of entries each time, we simply
// serialize all entries into the payload here already
// => having all this complexity built into the pipe makes testing and
// modifying the algorithm super easy!
//
// IMPORTANT: serializing in here will allow us to return the byte[]
// entries back to a pool later to completely avoid
// allocations!
public bool DequeueAndSerializeAll(ref byte[] payload, out int packetSize)
{
// pool & queue usage always needs to be locked
lock (this)
{
// do nothing if empty
packetSize = 0;
if (queue.Count == 0)
return false;
// we might have multiple pending messages. merge into one
// packet to avoid TCP overheads and improve performance.
//
// IMPORTANT: Mirror & DOTSNET already batch into MaxMessageSize
// chunks, but we STILL pack all pending messages
// into one large payload so we only give it to TCP
// ONCE. This is HUGE for performance so we keep it!
packetSize = 0;
foreach (ArraySegment<byte> message in queue)
packetSize += 4 + message.Count; // header + content
// create payload buffer if not created yet or previous one is
// too small
// IMPORTANT: payload.Length might be > packetSize! don't use it!
if (payload == null || payload.Length < packetSize)
payload = new byte[packetSize];
// dequeue all byte[] messages and serialize into the packet
int position = 0;
while (queue.Count > 0)
{
// dequeue
ArraySegment<byte> message = queue.Dequeue();
// write header (size) into buffer at position
Utils.IntToBytesBigEndianNonAlloc(message.Count, payload, position);
position += 4;
// copy message into payload at position
Buffer.BlockCopy(message.Array, message.Offset, payload, position, message.Count);
position += message.Count;
// return to pool so it can be reused (avoids allocations!)
pool.Return(message.Array);
}
// we did serialize something
return true;
}
}
public void Clear()
{
// pool & queue usage always needs to be locked
lock (this)
{
// clear queue, but via dequeue to return each byte[] to pool
while (queue.Count > 0)
{
pool.Return(queue.Dequeue().Array);
}
}
}
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: d490021c2e6a64374bc88168cec75c70
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/MagnificentSendPipe.cs
uploadId: 736421

View File

@ -0,0 +1,67 @@
using System;
using System.IO;
using System.Net.Sockets;
namespace Telepathy
{
public static class NetworkStreamExtensions
{
// .Read returns '0' if remote closed the connection but throws an
// IOException if we voluntarily closed our own connection.
//
// let's add a ReadSafely method that returns '0' in both cases so we don't
// have to worry about exceptions, since a disconnect is a disconnect...
public static int ReadSafely(this NetworkStream stream, byte[] buffer, int offset, int size)
{
try
{
return stream.Read(buffer, offset, size);
}
// IOException happens if we voluntarily closed our own connection.
catch (IOException)
{
return 0;
}
// ObjectDisposedException can be thrown if Client.Disconnect()
// disposes the stream, while we are still trying to read here.
// catching it fixes https://github.com/vis2k/Telepathy/pull/104
catch (ObjectDisposedException)
{
return 0;
}
}
// helper function to read EXACTLY 'n' bytes
// -> default .Read reads up to 'n' bytes. this function reads exactly
// 'n' bytes
// -> this is blocking until 'n' bytes were received
// -> immediately returns false in case of disconnects
public static bool ReadExactly(this NetworkStream stream, byte[] buffer, int amount)
{
// there might not be enough bytes in the TCP buffer for .Read to read
// the whole amount at once, so we need to keep trying until we have all
// the bytes (blocking)
//
// note: this just is a faster version of reading one after another:
// for (int i = 0; i < amount; ++i)
// if (stream.Read(buffer, i, 1) == 0)
// return false;
// return true;
int bytesRead = 0;
while (bytesRead < amount)
{
// read up to 'remaining' bytes with the 'safe' read extension
int remaining = amount - bytesRead;
int result = stream.ReadSafely(buffer, bytesRead, remaining);
// .Read returns 0 if disconnected
if (result == 0)
return false;
// otherwise add to bytes read
bytesRead += result;
}
return true;
}
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: 7a8076c43fa8d4d45831adae232d4d3c
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/NetworkStreamExtensions.cs
uploadId: 736421

View File

@ -0,0 +1,34 @@
// pool to avoid allocations. originally from libuv2k.
using System;
using System.Collections.Generic;
namespace Telepathy
{
public class Pool<T>
{
// objects
readonly Stack<T> objects = new Stack<T>();
// some types might need additional parameters in their constructor, so
// we use a Func<T> generator
readonly Func<T> objectGenerator;
// constructor
public Pool(Func<T> objectGenerator)
{
this.objectGenerator = objectGenerator;
}
// take an element from the pool, or create a new one if empty
public T Take() => objects.Count > 0 ? objects.Pop() : objectGenerator();
// return an element to the pool
public void Return(T item) => objects.Push(item);
// clear the pool with the disposer function applied to each object
public void Clear() => objects.Clear();
// count to see how many objects are in the pool. useful for tests.
public int Count() => objects.Count;
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: 6d3e530f6872642ec81e9b8b76277c93
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/Pool.cs
uploadId: 736421

View File

@ -0,0 +1,424 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace Telepathy
{
public class Server : Common
{
// events to hook into
// => OnData uses ArraySegment for allocation free receives later
public Action<int, string> OnConnected;
public Action<int, ArraySegment<byte>> OnData;
public Action<int> OnDisconnected;
// listener
public TcpListener listener;
Thread listenerThread;
// disconnect if send queue gets too big.
// -> avoids ever growing queue memory if network is slower than input
// -> disconnecting is great for load balancing. better to disconnect
// one connection than risking every connection / the whole server
// -> huge queue would introduce multiple seconds of latency anyway
//
// Mirror/DOTSNET use MaxMessageSize batching, so for a 16kb max size:
// limit = 1,000 means 16 MB of memory/connection
// limit = 10,000 means 160 MB of memory/connection
public int SendQueueLimit = 10000;
public int ReceiveQueueLimit = 10000;
// thread safe pipe for received messages
// IMPORTANT: unfortunately using one pipe per connection is way slower
// when testing 150 CCU. we need to use one pipe for all
// connections. this scales beautifully.
protected MagnificentReceivePipe receivePipe;
// pipe count, useful for debugging / benchmarks
public int ReceivePipeTotalCount => receivePipe.TotalCount;
// clients with <connectionId, ConnectionState>
readonly ConcurrentDictionary<int, ConnectionState> clients = new ConcurrentDictionary<int, ConnectionState>();
// connectionId counter
int counter;
// public next id function in case someone needs to reserve an id
// (e.g. if hostMode should always have 0 connection and external
// connections should start at 1, etc.)
public int NextConnectionId()
{
int id = Interlocked.Increment(ref counter);
// it's very unlikely that we reach the uint limit of 2 billion.
// even with 1 new connection per second, this would take 68 years.
// -> but if it happens, then we should throw an exception because
// the caller probably should stop accepting clients.
// -> it's hardly worth using 'bool Next(out id)' for that case
// because it's just so unlikely.
if (id == int.MaxValue)
{
throw new Exception("connection id limit reached: " + id);
}
return id;
}
// check if the server is running
public bool Active => listenerThread != null && listenerThread.IsAlive;
// constructor
public Server(int MaxMessageSize) : base(MaxMessageSize) {}
// the listener thread's listen function
// note: no maxConnections parameter. high level API should handle that.
// (Transport can't send a 'too full' message anyway)
void Listen(int port)
{
// absolutely must wrap with try/catch, otherwise thread
// exceptions are silent
try
{
// start listener on all IPv4 and IPv6 address via .Create
listener = TcpListener.Create(port);
listener.Server.NoDelay = NoDelay;
// IMPORTANT: do not set send/receive timeouts on listener.
// On linux setting the recv timeout will cause the blocking
// Accept call to timeout with EACCEPT (which mono interprets
// as EWOULDBLOCK).
// https://stackoverflow.com/questions/1917814/eagain-error-for-accept-on-blocking-socket/1918118#1918118
// => fixes https://github.com/vis2k/Mirror/issues/2695
//
//listener.Server.SendTimeout = SendTimeout;
//listener.Server.ReceiveTimeout = ReceiveTimeout;
listener.Start();
Log.Info("Server: listening port=" + port);
// keep accepting new clients
while (true)
{
// wait and accept new client
// note: 'using' sucks here because it will try to
// dispose after thread was started but we still need it
// in the thread
TcpClient client = listener.AcceptTcpClient();
// set socket options
client.NoDelay = NoDelay;
client.SendTimeout = SendTimeout;
client.ReceiveTimeout = ReceiveTimeout;
// generate the next connection id (thread safely)
int connectionId = NextConnectionId();
// add to dict immediately
ConnectionState connection = new ConnectionState(client, MaxMessageSize);
clients[connectionId] = connection;
// spawn a send thread for each client
Thread sendThread = new Thread(() =>
{
// wrap in try-catch, otherwise Thread exceptions
// are silent
try
{
// run the send loop
// IMPORTANT: DO NOT SHARE STATE ACROSS MULTIPLE THREADS!
ThreadFunctions.SendLoop(connectionId, client, connection.sendPipe, connection.sendPending);
}
catch (ThreadAbortException)
{
// happens on stop. don't log anything.
// (we catch it in SendLoop too, but it still gets
// through to here when aborting. don't show an
// error.)
}
catch (Exception exception)
{
Log.Error("Server send thread exception: " + exception);
}
});
sendThread.IsBackground = true;
sendThread.Start();
// spawn a receive thread for each client
Thread receiveThread = new Thread(() =>
{
// wrap in try-catch, otherwise Thread exceptions
// are silent
try
{
// run the receive loop
// (receive pipe is shared across all loops)
ThreadFunctions.ReceiveLoop(connectionId, client, MaxMessageSize, receivePipe, ReceiveQueueLimit);
// IMPORTANT: do NOT remove from clients after the
// thread ends. need to do it in Tick() so that the
// disconnect event in the pipe is still processed.
// (removing client immediately would mean that the
// pipe is lost and the disconnect event is never
// processed)
// sendthread might be waiting on ManualResetEvent,
// so let's make sure to end it if the connection
// closed.
// otherwise the send thread would only end if it's
// actually sending data while the connection is
// closed.
sendThread.Interrupt();
}
catch (Exception exception)
{
Log.Error("Server client thread exception: " + exception);
}
});
receiveThread.IsBackground = true;
receiveThread.Start();
}
}
catch (ThreadAbortException exception)
{
// UnityEditor causes AbortException if thread is still
// running when we press Play again next time. that's okay.
Log.Info("Server thread aborted. That's okay. " + exception);
}
catch (SocketException exception)
{
// calling StopServer will interrupt this thread with a
// 'SocketException: interrupted'. that's okay.
Log.Info("Server Thread stopped. That's okay. " + exception);
}
catch (Exception exception)
{
// something went wrong. probably important.
Log.Error("Server Exception: " + exception);
}
}
// start listening for new connections in a background thread and spawn
// a new thread for each one.
public bool Start(int port)
{
// not if already started
if (Active) return false;
// create receive pipe with max message size for pooling
// => create new pipes every time!
// if an old receive thread is still finishing up, it might still
// be using the old pipes. we don't want to risk any old data for
// our new start here.
receivePipe = new MagnificentReceivePipe(MaxMessageSize);
// start the listener thread
// (on low priority. if main thread is too busy then there is not
// much value in accepting even more clients)
Log.Info("Server: Start port=" + port);
listenerThread = new Thread(() => { Listen(port); });
listenerThread.IsBackground = true;
listenerThread.Priority = ThreadPriority.BelowNormal;
listenerThread.Start();
return true;
}
public void Stop()
{
// only if started
if (!Active) return;
Log.Info("Server: stopping...");
// stop listening to connections so that no one can connect while we
// close the client connections
// (might be null if we call Stop so quickly after Start that the
// thread was interrupted before even creating the listener)
listener?.Stop();
// kill listener thread at all costs. only way to guarantee that
// .Active is immediately false after Stop.
// -> calling .Join would sometimes wait forever
listenerThread?.Interrupt();
listenerThread = null;
// close all client connections
foreach (KeyValuePair<int, ConnectionState> kvp in clients)
{
TcpClient client = kvp.Value.client;
// close the stream if not closed yet. it may have been closed
// by a disconnect already, so use try/catch
try { client.GetStream().Close(); } catch {}
client.Close();
}
// clear clients list
clients.Clear();
// reset the counter in case we start up again so
// clients get connection ID's starting from 1
counter = 0;
}
// send message to client using socket connection.
// arraysegment for allocation free sends later.
// -> the segment's array is only used until Send() returns!
public bool Send(int connectionId, ArraySegment<byte> message)
{
// respect max message size to avoid allocation attacks.
if (message.Count <= MaxMessageSize)
{
// find the connection
if (clients.TryGetValue(connectionId, out ConnectionState connection))
{
// check send pipe limit
if (connection.sendPipe.Count < SendQueueLimit)
{
// add to thread safe send pipe and return immediately.
// calling Send here would be blocking (sometimes for long
// times if other side lags or wire was disconnected)
connection.sendPipe.Enqueue(message);
connection.sendPending.Set(); // interrupt SendThread WaitOne()
return true;
}
// disconnect if send queue gets too big.
// -> avoids ever growing queue memory if network is slower
// than input
// -> disconnecting is great for load balancing. better to
// disconnect one connection than risking every
// connection / the whole server
//
// note: while SendThread always grabs the WHOLE send queue
// immediately, it's still possible that the sending
// blocks for so long that the send queue just gets
// way too big. have a limit - better safe than sorry.
else
{
// log the reason
Log.Warning($"Server.Send: sendPipe for connection {connectionId} reached limit of {SendQueueLimit}. This can happen if we call send faster than the network can process messages. Disconnecting this connection for load balancing.");
// just close it. send thread will take care of the rest.
connection.client.Close();
return false;
}
}
// sending to an invalid connectionId is expected sometimes.
// for example, if a client disconnects, the server might still
// try to send for one frame before it calls GetNextMessages
// again and realizes that a disconnect happened.
// so let's not spam the console with log messages.
//Logger.Log("Server.Send: invalid connectionId: " + connectionId);
return false;
}
Log.Error("Server.Send: message too big: " + message.Count + ". Limit: " + MaxMessageSize);
return false;
}
// client's ip is sometimes needed by the server, e.g. for bans
public string GetClientAddress(int connectionId)
{
try
{
// find the connection
if (clients.TryGetValue(connectionId, out ConnectionState connection))
{
return ((IPEndPoint)connection.client.Client.RemoteEndPoint).Address.ToString();
}
return "";
}
catch (SocketException)
{
// using server.listener.LocalEndpoint causes an Exception
// in UWP + Unity 2019:
// Exception thrown at 0x00007FF9755DA388 in UWF.exe:
// Microsoft C++ exception: Il2CppExceptionWrapper at memory
// location 0x000000E15A0FCDD0. SocketException: An address
// incompatible with the requested protocol was used at
// System.Net.Sockets.Socket.get_LocalEndPoint ()
// so let's at least catch it and recover
return "unknown";
}
catch (ObjectDisposedException)
{
return "Disposed";
}
catch (Exception)
{
return "";
}
}
// disconnect (kick) a client
public bool Disconnect(int connectionId)
{
// find the connection
if (clients.TryGetValue(connectionId, out ConnectionState connection))
{
// just close it. send thread will take care of the rest.
connection.client.Close();
Log.Info("Server.Disconnect connectionId:" + connectionId);
return true;
}
return false;
}
// tick: processes up to 'limit' messages for each connection
// => limit parameter to avoid deadlocks / too long freezes if server or
// client is too slow to process network load
// => Mirror & DOTSNET need to have a process limit anyway.
// might as well do it here and make life easier.
// => returns amount of remaining messages to process, so the caller
// can call tick again as many times as needed (or up to a limit)
//
// Tick() may process multiple messages, but Mirror needs a way to stop
// processing immediately if a scene change messages arrives. Mirror
// can't process any other messages during a scene change.
// (could be useful for others too)
// => make sure to allocate the lambda only once in transports
public int Tick(int processLimit, Func<bool> checkEnabled = null)
{
// only if pipe was created yet (after start())
if (receivePipe == null)
return 0;
// process up to 'processLimit' messages for this connection
for (int i = 0; i < processLimit; ++i)
{
// check enabled in case a Mirror scene message arrived
if (checkEnabled != null && !checkEnabled())
break;
// peek first. allows us to process the first queued entry while
// still keeping the pooled byte[] alive by not removing anything.
if (receivePipe.TryPeek(out int connectionId, out EventType eventType, out ArraySegment<byte> message))
{
switch (eventType)
{
case EventType.Connected:
OnConnected?.Invoke(connectionId, GetClientAddress(connectionId));
break;
case EventType.Data:
OnData?.Invoke(connectionId, message);
break;
case EventType.Disconnected:
OnDisconnected?.Invoke(connectionId);
// remove disconnected connection now that the final
// disconnected message was processed.
clients.TryRemove(connectionId, out ConnectionState _);
break;
}
// IMPORTANT: now dequeue and return it to pool AFTER we are
// done processing the event.
receivePipe.TryDequeue();
}
// no more messages. stop the loop.
else break;
}
// return what's left to process for next time
return receivePipe.TotalCount;
}
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: fb98a16841ccc4338a7e0b4e59136563
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/Server.cs
uploadId: 736421

View File

@ -0,0 +1,12 @@
{
"name": "Telepathy",
"references": [],
"optionalUnityReferences": [],
"includePlatforms": [],
"excludePlatforms": [],
"allowUnsafeCode": false,
"overrideReferences": false,
"precompiledReferences": [],
"autoReferenced": true,
"defineConstraints": []
}

View File

@ -0,0 +1,14 @@
fileFormatVersion: 2
guid: 725ee7191c021de4dbf9269590ded755
AssemblyDefinitionImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/Telepathy.asmdef
uploadId: 736421

View File

@ -0,0 +1,244 @@
// IMPORTANT
// force all thread functions to be STATIC.
// => Common.Send/ReceiveLoop is EXTREMELY DANGEROUS because it's too easy to
// accidentally share Common state between threads.
// => header buffer, payload etc. were accidentally shared once after changing
// the thread functions from static to non static
// => C# does not automatically detect data races. best we can do is move all of
// our thread code into static functions and pass all state into them
//
// let's even keep them in a STATIC CLASS so it's 100% obvious that this should
// NOT EVER be changed to non static!
using System;
using System.Net.Sockets;
using System.Threading;
namespace Telepathy
{
public static class ThreadFunctions
{
// send message (via stream) with the <size,content> message structure
// this function is blocking sometimes!
// (e.g. if someone has high latency or wire was cut off)
// -> payload is of multiple <<size, content, size, content, ...> parts
public static bool SendMessagesBlocking(NetworkStream stream, byte[] payload, int packetSize)
{
// stream.Write throws exceptions if client sends with high
// frequency and the server stops
try
{
// write the whole thing
stream.Write(payload, 0, packetSize);
return true;
}
catch (Exception exception)
{
// log as regular message because servers do shut down sometimes
Log.Info("Send: stream.Write exception: " + exception);
return false;
}
}
// read message (via stream) blocking.
// writes into byte[] and returns bytes written to avoid allocations.
public static bool ReadMessageBlocking(NetworkStream stream, int MaxMessageSize, byte[] headerBuffer, byte[] payloadBuffer, out int size)
{
size = 0;
// buffer needs to be of Header + MaxMessageSize
if (payloadBuffer.Length != 4 + MaxMessageSize)
{
Log.Error($"ReadMessageBlocking: payloadBuffer needs to be of size 4 + MaxMessageSize = {4 + MaxMessageSize} instead of {payloadBuffer.Length}");
return false;
}
// read exactly 4 bytes for header (blocking)
if (!stream.ReadExactly(headerBuffer, 4))
return false;
// convert to int
size = Utils.BytesToIntBigEndian(headerBuffer);
// protect against allocation attacks. an attacker might send
// multiple fake '2GB header' packets in a row, causing the server
// to allocate multiple 2GB byte arrays and run out of memory.
//
// also protect against size <= 0 which would cause issues
if (size > 0 && size <= MaxMessageSize)
{
// read exactly 'size' bytes for content (blocking)
return stream.ReadExactly(payloadBuffer, size);
}
Log.Warning("ReadMessageBlocking: possible header attack with a header of: " + size + " bytes.");
return false;
}
// thread receive function is the same for client and server's clients
public static void ReceiveLoop(int connectionId, TcpClient client, int MaxMessageSize, MagnificentReceivePipe receivePipe, int QueueLimit)
{
// get NetworkStream from client
NetworkStream stream = client.GetStream();
// every receive loop needs it's own receive buffer of
// HeaderSize + MaxMessageSize
// to avoid runtime allocations.
//
// IMPORTANT: DO NOT make this a member, otherwise every connection
// on the server would use the same buffer simulatenously
byte[] receiveBuffer = new byte[4 + MaxMessageSize];
// avoid header[4] allocations
//
// IMPORTANT: DO NOT make this a member, otherwise every connection
// on the server would use the same buffer simulatenously
byte[] headerBuffer = new byte[4];
// absolutely must wrap with try/catch, otherwise thread exceptions
// are silent
try
{
// add connected event to pipe
receivePipe.Enqueue(connectionId, EventType.Connected, default);
// let's talk about reading data.
// -> normally we would read as much as possible and then
// extract as many <size,content>,<size,content> messages
// as we received this time. this is really complicated
// and expensive to do though
// -> instead we use a trick:
// Read(2) -> size
// Read(size) -> content
// repeat
// Read is blocking, but it doesn't matter since the
// best thing to do until the full message arrives,
// is to wait.
// => this is the most elegant AND fast solution.
// + no resizing
// + no extra allocations, just one for the content
// + no crazy extraction logic
while (true)
{
// read the next message (blocking) or stop if stream closed
if (!ReadMessageBlocking(stream, MaxMessageSize, headerBuffer, receiveBuffer, out int size))
// break instead of return so stream close still happens!
break;
// create arraysegment for the read message
ArraySegment<byte> message = new ArraySegment<byte>(receiveBuffer, 0, size);
// send to main thread via pipe
// -> it'll copy the message internally so we can reuse the
// receive buffer for next read!
receivePipe.Enqueue(connectionId, EventType.Data, message);
// disconnect if receive pipe gets too big for this connectionId.
// -> avoids ever growing queue memory if network is slower
// than input
// -> disconnecting is great for load balancing. better to
// disconnect one connection than risking every
// connection / the whole server
if (receivePipe.Count(connectionId) >= QueueLimit)
{
// log the reason
Log.Warning($"receivePipe reached limit of {QueueLimit} for connectionId {connectionId}. This can happen if network messages come in way faster than we manage to process them. Disconnecting this connection for load balancing.");
// IMPORTANT: do NOT clear the whole queue. we use one
// queue for all connections.
//receivePipe.Clear();
// just break. the finally{} will close everything.
break;
}
}
}
catch (Exception exception)
{
// something went wrong. the thread was interrupted or the
// connection closed or we closed our own connection or ...
// -> either way we should stop gracefully
Log.Info("ReceiveLoop: finished receive function for connectionId=" + connectionId + " reason: " + exception);
}
finally
{
// clean up no matter what
stream.Close();
client.Close();
// add 'Disconnected' message after disconnecting properly.
// -> always AFTER closing the streams to avoid a race condition
// where Disconnected -> Reconnect wouldn't work because
// Connected is still true for a short moment before the stream
// would be closed.
receivePipe.Enqueue(connectionId, EventType.Disconnected, default);
}
}
// thread send function
// note: we really do need one per connection, so that if one connection
// blocks, the rest will still continue to get sends
public static void SendLoop(int connectionId, TcpClient client, MagnificentSendPipe sendPipe, ManualResetEvent sendPending)
{
// get NetworkStream from client
NetworkStream stream = client.GetStream();
// avoid payload[packetSize] allocations. size increases dynamically as
// needed for batching.
//
// IMPORTANT: DO NOT make this a member, otherwise every connection
// on the server would use the same buffer simulatenously
byte[] payload = null;
try
{
while (client.Connected) // try this. client will get closed eventually.
{
// reset ManualResetEvent before we do anything else. this
// way there is no race condition. if Send() is called again
// while in here then it will be properly detected next time
// -> otherwise Send might be called right after dequeue but
// before .Reset, which would completely ignore it until
// the next Send call.
sendPending.Reset(); // WaitOne() blocks until .Set() again
// dequeue & serialize all
// a locked{} TryDequeueAll is twice as fast as
// ConcurrentQueue, see SafeQueue.cs!
if (sendPipe.DequeueAndSerializeAll(ref payload, out int packetSize))
{
// send messages (blocking) or stop if stream is closed
if (!SendMessagesBlocking(stream, payload, packetSize))
// break instead of return so stream close still happens!
break;
}
// don't choke up the CPU: wait until queue not empty anymore
sendPending.WaitOne();
}
}
catch (ThreadAbortException)
{
// happens on stop. don't log anything.
}
catch (ThreadInterruptedException)
{
// happens if receive thread interrupts send thread.
}
catch (Exception exception)
{
// something went wrong. the thread was interrupted or the
// connection closed or we closed our own connection or ...
// -> either way we should stop gracefully
Log.Info("SendLoop Exception: connectionId=" + connectionId + " reason: " + exception);
}
finally
{
// clean up no matter what
// we might get SocketExceptions when sending if the 'host has
// failed to respond' - in which case we should close the connection
// which causes the ReceiveLoop to end and fire the Disconnected
// message. otherwise the connection would stay alive forever even
// though we can't send anymore.
stream.Close();
client.Close();
}
}
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: d01598bf851164dc48a24c26913460b9
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/ThreadFunctions.cs
uploadId: 736421

View File

@ -0,0 +1,23 @@
namespace Telepathy
{
public static class Utils
{
// IntToBytes version that doesn't allocate a new byte[4] each time.
// -> important for MMO scale networking performance.
public static void IntToBytesBigEndianNonAlloc(int value, byte[] bytes, int offset = 0)
{
bytes[offset + 0] = (byte)(value >> 24);
bytes[offset + 1] = (byte)(value >> 16);
bytes[offset + 2] = (byte)(value >> 8);
bytes[offset + 3] = (byte)value;
}
public static int BytesToIntBigEndian(byte[] bytes)
{
return (bytes[0] << 24) |
(bytes[1] << 16) |
(bytes[2] << 8) |
bytes[3];
}
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: 951d08c05297f4b3e8feb5bfcab86531
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/Utils.cs
uploadId: 736421

View File

@ -0,0 +1,65 @@
V1.9 [2023-11-10]
- fix: Always enqueue Disconnected event (imer)
V1.8 [2021-06-02]
- fix: Do not set timeouts on listener (fixes https://github.com/vis2k/Mirror/issues/2695)
- fix: #104 - ReadSafely now catches ObjectDisposedException too
V1.7 [2021-02-20]
- ReceiveTimeout: disabled by default for cases where people use Telepathy by
itself without pings etc.
V1.6 [2021-02-10]
- configurable ReceiveTimeout to avoid TCPs high default timeout
- Server/Client receive queue limit now disconnects instead of showing a
warning. this is necessary for load balancing to avoid situations where one
spamming connection might fill the queue and slow down everyone else.
V1.5 [2021-02-05]
- fix: client data races & flaky tests fixed by creating a new client state
object every time we connect. fixes data race where an old dieing thread
might still try to modify the current state
- fix: Client.ReceiveThreadFunction catches and ignores ObjectDisposedException
which can happen if Disconnect() closes and disposes the client, while the
ReceiveThread just starts up and still uses the client.
- Server/Client Tick() optional enabled check for Mirror scene changing
V1.4 [2021-02-03]
- Server/Client.Tick: limit parameter added to process up to 'limit' messages.
makes Mirror & DOTSNET transports easier to implement
- stability: Server/Client send queue limit disconnects instead of showing a
warning. allows for load balancing. better to kick one connection and keep
the server running than slowing everything down for everyone.
V1.3 [2021-02-02]
- perf: ReceivePipe: byte[] pool for allocation free receives (╯°□°)╯︵ ┻━┻
- fix: header buffer, payload buffer data races because they were made non
static earlier. server threads would all access the same ones.
=> all threaded code was moved into a static ThreadFunctions class to make it
100% obvious that there should be no shared state in the future
V1.2 [2021-02-02]
- Client/Server Tick & OnConnected/OnData/OnDisconnected events instead of
having the outside process messages via GetNextMessage. That's easier for
Mirror/DOTSNET and allows for allocation free data message processing later.
- MagnificientSend/RecvPipe to shield Telepathy from all the complexity
- perf: SendPipe: byte[] pool for allocation free sends (╯°□°)╯︵ ┻━┻
V1.1 [2021-02-01]
- stability: added more tests
- breaking: Server/Client.Send: ArraySegment parameter and copy internally so
that Transports don't need to worry about it
- perf: Buffer.BlockCopy instead of Array.Copy
- perf: SendMessageBlocking puts message header directly into payload now
- perf: receiveQueues use SafeQueue instead of ConcurrentQueue to avoid
allocations
- Common: removed static state
- perf: SafeQueue.TryDequeueAll: avoid queue.ToArray() allocations. copy into a
list instead.
- Logger.Log/LogWarning/LogError renamed to Log.Info/Warning/Error
- MaxMessageSize is now specified in constructor to prepare for pooling
- flaky tests are ignored for now
- smaller improvements
V1.0
- first stable release

View File

@ -0,0 +1,14 @@
fileFormatVersion: 2
guid: d942af06608be434dbeeaa58207d20bd
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/Telepathy/VERSION
uploadId: 736421

View File

@ -0,0 +1,251 @@
// wraps Telepathy for use as HLAPI TransportLayer
using System;
using System.Net;
using System.Net.Sockets;
using UnityEngine;
// Replaced by Kcp November 2020
namespace Mirror
{
[HelpURL("https://github.com/vis2k/Telepathy/blob/master/README.md")]
[DisallowMultipleComponent]
public class TelepathyTransport : Transport, PortTransport
{
// scheme used by this transport
// "tcp4" means tcp with 4 bytes header, network byte order
public const string Scheme = "tcp4";
public ushort port = 7777;
public ushort Port { get => port; set => port=value; }
[Header("Common")]
[Tooltip("Nagle Algorithm can be disabled by enabling NoDelay")]
public bool NoDelay = true;
[Tooltip("Send timeout in milliseconds.")]
public int SendTimeout = 5000;
[Tooltip("Receive timeout in milliseconds. High by default so users don't time out during scene changes.")]
public int ReceiveTimeout = 30000;
[Header("Server")]
[Tooltip("Protect against allocation attacks by keeping the max message size small. Otherwise an attacker might send multiple fake packets with 2GB headers, causing the server to run out of memory after allocating multiple large packets.")]
public int serverMaxMessageSize = 16 * 1024;
[Tooltip("Server processes a limit amount of messages per tick to avoid a deadlock where it might end up processing forever if messages come in faster than we can process them.")]
public int serverMaxReceivesPerTick = 10000;
[Tooltip("Server send queue limit per connection for pending messages. Telepathy will disconnect a connection's queues reach that limit for load balancing. Better to kick one slow client than slowing down the whole server.")]
public int serverSendQueueLimitPerConnection = 10000;
[Tooltip("Server receive queue limit per connection for pending messages. Telepathy will disconnect a connection's queues reach that limit for load balancing. Better to kick one slow client than slowing down the whole server.")]
public int serverReceiveQueueLimitPerConnection = 10000;
[Header("Client")]
[Tooltip("Protect against allocation attacks by keeping the max message size small. Otherwise an attacker host might send multiple fake packets with 2GB headers, causing the connected clients to run out of memory after allocating multiple large packets.")]
public int clientMaxMessageSize = 16 * 1024;
[Tooltip("Client processes a limit amount of messages per tick to avoid a deadlock where it might end up processing forever if messages come in faster than we can process them.")]
public int clientMaxReceivesPerTick = 1000;
[Tooltip("Client send queue limit for pending messages. Telepathy will disconnect if the connection's queues reach that limit in order to avoid ever growing latencies.")]
public int clientSendQueueLimit = 10000;
[Tooltip("Client receive queue limit for pending messages. Telepathy will disconnect if the connection's queues reach that limit in order to avoid ever growing latencies.")]
public int clientReceiveQueueLimit = 10000;
Telepathy.Client client;
Telepathy.Server server;
// scene change message needs to halt message processing immediately
// Telepathy.Tick() has a enabledCheck parameter that we can use, but
// let's only allocate it once.
Func<bool> enabledCheck;
void Awake()
{
// tell Telepathy to use Unity's Debug.Log
Telepathy.Log.Info = Debug.Log;
Telepathy.Log.Warning = Debug.LogWarning;
Telepathy.Log.Error = Debug.LogError;
// allocate enabled check only once
enabledCheck = () => enabled;
Debug.Log("TelepathyTransport initialized!");
}
// C#'s built in TCP sockets run everywhere except on WebGL
// Do not change this back to using Application.platform
// because that doesn't work in the Editor!
public override bool Available() =>
#if UNITY_WEBGL
false;
#else
true;
#endif
// client
private void CreateClient()
{
// create client
client = new Telepathy.Client(clientMaxMessageSize);
// client hooks
// other systems hook into transport events in OnCreate or
// OnStartRunning in no particular order. the only way to avoid
// race conditions where telepathy uses OnConnected before another
// system's hook (e.g. statistics OnData) was added is to wrap
// them all in a lambda and always call the latest hook.
// (= lazy call)
client.OnConnected = () => OnClientConnected.Invoke();
client.OnData = (segment) => OnClientDataReceived.Invoke(segment, Channels.Reliable);
// fix: https://github.com/vis2k/Mirror/issues/3287
// Telepathy may call OnDisconnected twice.
// Mirror may have cleared the callback already, so use "?." here.
client.OnDisconnected = () => OnClientDisconnected?.Invoke();
// client configuration
client.NoDelay = NoDelay;
client.SendTimeout = SendTimeout;
client.ReceiveTimeout = ReceiveTimeout;
client.SendQueueLimit = clientSendQueueLimit;
client.ReceiveQueueLimit = clientReceiveQueueLimit;
}
public override bool ClientConnected() => client != null && client.Connected;
public override void ClientConnect(string address)
{
CreateClient();
client.Connect(address, port);
}
public override void ClientConnect(Uri uri)
{
CreateClient();
if (uri.Scheme != Scheme)
throw new ArgumentException($"Invalid url {uri}, use {Scheme}://host:port instead", nameof(uri));
int serverPort = uri.IsDefaultPort ? port : uri.Port;
client.Connect(uri.Host, serverPort);
}
public override void ClientSend(ArraySegment<byte> segment, int channelId)
{
client?.Send(segment);
// call event. might be null if no statistics are listening etc.
OnClientDataSent?.Invoke(segment, Channels.Reliable);
}
public override void ClientDisconnect()
{
client?.Disconnect();
client = null;
// client triggers the disconnected event in client.Tick() which won't be run anymore
OnClientDisconnected?.Invoke();
}
// messages should always be processed in early update
public override void ClientEarlyUpdate()
{
// note: we need to check enabled in case we set it to false
// when LateUpdate already started.
// (https://github.com/vis2k/Mirror/pull/379)
if (!enabled) return;
// process a maximum amount of client messages per tick
// IMPORTANT: check .enabled to stop processing immediately after a
// scene change message arrives!
client?.Tick(clientMaxReceivesPerTick, enabledCheck);
}
// server
public override Uri ServerUri()
{
UriBuilder builder = new UriBuilder();
builder.Scheme = Scheme;
builder.Host = Dns.GetHostName();
builder.Port = port;
return builder.Uri;
}
public override bool ServerActive() => server != null && server.Active;
public override void ServerStart()
{
// create server
server = new Telepathy.Server(serverMaxMessageSize);
// server hooks
// other systems hook into transport events in OnCreate or
// OnStartRunning in no particular order. the only way to avoid
// race conditions where telepathy uses OnConnected before another
// system's hook (e.g. statistics OnData) was added is to wrap
// them all in a lambda and always call the latest hook.
// (= lazy call)
server.OnConnected = (connectionId, remoteClientAddress) => OnServerConnectedWithAddress.Invoke(connectionId, remoteClientAddress);
server.OnData = (connectionId, segment) => OnServerDataReceived.Invoke(connectionId, segment, Channels.Reliable);
server.OnDisconnected = (connectionId) => OnServerDisconnected.Invoke(connectionId);
// server configuration
server.NoDelay = NoDelay;
server.SendTimeout = SendTimeout;
server.ReceiveTimeout = ReceiveTimeout;
server.SendQueueLimit = serverSendQueueLimitPerConnection;
server.ReceiveQueueLimit = serverReceiveQueueLimitPerConnection;
server.Start(port);
}
public override void ServerSend(int connectionId, ArraySegment<byte> segment, int channelId)
{
server?.Send(connectionId, segment);
// call event. might be null if no statistics are listening etc.
OnServerDataSent?.Invoke(connectionId, segment, Channels.Reliable);
}
public override void ServerDisconnect(int connectionId) => server?.Disconnect(connectionId);
public override string ServerGetClientAddress(int connectionId) => server?.GetClientAddress(connectionId);
public override void ServerStop()
{
server?.Stop();
server = null;
}
// messages should always be processed in early update
public override void ServerEarlyUpdate()
{
// note: we need to check enabled in case we set it to false
// when LateUpdate already started.
// (https://github.com/vis2k/Mirror/pull/379)
if (!enabled) return;
// process a maximum amount of server messages per tick
// IMPORTANT: check .enabled to stop processing immediately after a
// scene change message arrives!
server?.Tick(serverMaxReceivesPerTick, enabledCheck);
}
// common
public override void Shutdown()
{
Debug.Log("TelepathyTransport Shutdown()");
client?.Disconnect();
client = null;
server?.Stop();
server = null;
}
public override int GetMaxPacketSize(int channelId)
{
return serverMaxMessageSize;
}
// Keep it short and simple so it looks nice in the HUD.
//
// printing server.listener.LocalEndpoint causes an Exception
// in UWP + Unity 2019:
// Exception thrown at 0x00007FF9755DA388 in UWF.exe:
// Microsoft C++ exception: Il2CppExceptionWrapper at memory
// location 0x000000E15A0FCDD0. SocketException: An address
// incompatible with the requested protocol was used at
// System.Net.Sockets.Socket.get_LocalEndPoint ()
// so just use the regular port instead.
public override string ToString() => $"Telepathy [{port}]";
}
}

View File

@ -0,0 +1,18 @@
fileFormatVersion: 2
guid: c7424c1070fad4ba2a7a96b02fbeb4bb
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 1000
icon: {fileID: 2800000, guid: 7453abfe9e8b2c04a8a47eb536fe21eb, type: 3}
userData:
assetBundleName:
assetBundleVariant:
AssetOrigin:
serializedVersion: 1
productId: 129321
packageName: Mirror
packageVersion: 96.0.1
assetPath: Assets/Mirror/Transports/Telepathy/TelepathyTransport.cs
uploadId: 736421