Skip to content

Commit

Permalink
Connection implementations shall issue the Disconnected callback befo…
Browse files Browse the repository at this point in the history
…re returning and must be in a destructing state.
  • Loading branch information
azuisleet committed Dec 1, 2017
1 parent d31e7a3 commit 517d052
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 96 deletions.
50 changes: 14 additions & 36 deletions SteamKit2/SteamKit2/Networking/Steam3/TcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ class TcpConnection : IConnection
private BinaryWriter netWriter;

private CancellationTokenSource cancellationToken;
private ManualResetEvent connectionFree;
private object netLock, connectLock;
private object netLock;

public TcpConnection()
{
netLock = new object();
connectLock = new object();
connectionFree = new ManualResetEvent(true);
}

public event EventHandler<NetMsgEventArgs> NetMsgReceived;
Expand Down Expand Up @@ -98,8 +95,6 @@ private void Release( bool userRequestedDisconnect )
}

Disconnected?.Invoke( this, new DisconnectedEventArgs( userRequestedDisconnect ) );

connectionFree.Set();
}

private void ConnectCompleted(bool success)
Expand Down Expand Up @@ -184,46 +179,29 @@ private void TryConnect(object sender)
/// <param name="timeout">Timeout in milliseconds</param>
public void Connect(EndPoint endPoint, int timeout)
{
lock (connectLock)
lock ( netLock )
{
Disconnect();

connectionFree.Reset();

lock (netLock)
{
Debug.Assert(cancellationToken == null);
cancellationToken = new CancellationTokenSource();
Debug.Assert( cancellationToken == null );
cancellationToken = new CancellationTokenSource();

socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.ReceiveTimeout = timeout;
socket.SendTimeout = timeout;
socket = new Socket( AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp );
socket.ReceiveTimeout = timeout;
socket.SendTimeout = timeout;

destination = endPoint;
DebugLog.WriteLine("TcpConnection", "Connecting to {0}...", destination);
TryConnect(timeout);
}
destination = endPoint;
DebugLog.WriteLine( "TcpConnection", "Connecting to {0}...", destination );
TryConnect( timeout );
}

}

public void Disconnect()
{
lock (connectLock)
lock ( netLock )
{
lock (netLock)
{
if (cancellationToken != null)
{
cancellationToken.Cancel();
}
else
{
// we already appear to be disconncted, nothing to wait for
return;
}
}
cancellationToken?.Cancel();

connectionFree.WaitOne();
Disconnected?.Invoke( this, new DisconnectedEventArgs( true ) );
}
}

Expand Down
9 changes: 3 additions & 6 deletions SteamKit2/SteamKit2/Networking/Steam3/UdpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private enum State
private DateTime timeOut;
private DateTime nextResend;

private uint sourceConnId = 512;
private static uint sourceConnId = 512;
private uint remoteConnId;

/// <summary>
Expand Down Expand Up @@ -116,8 +116,6 @@ public UdpConnection()
/// <param name="timeout">Timeout in milliseconds</param>
public void Connect(EndPoint endPoint, int timeout)
{
Disconnect();

outPackets = new List<UdpPacket>();
inPackets = new Dictionary<uint, UdpPacket>();

Expand Down Expand Up @@ -159,11 +157,10 @@ public void Disconnect()
SendSequenced(new UdpPacket(EUdpPacketType.Disconnect));
}

// Graceful shutdown allows for the connection to empty its queue of messages to send
netThread.Join();

// Advance this the same way that steam does, when a socket gets reused.
sourceConnId += 256;

Disconnected?.Invoke( this, new DisconnectedEventArgs( true ) );
}

/// <summary>
Expand Down
120 changes: 66 additions & 54 deletions SteamKit2/SteamKit2/Steam/CMClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class CMClient
/// Returns the the local IP of this client.
/// </summary>
/// <returns>The local IP.</returns>
public IPAddress LocalIP => connection.GetLocalIP();
public IPAddress LocalIP => connection?.GetLocalIP();

/// <summary>
/// Gets the universe of this client.
Expand Down Expand Up @@ -94,6 +94,8 @@ public abstract class CMClient

internal bool ExpectDisconnection { get; set; }

// connection lock around the setup and tear down of the connection task
object connectionLock = new object();
CancellationTokenSource connectionCancellation;
Task connectionSetupTask;
IConnection connection;
Expand Down Expand Up @@ -131,81 +133,91 @@ public CMClient( SteamConfiguration configuration )
/// The <see cref="IPEndPoint"/> of the CM server to connect to.
/// If <c>null</c>, SteamKit will randomly select a CM server from its internal list.
/// </param>
public void Connect( ServerRecord cmServer = null )
public void Connect( ServerRecord cmServer = null )
{
this.Disconnect();
Debug.Assert( connection == null );
lock ( connectionLock )
{
this.Disconnect();
Debug.Assert( connection == null );

var cancellation = new CancellationTokenSource();
var token = cancellation.Token;
var oldCancellation = Interlocked.Exchange( ref connectionCancellation, cancellation );
Debug.Assert( oldCancellation == null );
Debug.Assert( connectionCancellation == null );

ExpectDisconnection = false;
connectionCancellation = new CancellationTokenSource();
var token = connectionCancellation.Token;

Task<ServerRecord> recordTask = null;
ExpectDisconnection = false;

if ( cmServer == null )
{
recordTask = Servers.GetNextServerCandidateAsync( Configuration.ProtocolTypes );
}
else
{
recordTask = Task.FromResult( cmServer );
}
Task<ServerRecord> recordTask = null;

connectionSetupTask = recordTask.ContinueWith( t =>
{
if ( token.IsCancellationRequested )
if ( cmServer == null )
{
DebugLog.WriteLine( nameof(CMClient), "Connection cancelled before a server could be chosen." );
OnClientDisconnected( userInitiated: true );
return;
recordTask = Servers.GetNextServerCandidateAsync( Configuration.ProtocolTypes );
}
else if ( t.IsFaulted || t.IsCanceled )
else
{
DebugLog.WriteLine( nameof(CMClient), "Server record task threw exception: {0}", t.Exception );
OnClientDisconnected( userInitiated: false );
return;
recordTask = Task.FromResult( cmServer );
}

var record = t.Result;

connection = CreateConnection( record.ProtocolTypes & Configuration.ProtocolTypes );
connection.NetMsgReceived += NetMsgReceived;
connection.Connected += Connected;
connection.Disconnected += Disconnected;
connection.Connect( record.EndPoint, ( int )ConnectionTimeout.TotalMilliseconds );
}, TaskContinuationOptions.ExecuteSynchronously).ContinueWith(t =>
{
if ( t.IsFaulted )
connectionSetupTask = recordTask.ContinueWith( t =>
{
DebugLog.WriteLine( nameof(CMClient), "Unhandled exception when attempting to connect to Steam: {0}", t.Exception );
OnClientDisconnected( userInitiated: false );
}
}, TaskContinuationOptions.ExecuteSynchronously);
if ( token.IsCancellationRequested )
{
DebugLog.WriteLine( nameof( CMClient ), "Connection cancelled before a server could be chosen." );
OnClientDisconnected( userInitiated: true );
return;
}
else if ( t.IsFaulted || t.IsCanceled )
{
DebugLog.WriteLine( nameof( CMClient ), "Server record task threw exception: {0}", t.Exception );
OnClientDisconnected( userInitiated: false );
return;
}

var record = t.Result;

connection = CreateConnection( record.ProtocolTypes & Configuration.ProtocolTypes );
connection.NetMsgReceived += NetMsgReceived;
connection.Connected += Connected;
connection.Disconnected += Disconnected;
connection.Connect( record.EndPoint, ( int )ConnectionTimeout.TotalMilliseconds );
}, TaskContinuationOptions.ExecuteSynchronously ).ContinueWith( t =>
{
if ( t.IsFaulted )
{
DebugLog.WriteLine( nameof( CMClient ), "Unhandled exception when attempting to connect to Steam: {0}", t.Exception );
OnClientDisconnected( userInitiated: false );
}
}, TaskContinuationOptions.ExecuteSynchronously );
}
}

/// <summary>
/// Disconnects this client.
/// </summary>
public void Disconnect()
{
heartBeatFunc.Stop();

var cts = Interlocked.Exchange(ref connectionCancellation, null);
if (cts != null)
lock ( connectionLock )
{
cts.Cancel();
cts.Dispose();
}
heartBeatFunc.Stop();

var task = Interlocked.Exchange(ref connectionSetupTask, null);
if ( task != null )
{
task.GetAwaiter().GetResult();
if ( connectionCancellation != null )
{
connectionCancellation.Cancel();
connectionCancellation.Dispose();
connectionCancellation = null;
}

if ( connectionSetupTask != null )
{
// though it's ugly, we want to wait for the completion of this task and keep hold of the lock
connectionSetupTask.GetAwaiter().GetResult();
connectionSetupTask = null;
}

// Connection implementations are required to issue the Disconnected callback before Disconnect() returns
connection?.Disconnect();
Debug.Assert( connection == null );
}
connection?.Disconnect();
}

/// <summary>
Expand Down

0 comments on commit 517d052

Please sign in to comment.