Code Review Stack Exchange is a question and answer site for peer programmer code reviews. Join them; it only takes a minute:

Sign up
Here's how it works:
  1. Anybody can ask a question
  2. Anybody can answer
  3. The best answers are voted up and rise to the top

I'm working with TCP Sockets and I've build a simple packet struct that turns it into a datagram protocol. I've created a class which returns "promise" Tasks for Packets. Packets are very simple, they consist of:

struct TCPPacket
{
    unsigned short size; // contains size-1
                         // such that size ranges from 1 to 65536 bytes
    byte data[size];
}

Here is the implementation:

public class SocketASyncEventArgsEx
{
    private const int STATE_IDLE = 0;
    private const int STATE_RECEIVING = 1;
    private const int STATE_SENDING = 2;

    private SocketAsyncEventArgs e;
    private TaskCompletionSource<int> receiveTaskSource;
    private TaskCompletionSource<bool> sendTaskSource;

    private int state;

    private volatile byte[] buffer;
    private volatile int offset;
    private volatile int count;

    private byte[] size;

    public SocketASyncEventArgsEx()
    {
        e = new SocketAsyncEventArgs();
        e.Completed += E_Completed;
        size = new byte[sizeof(ushort)];
    }

    public Task<int> ReceivePacketAsync(Socket socket, byte[] buffer, int offset, int count)
    {
        if (socket == null)
            throw new ArgumentNullException("socket");
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (offset < 0 || offset > buffer.Length)
            throw new ArgumentOutOfRangeException("offset");
        if (count <= 0 || count > (ushort.MaxValue + 1) || count > buffer.Length - offset)
            throw new ArgumentOutOfRangeException("count");

        if (Interlocked.CompareExchange(ref state, STATE_RECEIVING, STATE_IDLE) != STATE_IDLE)
            throw new InvalidOperationException("Operation already in progress.");

        this.buffer = buffer;
        this.offset = offset;
        this.count = count;

        receiveTaskSource = new TaskCompletionSource<int>();
        e.SetBuffer(size, 0, sizeof(ushort));
        socket.ReceiveAsync(e);

        return receiveTaskSource.Task;
    }

    public Task<bool> SendPacketAsync(Socket socket, byte[] buffer, int offset, int count)
    {
        if (socket == null)
            throw new ArgumentNullException("socket");
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (offset < 0 || offset > buffer.Length)
            throw new ArgumentOutOfRangeException("offset");
        if (count <= 0 || count > (ushort.MaxValue + 1) || count > buffer.Length - offset)
            throw new ArgumentOutOfRangeException("count");

        if (Interlocked.CompareExchange(ref state, STATE_SENDING, STATE_IDLE) != STATE_IDLE)
            throw new InvalidOperationException("Operation already in progress.");

        sendTaskSource = new TaskCompletionSource<bool>();
        this.buffer = buffer;
        this.offset = offset;
        this.count = count;

        size[0] = (byte)((count - 1) >> 8);
        size[1] = (byte)((count - 1) >> 0);
        e.SetBuffer(size, 0, sizeof(ushort));
        socket.SendAsync(e);

        return sendTaskSource.Task;
    }

    private void E_Completed(object sender, SocketAsyncEventArgs e)
    {
        if (e.LastOperation == SocketAsyncOperation.Receive)
            HandleReceiveCompleted(sender, e);
        else if (e.LastOperation == SocketAsyncOperation.Send)
            HandleSendCompleted(sender, e);
    }

    private void HandleReceiveCompleted(object sender, SocketAsyncEventArgs e)
    {
        if (e.SocketError == SocketError.Success)
        {
            if (e.BytesTransferred > 0)
            {
                int bytesLeft = e.Count - e.BytesTransferred;

                if (bytesLeft > 0)
                {
                    int byteOffset = e.Offset + e.BytesTransferred;
                    e.SetBuffer(byteOffset, bytesLeft);
                    ((Socket)sender).ReceiveAsync(e);
                }
                else
                {
                    byte[] bufferRef = buffer;

                    if (bufferRef != null)
                    {
                        int packetSize = ((size[0] << 8) | (size[1] << 0)) + 1;

                        if (packetSize > count)
                        {
                            receiveTaskSource.TrySetException(new IOException("The buffer specified was not large enough."));
                            return;
                        }

                        buffer = null;
                        e.SetBuffer(bufferRef, offset, packetSize);
                        ((Socket)sender).ReceiveAsync(e);
                    }
                    else
                    {
                        receiveTaskSource.TrySetResult(e.Offset + e.Count - offset);
                        Volatile.Write(ref state, STATE_IDLE);
                    }
                }
            }
            else
            {
                receiveTaskSource.TrySetResult(0);
                Volatile.Write(ref state, STATE_IDLE);
            }
        }
        else
        {
            receiveTaskSource.TrySetException(new SocketException((int)e.SocketError));
            Volatile.Write(ref state, STATE_IDLE);
        }
    }

    private void HandleSendCompleted(object sender, SocketAsyncEventArgs e)
    {
        if (e.SocketError == SocketError.Success)
        {
            if (e.BytesTransferred > 0)
            {
                int bytesLeft = e.Count - e.BytesTransferred;

                if (bytesLeft > 0)
                {
                    int byteOffset = e.Offset + e.BytesTransferred;

                    e.SetBuffer(byteOffset, bytesLeft);
                    ((Socket)sender).SendAsync(e);
                }
                else
                {
                    byte[] bufferRef = buffer;

                    if (bufferRef != null)
                    {
                        buffer = null;
                        e.SetBuffer(bufferRef, offset, count);
                        ((Socket)sender).SendAsync(e);
                    }
                    else
                    {
                        sendTaskSource.TrySetResult(true);
                        Volatile.Write(ref state, STATE_IDLE);
                    }
                }
            }
            else
            {
                sendTaskSource.TrySetResult(false);
                Volatile.Write(ref state, STATE_IDLE);
            }
        }
        else
        {
            sendTaskSource.TrySetException(new SocketException((int)e.SocketError));
            Volatile.Write(ref state, STATE_IDLE);
        }
    }
}

My question is, do the buffer, offset and count fields need to be volatile? The event callback E_Completed comes from a bound ThreadPool handle, which may be the same thread every time or a different thread every time.

share|improve this question
    
Welcome to Code Review! I hope you get some helpful answer. – SirPython Jan 12 at 0:17
up vote 1 down vote accepted
  • You have some code duplication in the ReceivePacketAsync and SendPacketAsync regarding the validation of the method parameters which should be extracted to a separate method.

  • If you revert the condition of the first if in HandleSendCompleted and HandleReceiveCompleted you can return early and by omitting the else you will save some horizontal spacing.

  • omitting braces {} although they are optional is considered to be bad practice which can bite you in the future.

share|improve this answer

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Not the answer you're looking for? Browse other questions tagged or ask your own question.