// This file is provided under The MIT License as part of RiptideNetworking.
// Copyright (c) Tom Weiland
// For additional information please see the included LICENSE.md file or view it on GitHub:
// https://github.com/RiptideNetworking/Riptide/blob/main/LICENSE.md
using Riptide.Transports;
using Riptide.Utils;
using System;
using System.Collections.Generic;
namespace Riptide
{
/// The state of a connection.
internal enum ConnectionState : byte
{
/// Not connected. No connection has been established or the connection has been closed.
NotConnected,
/// Connecting. Still trying to establish a connection.
Connecting,
/// Connection is pending. The server is still determining whether or not the connection should be allowed.
Pending,
/// Connected. A connection has been established successfully.
Connected,
}
/// Represents a connection to a or .
public abstract class Connection
{
/// Invoked when the notify message with the given sequence ID is successfully delivered.
public Action NotifyDelivered;
/// Invoked when the notify message with the given sequence ID is lost.
public Action NotifyLost;
/// Invoked when a notify message is received.
public Action NotifyReceived;
/// Invoked when the reliable message with the given sequence ID is successfully delivered.
public Action ReliableDelivered;
/// The connection's numeric ID.
public ushort Id { get; internal set; }
/// Whether or not the connection is currently not trying to connect, pending, nor actively connected.
public bool IsNotConnected => state == ConnectionState.NotConnected;
/// Whether or not the connection is currently in the process of connecting.
public bool IsConnecting => state == ConnectionState.Connecting;
/// Whether or not the connection is currently pending (waiting to be accepted/rejected by the server).
public bool IsPending => state == ConnectionState.Pending;
/// Whether or not the connection is currently connected.
public bool IsConnected => state == ConnectionState.Connected;
/// The round trip time (ping) of the connection, in milliseconds. -1 if not calculated yet.
public short RTT
{
get => _rtt;
private set
{
SmoothRTT = _rtt == -1 ? value : (short)Math.Max(1f, SmoothRTT * 0.7f + value * 0.3f);
_rtt = value;
}
}
private short _rtt;
/// The smoothed round trip time (ping) of the connection, in milliseconds. -1 if not calculated yet.
/// This value is slower to accurately represent lasting changes in latency than , but it is less susceptible to changing drastically due to significant—but temporary—jumps in latency.
public short SmoothRTT { get; private set; }
/// The time (in milliseconds) after which to disconnect if no heartbeats are received.
public int TimeoutTime { get; set; }
/// Whether or not the connection can time out.
public bool CanTimeout
{
get => _canTimeout;
set
{
if (value)
ResetTimeout();
_canTimeout = value;
}
}
private bool _canTimeout;
/// Whether or not the connection can disconnect due to poor connection quality.
/// When this is set to , , ,
/// and are ignored and exceeding their values will not trigger a disconnection.
public bool CanQualityDisconnect;
/// The connection's metrics.
public readonly ConnectionMetrics Metrics;
/// The maximum acceptable average number of send attempts it takes to deliver a reliable message. The connection
/// will be closed if this is exceeded more than times in a row.
public int MaxAvgSendAttempts;
/// How many consecutive times can be exceeded before triggering a disconnect.
public int AvgSendAttemptsResilience;
/// The absolute maximum number of times a reliable message may be sent. A single message reaching this threshold will cause a disconnection.
public int MaxSendAttempts;
/// The maximum acceptable loss rate of notify messages. The connection will be closed if this is exceeded more than times in a row.
public float MaxNotifyLoss;
/// How many consecutive times can be exceeded before triggering a disconnect.
public int NotifyLossResilience;
/// The local peer this connection is associated with.
internal Peer Peer { get; private set; }
/// Whether or not the connection has timed out.
internal bool HasTimedOut => _canTimeout && Peer.CurrentTime - lastHeartbeat > TimeoutTime;
/// Whether or not the connection attempt has timed out.
internal bool HasConnectAttemptTimedOut => _canTimeout && Peer.CurrentTime - lastHeartbeat > Peer.ConnectTimeoutTime;
/// The sequencer for notify messages.
private readonly NotifySequencer notify;
/// The sequencer for reliable messages.
private readonly ReliableSequencer reliable;
/// The currently pending reliably sent messages whose delivery has not been acknowledged yet. Stored by sequence ID.
private readonly Dictionary pendingMessages;
/// The connection's current state.
private ConnectionState state;
/// The number of consecutive times that the threshold was exceeded.
private int sendAttemptsViolations;
/// The number of consecutive times that the threshold was exceeded.
private int lossRateViolations;
/// The time at which the last heartbeat was received from the other end.
private long lastHeartbeat;
/// The ID of the last ping that was sent.
private byte lastPingId;
/// The ID of the currently pending ping.
private byte pendingPingId;
/// The time at which the currently pending ping was sent.
private long pendingPingSendTime;
/// Initializes the connection.
protected Connection()
{
Metrics = new ConnectionMetrics();
notify = new NotifySequencer(this);
reliable = new ReliableSequencer(this);
state = ConnectionState.Connecting;
_rtt = -1;
SmoothRTT = -1;
_canTimeout = true;
CanQualityDisconnect = true;
MaxAvgSendAttempts = 5;
AvgSendAttemptsResilience = 64;
MaxSendAttempts = 15;
MaxNotifyLoss = 0.05f; // 5%
NotifyLossResilience = 64;
pendingMessages = new Dictionary();
}
/// Initializes connection data.
/// The which this connection belongs to.
/// The timeout time.
internal void Initialize(Peer peer, int timeoutTime)
{
Peer = peer;
TimeoutTime = timeoutTime;
}
/// Resets the connection's timeout time.
public void ResetTimeout()
{
lastHeartbeat = Peer.CurrentTime;
}
/// Sends a message.
/// The message to send.
/// Whether or not to return the message to the pool after it is sent.
/// For reliable and notify messages, the sequence ID that the message was sent with. 0 for unreliable messages.
///
/// If you intend to continue using the message instance after calling this method, you must set
/// to . can be used to manually return the message to the pool at a later time.
///
public ushort Send(Message message, bool shouldRelease = true)
{
ushort sequenceId = 0;
if (message.SendMode == MessageSendMode.Notify)
{
sequenceId = notify.InsertHeader(message);
int byteAmount = message.BytesInUse;
Buffer.BlockCopy(message.Data, 0, Message.ByteBuffer, 0, byteAmount);
Send(Message.ByteBuffer, byteAmount);
Metrics.SentNotify(byteAmount);
}
else if (message.SendMode == MessageSendMode.Unreliable)
{
int byteAmount = message.BytesInUse;
Buffer.BlockCopy(message.Data, 0, Message.ByteBuffer, 0, byteAmount);
Send(Message.ByteBuffer, byteAmount);
Metrics.SentUnreliable(byteAmount);
}
else
{
sequenceId = reliable.NextSequenceId;
PendingMessage pendingMessage = PendingMessage.Create(sequenceId, message, this);
pendingMessages.Add(sequenceId, pendingMessage);
pendingMessage.TrySend();
Metrics.ReliableUniques++;
}
if (shouldRelease)
message.Release();
return sequenceId;
}
/// Sends data.
/// The array containing the data.
/// The number of bytes in the array which should be sent.
protected internal abstract void Send(byte[] dataBuffer, int amount);
/// Processes a notify message.
/// The received data.
/// The number of bytes that were received.
/// The message instance to use.
internal void ProcessNotify(byte[] dataBuffer, int amount, Message message)
{
notify.UpdateReceivedAcks(Converter.UShortFromBits(dataBuffer, Message.HeaderBits), Converter.ByteFromBits(dataBuffer, Message.HeaderBits + 16));
Metrics.ReceivedNotify(amount);
if (notify.ShouldHandle(Converter.UShortFromBits(dataBuffer, Message.HeaderBits + 24)))
{
Buffer.BlockCopy(dataBuffer, 1, message.Data, 1, amount - 1); // Copy payload
NotifyReceived?.Invoke(message);
}
else
Metrics.NotifyDiscarded++;
}
/// Determines if the message with the given sequence ID should be handled.
/// The message's sequence ID.
/// Whether or not the message should be handled.
internal bool ShouldHandle(ushort sequenceId)
{
return reliable.ShouldHandle(sequenceId);
}
/// Cleans up the local side of the connection.
internal void LocalDisconnect()
{
state = ConnectionState.NotConnected;
foreach (PendingMessage pendingMessage in pendingMessages.Values)
pendingMessage.Clear();
pendingMessages.Clear();
}
/// Resends the with the given sequence ID.
/// The sequence ID of the message to resend.
private void ResendMessage(ushort sequenceId)
{
if (pendingMessages.TryGetValue(sequenceId, out PendingMessage pendingMessage))
pendingMessage.RetrySend();
}
/// Clears the with the given sequence ID.
/// The sequence ID that was acknowledged.
internal void ClearMessage(ushort sequenceId)
{
if (pendingMessages.TryGetValue(sequenceId, out PendingMessage pendingMessage))
{
ReliableDelivered?.Invoke(sequenceId);
pendingMessage.Clear();
pendingMessages.Remove(sequenceId);
UpdateSendAttemptsViolations();
}
}
/// Puts the connection in the pending state.
internal void SetPending()
{
if (IsConnecting)
{
state = ConnectionState.Pending;
ResetTimeout();
}
}
/// Checks the average send attempts (of reliable messages) and updates accordingly.
private void UpdateSendAttemptsViolations()
{
if (Metrics.RollingReliableSends.Mean > MaxAvgSendAttempts)
{
sendAttemptsViolations++;
if (sendAttemptsViolations >= AvgSendAttemptsResilience)
Peer.Disconnect(this, DisconnectReason.PoorConnection);
}
else
sendAttemptsViolations = 0;
}
/// Checks the loss rate (of notify messages) and updates accordingly.
private void UpdateLossViolations()
{
if (Metrics.RollingNotifyLossRate > MaxNotifyLoss)
{
lossRateViolations++;
if (lossRateViolations >= NotifyLossResilience)
Peer.Disconnect(this, DisconnectReason.PoorConnection);
}
else
lossRateViolations = 0;
}
#region Messages
/// Sends an ack message for the given sequence ID.
/// The sequence ID to acknowledge.
/// The sequence ID of the latest message we've received.
/// Sequence IDs of previous messages that we have (or have not received).
private void SendAck(ushort forSeqId, ushort lastReceivedSeqId, Bitfield receivedSeqIds)
{
Message message = Message.Create(MessageHeader.Ack);
message.AddUShort(lastReceivedSeqId);
message.AddUShort(receivedSeqIds.First16);
if (forSeqId == lastReceivedSeqId)
message.AddBool(false);
else
message.AddBool(true);
message.AddUShort(forSeqId);
Send(message);
}
/// Handles an ack message.
/// The ack message to handle.
internal void HandleAck(Message message)
{
ushort remoteLastReceivedSeqId = message.GetUShort();
ushort remoteAcksBitField = message.GetUShort();
ushort ackedSeqId = message.GetBool() ? message.GetUShort() : remoteLastReceivedSeqId;
ClearMessage(ackedSeqId);
reliable.UpdateReceivedAcks(remoteLastReceivedSeqId, remoteAcksBitField);
}
#region Server
/// Sends a welcome message.
internal void SendWelcome()
{
Message message = Message.Create(MessageHeader.Welcome);
message.AddUShort(Id);
Send(message);
}
/// Handles a welcome message on the server.
/// The welcome message to handle.
/// Whether or not the connection is now connected.
internal bool HandleWelcomeResponse(Message message)
{
if (!IsPending)
return false;
ushort id = message.GetUShort();
if (Id != id)
RiptideLogger.Log(LogType.Error, Peer.LogName, $"Client has assumed ID {id} instead of {Id}!");
state = ConnectionState.Connected;
ResetTimeout();
return true;
}
/// Handles a heartbeat message.
/// The heartbeat message to handle.
internal void HandleHeartbeat(Message message)
{
if (!IsConnected)
return; // A client that is not yet fully connected should not be sending heartbeats
RespondHeartbeat(message.GetByte());
RTT = message.GetShort();
ResetTimeout();
}
/// Sends a heartbeat message.
private void RespondHeartbeat(byte pingId)
{
Message message = Message.Create(MessageHeader.Heartbeat);
message.AddByte(pingId);
Send(message);
}
#endregion
#region Client
/// Handles a welcome message on the client.
/// The welcome message to handle.
internal void HandleWelcome(Message message)
{
Id = message.GetUShort();
state = ConnectionState.Connected;
ResetTimeout();
RespondWelcome();
}
/// Sends a welcome response message.
private void RespondWelcome()
{
Message message = Message.Create(MessageHeader.Welcome);
message.AddUShort(Id);
Send(message);
}
/// Sends a heartbeat message.
internal void SendHeartbeat()
{
pendingPingId = lastPingId++;
pendingPingSendTime = Peer.CurrentTime;
Message message = Message.Create(MessageHeader.Heartbeat);
message.AddByte(pendingPingId);
message.AddShort(RTT);
Send(message);
}
/// Handles a heartbeat message.
/// The heartbeat message to handle.
internal void HandleHeartbeatResponse(Message message)
{
byte pingId = message.GetByte();
if (pendingPingId == pingId)
RTT = (short)Math.Max(1, Peer.CurrentTime - pendingPingSendTime);
ResetTimeout();
}
#endregion
#endregion
#region Events
/// Invokes the event.
/// The sequence ID of the delivered message.
protected virtual void OnNotifyDelivered(ushort sequenceId)
{
Metrics.DeliveredNotify();
NotifyDelivered?.Invoke(sequenceId);
UpdateLossViolations();
}
/// Invokes the event.
/// The sequence ID of the lost message.
protected virtual void OnNotifyLost(ushort sequenceId)
{
Metrics.LostNotify();
NotifyLost?.Invoke(sequenceId);
UpdateLossViolations();
}
#endregion
#region Message Sequencing
/// Provides functionality for filtering out duplicate messages and determining delivery/loss status.
private abstract class Sequencer
{
/// The next sequence ID to use.
internal ushort NextSequenceId => _nextSequenceId++;
private ushort _nextSequenceId = 1;
/// The connection this sequencer belongs to.
protected readonly Connection connection;
/// The sequence ID of the latest message that we want to acknowledge.
protected ushort lastReceivedSeqId;
/// Sequence IDs of messages which we have (or have not) received and want to acknowledge.
protected readonly Bitfield receivedSeqIds = new Bitfield();
/// The sequence ID of the latest message that we've received an ack for.
protected ushort lastAckedSeqId;
/// Sequence IDs of messages we sent and which we have (or have not) received acks for.
protected readonly Bitfield ackedSeqIds = new Bitfield(false);
/// Initializes the sequencer.
/// The connection this sequencer belongs to.
protected Sequencer(Connection connection)
{
this.connection = connection;
}
/// Determines whether or not to handle a message with the given sequence ID.
/// The sequence ID in question.
/// Whether or not to handle the message.
internal abstract bool ShouldHandle(ushort sequenceId);
/// Updates which messages we've received acks for.
/// The latest sequence ID that the other end has received.
/// Sequence IDs which the other end has (or has not) received.
internal abstract void UpdateReceivedAcks(ushort remoteLastReceivedSeqId, ushort remoteReceivedSeqIds);
}
///
private class NotifySequencer : Sequencer
{
///
internal NotifySequencer(Connection connection) : base(connection) { }
/// Inserts the notify header into the given message.
/// The message to insert the header into.
/// The sequence ID of the message.
internal ushort InsertHeader(Message message)
{
ushort sequenceId = NextSequenceId;
ulong notifyBits = lastReceivedSeqId | ((ulong)receivedSeqIds.First8 << (2 * Converter.BitsPerByte)) | ((ulong)sequenceId << (3 * Converter.BitsPerByte));
message.SetBits(notifyBits, 5 * Converter.BitsPerByte, Message.HeaderBits);
return sequenceId;
}
///
/// Duplicate and out of order messages are filtered out and not handled.
internal override bool ShouldHandle(ushort sequenceId)
{
int sequenceGap = Helper.GetSequenceGap(sequenceId, lastReceivedSeqId);
if (sequenceGap > 0)
{
// The received sequence ID is newer than the previous one
receivedSeqIds.ShiftBy(sequenceGap);
lastReceivedSeqId = sequenceId;
if (receivedSeqIds.IsSet(sequenceGap))
return false;
receivedSeqIds.Set(sequenceGap);
return true;
}
else
{
// The received sequence ID is older than or the same as the previous one (out of order or duplicate message)
return false;
}
}
///
internal override void UpdateReceivedAcks(ushort remoteLastReceivedSeqId, ushort remoteReceivedSeqIds)
{
int sequenceGap = Helper.GetSequenceGap(remoteLastReceivedSeqId, lastAckedSeqId);
if (sequenceGap > 0)
{
if (sequenceGap > 1)
{
// Deal with messages in the gap
while (sequenceGap > 9) // 9 because a gap of 1 means sequence IDs are consecutive, and notify uses 8 bits for the bitfield. 9 means all 8 bits are in use
{
lastAckedSeqId++;
sequenceGap--;
connection.NotifyLost?.Invoke(lastAckedSeqId);
}
int bitCount = sequenceGap - 1;
int bit = 1 << bitCount;
for (int i = 0; i < bitCount; i++)
{
lastAckedSeqId++;
bit >>= 1;
if ((remoteReceivedSeqIds & bit) == 0)
connection.OnNotifyLost(lastAckedSeqId);
else
connection.OnNotifyDelivered(lastAckedSeqId);
}
}
lastAckedSeqId = remoteLastReceivedSeqId;
connection.OnNotifyDelivered(lastAckedSeqId);
}
}
}
///
private class ReliableSequencer : Sequencer
{
///
internal ReliableSequencer(Connection connection) : base(connection) { }
///
/// Duplicate messages are filtered out while out of order messages are handled.
internal override bool ShouldHandle(ushort sequenceId)
{
bool doHandle = false;
int sequenceGap = Helper.GetSequenceGap(sequenceId, lastReceivedSeqId);
if (sequenceGap != 0)
{
// The received sequence ID is different from the previous one
if (sequenceGap > 0)
{
// The received sequence ID is newer than the previous one
if (sequenceGap > 64)
RiptideLogger.Log(LogType.Warning, connection.Peer.LogName, $"The gap between received sequence IDs was very large ({sequenceGap})!");
receivedSeqIds.ShiftBy(sequenceGap);
lastReceivedSeqId = sequenceId;
}
else // The received sequence ID is older than the previous one (out of order message)
sequenceGap = -sequenceGap;
doHandle = !receivedSeqIds.IsSet(sequenceGap);
receivedSeqIds.Set(sequenceGap);
}
connection.SendAck(sequenceId, lastReceivedSeqId, receivedSeqIds);
return doHandle;
}
/// Updates which messages we've received acks for.
/// The latest sequence ID that the other end has received.
/// Sequence IDs which the other end has (or has not) received.
internal override void UpdateReceivedAcks(ushort remoteLastReceivedSeqId, ushort remoteReceivedSeqIds)
{
int sequenceGap = Helper.GetSequenceGap(remoteLastReceivedSeqId, lastAckedSeqId);
if (sequenceGap > 0)
{
// The latest sequence ID that the other end has received is newer than the previous one
if (!ackedSeqIds.HasCapacityFor(sequenceGap, out int overflow))
{
for (int i = 0; i < overflow; i++)
{
// Resend those messages which haven't been acked and whose sequence IDs are about to be pushed out of the bitfield
if (!ackedSeqIds.CheckAndTrimLast(out int checkedPosition))
connection.ResendMessage((ushort)(lastAckedSeqId - checkedPosition));
else
connection.ClearMessage((ushort)(lastAckedSeqId - checkedPosition));
}
}
ackedSeqIds.ShiftBy(sequenceGap);
lastAckedSeqId = remoteLastReceivedSeqId;
for (int i = 0; i < 16; i++)
{
// Clear any messages that have been newly acknowledged
if (!ackedSeqIds.IsSet(i + 1) && (remoteReceivedSeqIds & (1 << i)) != 0)
connection.ClearMessage((ushort)(lastAckedSeqId - (i + 1)));
}
ackedSeqIds.Combine(remoteReceivedSeqIds);
ackedSeqIds.Set(sequenceGap); // Ensure that the bit corresponding to the previous acked sequence ID is set
connection.ClearMessage(remoteLastReceivedSeqId);
}
else if (sequenceGap < 0)
{
// The latest sequence ID that the other end has received is older than the previous one (out of order ack)
ackedSeqIds.Set(-sequenceGap);
}
else
{
// The latest sequence ID that the other end has received is the same as the previous one (duplicate ack)
ackedSeqIds.Combine(remoteReceivedSeqIds);
}
}
}
#endregion
}
}