Take the 2-minute tour ×
Code Review Stack Exchange is a question and answer site for peer programmer code reviews. It's 100% free, no registration required.

This is a small piece of a bigger puzzle, but it is usable on its own. It is an attempt at a non-blocking queue with an "unlimited" buffer length (bounded only by available memory).

My unit tests reach 100% statement coverage and everything looks OK, but I'm very new to Go and to concurrency, so I might have missed something.

import "sync"

// Message and MessageQueuer are interfaces defined in another file.

// msgQueue is a FIFO of Messages stored in a growable ring buffer and exposed
// through a pair of channels.
type msgQueue struct {
    // input is handed out send-only by In; output is handed out receive-only
    // by Out.
    input  chan Message
    output chan Message

    // buffer is the ring storage. head indexes the oldest stored message,
    // tail the next slot to write, and count is the number of stored messages.
    buffer []Message
    head   int
    tail   int
    count  int

    // immediateClose is set by ClearAndClose; shutdown drops the buffer
    // instead of keeping it when the flag is set.
    immediateClose bool

    // done is incremented once by NewQueue and released by shutdown;
    // CloseAndWait blocks on it.
    done sync.WaitGroup
}

// NewQueue allocates a queue with two unbuffered channels and a 16-slot ring
// buffer, starts the magicBuffer goroutine that moves messages between them,
// and returns the queue.
func NewQueue() MessageQueuer {
    const initialSlots = 16

    mq := new(msgQueue)
    mq.input = make(chan Message)
    mq.output = make(chan Message)
    mq.buffer = make([]Message, initialSlots)

    // Register the worker before starting it so a later Wait on done cannot
    // return before the goroutine has been accounted for.
    mq.done.Add(1)
    go mq.magicBuffer()

    return mq
}

// In returns the queue's input channel restricted to sending.
func (mq *msgQueue) In() chan<- Message {
    var sendOnly chan<- Message = mq.input

    return sendOnly
}

// Out returns the queue's output channel restricted to receiving.
func (mq *msgQueue) Out() <-chan Message {
    var recvOnly <-chan Message = mq.output

    return recvOnly
}

// Len returns the current value of the queue's message counter.
//
// NOTE(review): count is also modified by the magicBuffer goroutine and no
// lock or atomic is visible here, so the value looks like an unsynchronized
// snapshot — confirm with `go test -race`.
func (mq *msgQueue) Len() int {
    pending := mq.count

    return pending
}

// Close closes the input channel and returns immediately. magicBuffer reacts
// to the closed input by calling shutdown, which sends any messages still
// counted as buffered to output and then closes output.
//
// As with any Go channel, closing an already closed input panics, and so does
// sending on In after Close.
func (mq *msgQueue) Close() {
    in := mq.input
    close(in)
}

// CloseAndWait closes the input channel and then blocks until shutdown has
// released the done WaitGroup.
//
// shutdown sends every remaining buffered message on output before it
// finishes, so this call returns only once those messages have been received
// from Out by some other goroutine.
func (mq *msgQueue) CloseAndWait() {
    mq.Close()
    mq.done.Wait()
}

// ClearAndClose marks the queue for immediate shutdown, resets the message
// counter to zero and closes the input channel. It does not wait for the
// worker goroutine to finish.
//
// NOTE(review): immediateClose and count are also read and written by the
// magicBuffer goroutine, and no lock or atomic is visible here — this looks
// like a data race; confirm with `go test -race`.
func (mq *msgQueue) ClearAndClose() {
    // The flag is consulted by magicBuffer (to stop enqueuing) and by
    // shutdown (to drop the buffer).
    mq.immediateClose = true
    // With count at zero, shutdown's drain loop has nothing to send.
    mq.count = 0
    // Closing input is what makes magicBuffer call shutdown.
    close(mq.input)
}

// enqueue stores msg at the tail of the ring buffer, growing the buffer first
// when every slot is occupied.
//
// Growth doubles the number of slots (or allocates minCapacity slots when the
// buffer is empty or nil, so a queue whose buffer was never allocated still
// works) and unrolls the ring so the oldest message ends up at index 0.
func (mq *msgQueue) enqueue(msg Message) {
    const minCapacity = 16

    // count == len(buffer) is the "full" condition. Unlike comparing head and
    // tail it also covers a zero-length buffer, which could otherwise never
    // grow (doubling zero is still zero) and would panic on the store below.
    if mq.count == len(mq.buffer) {
        size := 2 * len(mq.buffer)
        if size == 0 {
            size = minCapacity
        }
        grown := make([]Message, size)

        kept := 0
        if len(mq.buffer) > 0 {
            // Oldest message through the end of the old buffer first, then
            // the part that had wrapped around to the front.
            kept = copy(grown, mq.buffer[mq.head:])
            kept += copy(grown[kept:], mq.buffer[:mq.head])
        }

        mq.buffer = grown
        mq.head = 0
        mq.tail = kept
    }

    mq.buffer[mq.tail] = msg
    mq.tail = (mq.tail + 1) % len(mq.buffer)
    mq.count++
}

// dequeue removes and returns the oldest message in the ring buffer.
//
// The caller must ensure the queue is non-empty (count > 0); no check is made
// here.
func (mq *msgQueue) dequeue() Message {
    msg := mq.buffer[mq.head]

    // Clear the vacated slot so the ring no longer references the message;
    // otherwise it stays reachable until the slot happens to be overwritten.
    var zero Message
    mq.buffer[mq.head] = zero

    mq.head = (mq.head + 1) % len(mq.buffer)
    mq.count--

    return msg
}

// shutdown finishes the queue: it delivers whatever is still buffered to
// output (or discards it when immediateClose is set), closes output, and
// releases the done WaitGroup on return.
//
// The drain loop sends on output, so it blocks until a consumer receives each
// remaining message.
func (mq *msgQueue) shutdown() {
    defer mq.done.Done()

    if mq.immediateClose {
        // Discard everything. The indices and counter are reset together with
        // the buffer: if count were still positive here (the worker may have
        // enqueued a message after ClearAndClose zeroed it), the loop below
        // would index the nil buffer and panic.
        mq.buffer = nil
        mq.head, mq.tail, mq.count = 0, 0, 0
    }

    for mq.count > 0 {
        mq.output <- mq.dequeue()
    }

    close(mq.output)
}

// magicBuffer is the queue's worker loop, started as a goroutine by NewQueue.
// It accepts messages from input into the ring buffer and, whenever the
// buffer is non-empty, offers the oldest message on output.
//
// The loop ends by calling shutdown when input is closed or when
// immediateClose has been set by the time a receive completes.
func (mq *msgQueue) magicBuffer() {
    for {
        // A send on a nil channel is never ready, so leaving out nil while
        // the buffer is empty disables the send case and the select simply
        // waits for input.
        var (
            out  chan Message
            next Message
        )
        if mq.count > 0 {
            out = mq.output
            next = mq.buffer[mq.head]
        }

        select {
        case msg, open := <-mq.input:
            if !open || mq.immediateClose {
                mq.shutdown()

                return
            }

            mq.enqueue(msg)

        case out <- next:
            // The consumer has taken next; remove it from the ring.
            mq.dequeue()
        }
    }
}
share|improve this question
add comment

Your Answer

 
discard

By posting your answer, you agree to the privacy policy and terms of service.

Browse other questions tagged or ask your own question.