I'm working with TCP sockets and I've built a simple packet struct that turns the byte stream into a datagram-style protocol. I've created a class which returns "promise" Tasks for packets. Packets are very simple; they consist of:
// Wire layout of one length-prefixed packet (pseudocode sketch, not compilable code).
struct TCPPacket
{
unsigned short size; // 16-bit length prefix holding (payload length - 1);
// the implementation below writes and reads it high byte first,
// so payload lengths range from 1 to 65536 bytes
byte data[size]; // payload bytes; the real payload length is the stored prefix value + 1
}
Here is the implementation:
/// <summary>
/// Sends and receives length-prefixed packets over a stream socket using a single
/// reusable <see cref="SocketAsyncEventArgs"/>, exposing each operation as a task.
/// </summary>
/// <remarks>
/// Each packet is a two-byte, high-byte-first prefix holding (payload length - 1),
/// followed by the payload (1 to 65536 bytes). Only one operation (send or receive)
/// may be in flight per instance at a time.
/// </remarks>
public class SocketASyncEventArgsEx : IDisposable
{
    private const int STATE_IDLE = 0;
    private const int STATE_RECEIVING = 1;
    private const int STATE_SENDING = 2;

    // Reused for every socket operation issued by this instance.
    private readonly SocketAsyncEventArgs args;

    // Scratch buffer for the two-byte length prefix.
    private readonly byte[] size;

    private TaskCompletionSource<int> receiveTaskSource;
    private TaskCompletionSource<bool> sendTaskSource;

    // Gate that admits one operation at a time; changed via Interlocked/Volatile only.
    private int state;

    // Caller's payload window. 'buffer' doubles as the phase marker: non-null while the
    // length prefix is being transferred, null once the payload transfer has been set up.
    // Kept volatile because completion callbacks may run on a different thread.
    private volatile byte[] buffer;
    private volatile int offset;
    private volatile int count;

    /// <summary>Creates an idle instance with its own <see cref="SocketAsyncEventArgs"/>.</summary>
    public SocketASyncEventArgsEx()
    {
        args = new SocketAsyncEventArgs();
        args.Completed += E_Completed;
        size = new byte[sizeof(ushort)];
    }

    /// <summary>
    /// Receives exactly one packet into <paramref name="buffer"/>.
    /// </summary>
    /// <param name="socket">Connected stream socket to read from.</param>
    /// <param name="buffer">Destination for the payload.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> where the payload starts.</param>
    /// <param name="count">Maximum payload size accepted (1 to 65536).</param>
    /// <returns>
    /// A task yielding the payload length, or 0 if the socket reported zero bytes
    /// transferred before the packet was complete. The task faults with
    /// <see cref="SocketException"/> on a socket error, or with <see cref="IOException"/>
    /// when the incoming packet is larger than <paramref name="count"/>; in the latter
    /// case the payload has not been consumed from the stream.
    /// </returns>
    /// <exception cref="ArgumentNullException">A required argument is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">The offset/count window is invalid.</exception>
    /// <exception cref="InvalidOperationException">Another operation is in progress.</exception>
    public Task<int> ReceivePacketAsync(Socket socket, byte[] buffer, int offset, int count)
    {
        ValidateArguments(socket, buffer, offset, count);
        if (Interlocked.CompareExchange(ref state, STATE_RECEIVING, STATE_IDLE) != STATE_IDLE)
        {
            throw new InvalidOperationException("Operation already in progress.");
        }

        TaskCompletionSource<int> source = new TaskCompletionSource<int>();
        try
        {
            this.buffer = buffer;
            this.offset = offset;
            this.count = count;
            receiveTaskSource = source;

            // Phase one: read the two-byte length prefix.
            args.SetBuffer(size, 0, size.Length);
            PumpReceive(socket);
        }
        catch
        {
            // Starting the operation failed synchronously (e.g. disposed socket):
            // release the gate so the instance stays usable, then surface the error.
            ReleaseOperation();
            throw;
        }

        return source.Task;
    }

    /// <summary>
    /// Sends one packet: the length prefix followed by <paramref name="count"/> payload bytes.
    /// </summary>
    /// <param name="socket">Connected stream socket to write to.</param>
    /// <param name="buffer">Source of the payload.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> where the payload starts.</param>
    /// <param name="count">Payload size (1 to 65536).</param>
    /// <returns>
    /// A task yielding true once the whole packet was sent, or false if the socket
    /// reported zero bytes transferred. The task faults with
    /// <see cref="SocketException"/> on a socket error.
    /// </returns>
    /// <exception cref="ArgumentNullException">A required argument is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">The offset/count window is invalid.</exception>
    /// <exception cref="InvalidOperationException">Another operation is in progress.</exception>
    public Task<bool> SendPacketAsync(Socket socket, byte[] buffer, int offset, int count)
    {
        ValidateArguments(socket, buffer, offset, count);
        if (Interlocked.CompareExchange(ref state, STATE_SENDING, STATE_IDLE) != STATE_IDLE)
        {
            throw new InvalidOperationException("Operation already in progress.");
        }

        TaskCompletionSource<bool> source = new TaskCompletionSource<bool>();
        try
        {
            this.buffer = buffer;
            this.offset = offset;
            this.count = count;
            sendTaskSource = source;

            // Phase one: send (count - 1) as a two-byte prefix, high byte first.
            size[0] = (byte)((count - 1) >> 8);
            size[1] = (byte)(count - 1);
            args.SetBuffer(size, 0, size.Length);
            PumpSend(socket);
        }
        catch
        {
            // See ReceivePacketAsync: never leave the gate closed after a failed start.
            ReleaseOperation();
            throw;
        }

        return source.Task;
    }

    /// <summary>
    /// Releases the underlying <see cref="SocketAsyncEventArgs"/>. Intended to be called
    /// when no operation is in progress.
    /// </summary>
    public void Dispose()
    {
        args.Completed -= E_Completed;
        args.Dispose();
    }

    // Shared argument checks for both public entry points.
    private static void ValidateArguments(Socket socket, byte[] buffer, int offset, int count)
    {
        if (socket == null)
        {
            throw new ArgumentNullException("socket");
        }

        if (buffer == null)
        {
            throw new ArgumentNullException("buffer");
        }

        if (offset < 0 || offset > buffer.Length)
        {
            throw new ArgumentOutOfRangeException("offset");
        }

        if (count <= 0 || count > (ushort.MaxValue + 1) || count > buffer.Length - offset)
        {
            throw new ArgumentOutOfRangeException("count");
        }
    }

    // Raised by the socket only for operations that completed asynchronously.
    private void E_Completed(object sender, SocketAsyncEventArgs e)
    {
        if (e.LastOperation == SocketAsyncOperation.Receive)
        {
            try
            {
                if (ProcessReceive())
                {
                    PumpReceive((Socket)sender);
                }
            }
            catch (Exception ex)
            {
                // Re-issuing the receive failed (e.g. socket disposed meanwhile):
                // report it through the task instead of throwing on a pool thread.
                FailReceive(ex);
            }
        }
        else if (e.LastOperation == SocketAsyncOperation.Send)
        {
            try
            {
                if (ProcessSend())
                {
                    PumpSend((Socket)sender);
                }
            }
            catch (Exception ex)
            {
                FailSend(ex);
            }
        }
    }

    // Issues receives until one goes asynchronous or the packet is finished.
    // ReceiveAsync returning false means it completed synchronously and the
    // Completed event will not fire, so the result must be processed right here.
    // A loop (rather than recursion) keeps the stack flat across many sync completions.
    private void PumpReceive(Socket socket)
    {
        while (!socket.ReceiveAsync(args))
        {
            if (!ProcessReceive())
            {
                return;
            }
        }
    }

    // Send-side counterpart of PumpReceive.
    private void PumpSend(Socket socket)
    {
        while (!socket.SendAsync(args))
        {
            if (!ProcessSend())
            {
                return;
            }
        }
    }

    // Handles one finished receive. Returns true when 'args' has been prepared for a
    // further receive that the caller must issue; false when the task was completed.
    private bool ProcessReceive()
    {
        if (args.SocketError != SocketError.Success)
        {
            FailReceive(new SocketException((int)args.SocketError));
            return false;
        }

        if (args.BytesTransferred <= 0)
        {
            // Zero bytes transferred: report an empty result.
            CompleteReceive(0);
            return false;
        }

        int bytesLeft = args.Count - args.BytesTransferred;
        if (bytesLeft > 0)
        {
            // Partial transfer: continue filling the remainder of the current window.
            args.SetBuffer(args.Offset + args.BytesTransferred, bytesLeft);
            return true;
        }

        byte[] target = buffer;
        if (target == null)
        {
            // Payload phase finished; end of window minus start equals payload length.
            CompleteReceive(args.Offset + args.Count - offset);
            return false;
        }

        // Prefix phase finished: decode the length and switch to the caller's buffer.
        int packetSize = ((size[0] << 8) | size[1]) + 1;
        if (packetSize > count)
        {
            FailReceive(new IOException("The buffer specified was not large enough."));
            return false;
        }

        buffer = null;
        args.SetBuffer(target, offset, packetSize);
        return true;
    }

    // Handles one finished send. Same return contract as ProcessReceive.
    private bool ProcessSend()
    {
        if (args.SocketError != SocketError.Success)
        {
            FailSend(new SocketException((int)args.SocketError));
            return false;
        }

        if (args.BytesTransferred <= 0)
        {
            CompleteSend(false);
            return false;
        }

        int bytesLeft = args.Count - args.BytesTransferred;
        if (bytesLeft > 0)
        {
            args.SetBuffer(args.Offset + args.BytesTransferred, bytesLeft);
            return true;
        }

        byte[] payload = buffer;
        if (payload == null)
        {
            // Payload phase finished.
            CompleteSend(true);
            return false;
        }

        // Prefix sent: now send the payload itself.
        buffer = null;
        args.SetBuffer(payload, offset, count);
        return true;
    }

    // The completion helpers capture the task source and reopen the gate BEFORE
    // completing the task, so a continuation that runs inline may immediately start
    // the next operation on this instance.
    private void CompleteReceive(int result)
    {
        TaskCompletionSource<int> source = receiveTaskSource;
        ReleaseOperation();
        source.TrySetResult(result);
    }

    private void FailReceive(Exception error)
    {
        TaskCompletionSource<int> source = receiveTaskSource;
        ReleaseOperation();
        source.TrySetException(error);
    }

    private void CompleteSend(bool result)
    {
        TaskCompletionSource<bool> source = sendTaskSource;
        ReleaseOperation();
        source.TrySetResult(result);
    }

    private void FailSend(Exception error)
    {
        TaskCompletionSource<bool> source = sendTaskSource;
        ReleaseOperation();
        source.TrySetException(error);
    }

    // Drops the reference to the caller's buffer and marks the instance idle.
    private void ReleaseOperation()
    {
        buffer = null;
        Volatile.Write(ref state, STATE_IDLE);
    }
}
My question is: do the buffer, offset and count fields need to be volatile? The event callback E_Completed comes from a bound ThreadPool handle, which may be the same thread every time or a different thread every time.