This is a small piece of a bigger puzzle, but it is usable on its own. It is an attempt to build a non-blocking queue with an "unlimited" buffer length (bounded only by available memory).
The unit tests reach 100% statement coverage and it looks OK, but I'm very new to Go and concurrency, so I might have missed something.
import "sync"
// Message and MessageQueuer are interfaces defined in another file.
// msgQueue is a FIFO of Messages held in a ring buffer that is doubled
// whenever it fills up. Messages arrive on input, are parked in buffer, and
// are handed out again on output by the magicBuffer goroutine.
type msgQueue struct {
	// input is the channel producers send on; closing it begins shutdown.
	input chan Message

	// output is the channel consumers receive from; shutdown closes it.
	output chan Message

	// buffer is the ring storage for messages that were accepted but not
	// yet delivered.
	buffer []Message

	// head is the index in buffer of the oldest stored message.
	head int

	// tail is the index in buffer where the next message will be stored.
	tail int

	// count is the number of messages currently stored in buffer.
	count int

	// immediateClose, once set, makes shutdown drop the buffer instead of
	// delivering what is left in it.
	immediateClose bool

	// done is released by shutdown after output has been closed.
	done sync.WaitGroup
}
// NewQueue builds a queue with unbuffered input and output channels and a
// 16-slot ring buffer, starts the magicBuffer goroutine that pumps messages
// between them, and returns the queue.
func NewQueue() MessageQueuer {
	mq := new(msgQueue)
	mq.input = make(chan Message)
	mq.output = make(chan Message)
	mq.buffer = make([]Message, 16)

	// Registered before the goroutine starts so that a wait on done cannot
	// return before shutdown has run.
	mq.done.Add(1)
	go mq.magicBuffer()

	return mq
}
// In returns the queue's input channel restricted to sending.
func (mq *msgQueue) In() chan<- Message {
	var in chan<- Message = mq.input
	return in
}
// Out returns the queue's output channel restricted to receiving.
func (mq *msgQueue) Out() <-chan Message {
	var out <-chan Message = mq.output
	return out
}
// Len reports the number of messages currently held in the buffer.
//
// NOTE(review): count is read here without any synchronization while the
// magicBuffer goroutine updates it, so a call made from another goroutine is
// a data race and the result is at best a momentary snapshot.
func (mq *msgQueue) Len() int {
	buffered := mq.count
	return buffered
}
// Close closes the input channel and returns immediately. The magicBuffer
// goroutine reacts to the closed channel by running shutdown.
//
// NOTE: closing an already closed channel panics, so only one of the close
// methods may be called, and only once.
func (mq *msgQueue) Close() {
	in := mq.input
	close(in)
}
// CloseAndWait closes the input channel and then blocks until shutdown has
// released done.
//
// NOTE: shutdown sends every message still buffered on the unbuffered output
// channel before it releases done, so this call returns only once a consumer
// has received them (or the buffer was already empty).
func (mq *msgQueue) CloseAndWait() {
	mq.Close()
	mq.done.Wait()
}
// ClearAndClose marks the queue for immediate shutdown, resets the buffered
// message count to zero and closes the input channel, so that messages which
// were buffered but not yet delivered are discarded.
//
// NOTE(review): immediateClose and count are written here without any
// synchronization while the magicBuffer goroutine reads and updates them;
// calling this from another goroutine is a data race, and a concurrent
// enqueue or dequeue can overwrite the count reset.
func (mq *msgQueue) ClearAndClose() {
	// Both fields must be set before the close that wakes up the goroutine.
	mq.immediateClose, mq.count = true, 0
	close(mq.input)
}
// enqueue stores msg at the tail of the ring buffer, growing the buffer
// first when there is no free slot.
//
// The buffer is considered full when count equals its length. That also
// covers a nil or zero-length buffer, which is replaced by a fresh one of
// minBufferLen slots instead of causing an index-out-of-range panic. When
// growing, the stored messages are copied to the start of the new buffer in
// FIFO order, so head becomes 0 and tail is the old length.
func (mq *msgQueue) enqueue(msg Message) {
	const minBufferLen = 16

	if size := len(mq.buffer); mq.count == size {
		newLen := 2 * size
		if newLen < minBufferLen {
			newLen = minBufferLen
		}
		grown := make([]Message, newLen)
		if size > 0 {
			// Unwrap the ring: first the part from head to the end of the
			// old buffer, then the part that had wrapped around to the front.
			n := copy(grown, mq.buffer[mq.head:])
			copy(grown[n:], mq.buffer[:mq.head])
		}
		mq.buffer = grown
		mq.head = 0
		mq.tail = size
	}

	mq.buffer[mq.tail] = msg
	mq.tail = (mq.tail + 1) % len(mq.buffer)
	mq.count++
}
// dequeue removes and returns the oldest buffered message.
//
// The vacated slot is reset to the zero Message so the ring buffer does not
// keep a reference to a message that was already handed out. On an empty
// queue it returns the zero Message and leaves the state unchanged, rather
// than driving count negative or indexing an empty buffer.
func (mq *msgQueue) dequeue() Message {
	var zero Message
	if mq.count <= 0 {
		return zero
	}

	msg := mq.buffer[mq.head]
	mq.buffer[mq.head] = zero
	mq.head = (mq.head + 1) % len(mq.buffer)
	mq.count--
	return msg
}
// shutdown finishes the queue: it delivers whatever is still buffered on
// output (unless immediateClose is set), closes output and finally releases
// done.
//
// With immediateClose set the buffer is dropped, and head, tail and count
// are reset along with it. Resetting count here matters: if it were still
// positive, the drain loop below would index the nil buffer and panic.
//
// NOTE: output is unbuffered in NewQueue, so the drain loop blocks until a
// consumer has received every remaining message.
func (mq *msgQueue) shutdown() {
	defer mq.done.Done()

	if mq.immediateClose {
		mq.buffer = nil
		mq.head, mq.tail, mq.count = 0, 0, 0
	}

	for mq.count > 0 {
		mq.output <- mq.dequeue()
	}
	close(mq.output)
}
// magicBuffer is the queue's pump loop; NewQueue starts it as a goroutine.
//
// On every iteration it waits for whichever comes first: a message arriving
// on input, which is stored with enqueue, or - only while something is
// buffered - a consumer ready to take the oldest message from output. When
// input is closed, or a message arrives after immediateClose was set, it
// runs shutdown and returns; the message that arrived in that case is
// dropped.
func (mq *msgQueue) magicBuffer() {
	for {
		// A send on a nil channel is never ready in a select, so leaving out
		// nil while nothing is buffered turns the select into a plain
		// receive from input.
		var (
			out  chan Message
			next Message
		)
		if mq.count != 0 {
			out = mq.output
			next = mq.buffer[mq.head]
		}

		select {
		case msg, ok := <-mq.input:
			if !ok || mq.immediateClose {
				mq.shutdown()
				return
			}
			mq.enqueue(msg)
		case out <- next:
			// The consumer has the message; now remove it from the ring.
			mq.dequeue()
		}
	}
}