This is a FIFO blocking memory queue in F# to be used as a component in a logging library.
namespace Primitives
open System
open System.Threading
open System.Threading.Tasks
/// Outcome of a queue operation. Used as the return value of the blocking and
/// asynchronous post/read operations defined on the queue in this file.
type QueueOpResult<'T> =
/// The operation completed; carries the value it produced.
| Success of 'T
/// An exception was raised while waiting; carries that exception.
| Failure of exn
/// The wait ended because something other than the awaited handle was signalled
/// (a cancellation handle, a timeout, or disposal of the queue).
| Cancellation
/// Describes how a pending queue operation may be cancelled.
type QueueOpCancelType =
/// Cancel once the given time span has elapsed.
| Timeout of TimeSpan
/// Cancel when the given wait handle becomes signalled.
| WaitHandle of WaitHandle
module BlockingQueue =
open System.Diagnostics
/// One cell of the queue's backing array. Both fields are mutable so a slot can be
/// overwritten in place instead of being reallocated for every message.
type MemoryQueueSlot<'elem> = {
/// Stopwatch timestamp recorded when the entry was written into the slot
/// (slots start out with the timestamp taken when the array was created).
mutable timestamp: int64
/// The stored element; reset to the type's default value once it has been read.
mutable entry: 'elem
}
/// Snapshot of queue statistics as returned by the queue's TryGetMetrics member
/// (only populated in DEBUG builds). All time values are Stopwatch ticks converted
/// to milliseconds.
type QueueMetrics = {
/// Wait time recorded for successful reads, in milliseconds.
TotalReadWaitTime: double
/// Wait time recorded for successful writes, in milliseconds.
TotalWriteWaitTime: double
/// TotalReadWaitTime divided by TotalReads.
AvgReadWaitTime: double
/// TotalWriteWaitTime divided by TotalWrites.
AvgWriteWaitTime: double
/// Number of successful reads counted.
TotalReads: int64
/// Number of successful writes counted.
TotalWrites: int64
/// Time messages spent in the queue between being posted and being read,
/// in milliseconds.
TotalMessageQueueTime: double
/// TotalMessageQueueTime divided by TotalMessagePassthroughCount.
AvgMessageQueueTime: double
/// Number of messages that have been read back out of the queue.
TotalMessagePassthroughCount: int64
}
/// Turns a QueueOpCancelType into a wait handle that becomes signalled when the
/// pending operation should be abandoned, and releases on Dispose whatever the
/// scope itself created.
///
/// * `Timeout ts`    - an internal manual-reset event is created and a one-shot
///                     timer signals it after `ts`; both are owned by the scope.
/// * `WaitHandle wh` - `wh` is exposed as-is. It belongs to the caller and is NOT
///                     disposed here: the queue creates a fresh scope for every
///                     internal wait, so disposing it after the first wait would
///                     break the following waits of the same operation and any
///                     later reuse of the handle by the caller.
type CancelScope(cancType) =
    let mutable isDisposed = false
    // Pair of (cleanup function, handle to wait on).
    let disposerAndWH =
        match cancType with
        | Timeout ts ->
            let cancelWH = new EventWaitHandle(false, EventResetMode.ManualReset)
            // A timer callback may already be queued when the scope is disposed;
            // signalling the disposed event would then raise on a thread-pool
            // thread, so that specific case is ignored.
            let onElapsed (state: obj) =
                try
                    (state :?> EventWaitHandle).Set() |> ignore
                with :? ObjectDisposedException -> ()
            // Created disarmed, then armed below once the cleanup closure exists.
            let timer =
                new System.Threading.Timer(
                    TimerCallback(onElapsed),
                    cancelWH,
                    System.Threading.Timeout.Infinite,
                    System.Threading.Timeout.Infinite)
            let cleanup () =
                timer.Dispose()
                cancelWH.Dispose()
            // Fire once after `ts`; never repeat.
            timer.Change(ts, System.Threading.Timeout.InfiniteTimeSpan) |> ignore
            (cleanup, cancelWH :> WaitHandle)
        | WaitHandle wh ->
            // Nothing to release: the handle is owned by the caller.
            let cleanup () = ()
            (cleanup, wh)
    /// Handle that becomes signalled when the operation should be cancelled.
    member __.WaitHandle = disposerAndWH |> snd
    interface IDisposable with
        /// Releases the timer and event created for a timeout scope.
        /// Calling it more than once has no further effect.
        member x.Dispose() =
            if not isDisposed then
                isDisposed <- true
                (disposerAndWH |> fst) ()
/// Bounded, blocking FIFO queue backed by a fixed-size ring buffer.
///
/// `capacity` is the number of slots in the ring. One slot is always kept free to
/// tell "full" from "empty", so at most `capacity - 1` items are held at a time
/// (a capacity below 2 can never accept a post).
///
/// Writers are serialised by one auto-reset gate and readers by another, so one
/// post and one read may proceed concurrently. Every wait also watches the
/// disposing handle and, optionally, a caller-supplied cancellation. A `Timeout`
/// cancellation is restarted for each internal wait (gate, then slot/item), so a
/// single operation may block for longer than one timeout period.
///
/// Operations report their outcome as a QueueOpResult:
/// `Success` on completion, `Cancellation` when a cancellation/timeout/dispose
/// handle fired first, `Failure` when waiting raised an exception.
type MemoryQueue<'elem>(capacity) =
    let items : 'elem MemoryQueueSlot array =
        let timestamp = Diagnostics.Stopwatch.GetTimestamp()
        Array.init
            capacity
            (fun _ -> { timestamp = timestamp; entry = Unchecked.defaultof<_> })
#if DEBUG
    // Metrics counters, all in raw Stopwatch ticks / plain counts.
    let mutable totalSuccessReadWaitTime = 0L
    let mutable totalSuccessWriteWaitTime = 0L
    let mutable totalSuccessReads = 0L
    let mutable totalSuccessWrites = 0L
    let mutable totalMessageQueueTime = 0L
    let mutable totalMessagePassthroughCount = 0L
#endif
    let mutable readIndex = 0
    let mutable writeIndex = 0
    // 0 = live, 1 = disposed; flipped atomically so disposal runs only once.
    let mutable disposed = 0
    // Manual-reset state flags: "there is room" / "there is something to read".
    let slotsAvailableWaitHandle = new EventWaitHandle(true, EventResetMode.ManualReset)
    let itemsAvailableWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset)
    // Auto-reset gates used as locks: one reader and one writer at a time.
    let readAvailableWaitHandle = new EventWaitHandle(true, EventResetMode.AutoReset)
    let writeAvailableWaitHandle = new EventWaitHandle(true, EventResetMode.AutoReset)
    // Signalled by Dispose to wake every pending wait.
    let disposingWaitHandle = new EventWaitHandle(false, EventResetMode.ManualReset)
    let elemPostedEvent = new Event<'elem>()
    let elemReadEvent = new Event<'elem>()
    let isEmpty () = (readIndex = writeIndex)
    let isFull () = ((writeIndex + 1) % capacity) = readIndex
#if DEBUG
    let metrics_StartWrite () =
        Diagnostics.Stopwatch.GetTimestamp()
    /// Counts a successful write and adds its wait time to the running total.
    let metrics_WriteEndWait startWaitTime timestamp =
        Thread.MemoryBarrier()
        totalSuccessWrites <- totalSuccessWrites + 1L
        Thread.MemoryBarrier()
        totalSuccessWriteWaitTime <- totalSuccessWriteWaitTime + (timestamp - startWaitTime)
    let metrics_StartRead () =
        Diagnostics.Stopwatch.GetTimestamp()
    /// Counts a successful read, adds its wait time to the running total and
    /// returns the timestamp at which the wait ended.
    let metrics_ReadEndWait startWaitTime =
        let timestamp = Diagnostics.Stopwatch.GetTimestamp()
        Thread.MemoryBarrier()
        totalSuccessReads <- totalSuccessReads + 1L
        Thread.MemoryBarrier()
        totalSuccessReadWaitTime <- totalSuccessReadWaitTime + (timestamp - startWaitTime)
        timestamp
    /// Adds the time the message spent in the queue (read time minus the slot's
    /// post timestamp) to the running total and counts the message.
    let metrics_ReadComplete endWaitTime timestamp =
        Thread.MemoryBarrier()
        totalMessageQueueTime <- totalMessageQueueTime + (endWaitTime - timestamp)
        Thread.MemoryBarrier()
        totalMessagePassthroughCount <- totalMessagePassthroughCount + 1L
#endif
    /// Waits on `whs`; index 0 is the handle the caller actually wants, any other
    /// result (cancel handle, disposing handle) is reported as Cancellation.
    let waitAny whs =
        try
            match WaitHandle.WaitAny(whs) with
            | 0 -> Success ()
            | _ -> Cancellation
        with e ->
            Failure e
    /// Waits for `wh`, the optional cancellation and the disposing handle.
    let asyncOpWait (wh: WaitHandle) cancelType =
        match cancelType with
        | Some cT ->
            use cScope = new CancelScope(cT)
            let whs = [| wh; cScope.WaitHandle; disposingWaitHandle :> WaitHandle |]
            waitAny whs
        | None ->
            let whs = [| wh; disposingWaitHandle :> WaitHandle |]
            waitAny whs
    let asyncOpWaitUntilCanWrite = asyncOpWait writeAvailableWaitHandle
    let asyncOpWaitUntilCanRead = asyncOpWait readAvailableWaitHandle
    let asyncOpWaitUntilHasSlots = asyncOpWait slotsAvailableWaitHandle
    let asyncOpWaitUntilHasItems = asyncOpWait itemsAvailableWaitHandle
    /// Re-opens a reader/writer gate when an operation is abandoned. The abort
    /// path is the one taken when the queue is being disposed, at which point the
    /// gate may already be gone; that must not escape as an exception because the
    /// caller is about to receive Cancellation/Failure anyway.
    let releaseOnAbort (gate: EventWaitHandle) =
        try
            gate.Set() |> ignore
        with :? ObjectDisposedException -> ()
    /// Blocks until the item has been stored or the wait is cancelled/fails.
    let postSync item cType =
#if DEBUG
        let startWaitTime = metrics_StartWrite()
#endif
        match asyncOpWaitUntilCanWrite cType with
        | Success () ->
            // Writer gate is held from here until it is set again below.
            let rec waitAvailSl () =
                if isFull () then
                    match asyncOpWaitUntilHasSlots cType with
                    | Success () ->
                        // Re-check: the flag is only a hint, the indices decide.
                        waitAvailSl ()
                    | Failure e ->
                        releaseOnAbort writeAvailableWaitHandle
                        Failure e
                    | Cancellation ->
                        releaseOnAbort writeAvailableWaitHandle
                        Cancellation
                else
                    let timestamp = Diagnostics.Stopwatch.GetTimestamp()
#if DEBUG
                    metrics_WriteEndWait startWaitTime timestamp
#endif
                    Thread.MemoryBarrier()
                    let wi = writeIndex
                    let slot = items.[wi]
                    slot.timestamp <- timestamp
                    slot.entry <- item
                    // Publish the slot only after it has been filled.
                    writeIndex <- (wi + 1) % capacity
                    Thread.MemoryBarrier()
                    if isFull() then
                        slotsAvailableWaitHandle.Reset() |> ignore
                    itemsAvailableWaitHandle.Set() |> ignore
                    writeAvailableWaitHandle.Set() |> ignore
                    // Raised after the gate is released so handlers cannot stall writers.
                    elemPostedEvent.Trigger(item)
                    Success ()
            waitAvailSl()
        | Failure e -> Failure e
        | Cancellation -> Cancellation
    /// Blocks until an item has been taken or the wait is cancelled/fails.
    /// On success returns the item together with the timestamp stored at post time.
    let readSync cType =
#if DEBUG
        let startWaitTime = metrics_StartRead()
#endif
        match asyncOpWaitUntilCanRead cType with
        | Success () ->
            // Reader gate is held from here until it is set again below.
            let rec waitAvailIt () =
                if isEmpty () then
                    match asyncOpWaitUntilHasItems cType with
                    | Success () ->
                        // Re-check: the flag is only a hint, the indices decide.
                        waitAvailIt ()
                    | Failure e ->
                        releaseOnAbort readAvailableWaitHandle
                        Failure e
                    | Cancellation ->
                        releaseOnAbort readAvailableWaitHandle
                        Cancellation
                else
#if DEBUG
                    let endWaitTime = metrics_ReadEndWait startWaitTime
#endif
                    Thread.MemoryBarrier()
                    let ri = readIndex
                    let slot = items.[ri]
                    let res = slot.entry
                    let timestamp = slot.timestamp
                    // Drop the reference so the slot does not keep the item alive.
                    slot.entry <- Unchecked.defaultof<_>
                    readIndex <- (ri + 1) % capacity
                    Thread.MemoryBarrier()
#if DEBUG
                    metrics_ReadComplete endWaitTime timestamp
#endif
                    if isEmpty() then
                        itemsAvailableWaitHandle.Reset() |> ignore
                    slotsAvailableWaitHandle.Set() |> ignore
                    readAvailableWaitHandle.Set() |> ignore
                    // Raised after the gate is released so handlers cannot stall readers.
                    elemReadEvent.Trigger(res)
                    Success (res, timestamp)
            waitAvailIt()
        | Failure e -> Failure e
        | Cancellation -> Cancellation
    /// Runs postSync on a task. The task is created when the workflow runs, not
    /// when this function is called.
    let postAsync item cType =
        async {
            return!
                Task.Factory.StartNew(fun () -> postSync item cType)
                |> Async.AwaitTask
        }
    /// Runs readSync on a task. The task is created when the workflow runs, not
    /// when this function is called.
    let readAsync cType =
        async {
            return!
                Task.Factory.StartNew(fun () -> readSync cType)
                |> Async.AwaitTask
        }
    /// Wakes all pending waits and releases every handle; runs at most once.
    /// The handles are released immediately after signalling, as before, so an
    /// operation racing with disposal may report Failure instead of Cancellation.
    let disposeAll () =
        if Interlocked.Exchange(&disposed, 1) = 0 then
            disposingWaitHandle.Set() |> ignore
            disposingWaitHandle.Dispose()
            slotsAvailableWaitHandle.Dispose()
            itemsAvailableWaitHandle.Dispose()
            readAvailableWaitHandle.Dispose()
            writeAvailableWaitHandle.Dispose()
    /// Raised with the item after it has been stored in the queue.
    member __.ElementPosted = elemPostedEvent.Publish
    /// Raised with the item after it has been taken from the queue.
    member __.ElementRead = elemReadEvent.Publish
    /// Returns a metrics snapshot in DEBUG builds and None otherwise.
    /// Times are in milliseconds; an average with no samples is reported as 0.
    member __.TryGetMetrics () =
#if DEBUG
        let toMilliseconds (ticks: int64) =
            (double ticks / double (Diagnostics.Stopwatch.Frequency)) * 1000.0
        let average (totalMs: double) (count: int64) =
            if count = 0L then 0.0 else totalMs / double count
        Thread.MemoryBarrier()
        let queueMs = toMilliseconds totalMessageQueueTime
        let readWaitMs = toMilliseconds totalSuccessReadWaitTime
        let writeWaitMs = toMilliseconds totalSuccessWriteWaitTime
        {
            TotalMessagePassthroughCount = totalMessagePassthroughCount
            TotalMessageQueueTime = queueMs
            TotalReadWaitTime = readWaitMs
            TotalReads = totalSuccessReads
            TotalWriteWaitTime = writeWaitMs
            TotalWrites = totalSuccessWrites
            AvgMessageQueueTime = average queueMs totalMessagePassthroughCount
            AvgReadWaitTime = average readWaitMs totalSuccessReads
            AvgWriteWaitTime = average writeWaitMs totalSuccessWrites
        } |> Some
#else
        None
#endif
    /// Number of items currently stored, computed from the two ring indices.
    member __.Count =
        Thread.MemoryBarrier()
        let wI = writeIndex
        let rI = readIndex
        if (wI >= rI) then (wI - rI) else (capacity - rI + wI)
    /// Posts on the calling thread, blocking until stored, cancelled or failed.
    member __.PostBlocking(item, ?cancelType) =
        postSync item cancelType
    /// Fire-and-forget post; the outcome is discarded.
    member __.Post(item, ?cancelType) =
        let op = postAsync item cancelType |> Async.Ignore
        Async.Start(op)
    /// Posts asynchronously and yields the outcome.
    member __.PostAsync(item, ?cancelType) =
        postAsync item cancelType
    /// Reads on the calling thread, blocking until an item arrives, or the wait
    /// is cancelled or fails.
    member __.ReadBlocking(?cancelType) =
        readSync cancelType
    /// Reads asynchronously and yields the outcome.
    member __.ReadAsync(?cancelType) =
        readAsync cancelType
    /// Wakes pending operations and releases the queue's wait handles.
    /// Safe to call more than once.
    member __.Dispose() =
        disposeAll ()
    interface IDisposable with
        member __.Dispose() =
            disposeAll ()
Example usage:
/// Builds a workflow that posts the integers 1..msgNum to `q`, one after another,
/// awaiting the outcome of each post before issuing the next.
/// Failures and cancellations are written to the console; posting continues
/// regardless of the outcome.
let makeProducer msgNum (q: BlockingQueue.MemoryQueue<_>) =
    // Console reporting for a single post outcome.
    let report outcome =
        match outcome with
        | Success _ -> () // do something after posting
        | Cancellation -> Console.WriteLine("Cancellation")
        | Failure e -> Console.WriteLine("Error: {0}", e.Message)
    async {
        for i in 1 .. msgNum do
            let! outcome = q.PostAsync(i)
            report outcome
    }
/// Builds a workflow that keeps reading from `q`, giving each read a one-second
/// timeout. A failed read is written to the console and reading continues; the
/// first cancelled read (timeout included) ends the workflow.
let makeConsumer (q: BlockingQueue.MemoryQueue<_>) =
    // Recursive async loop; `return!` keeps it in tail position.
    let rec consume () =
        async {
            let! outcome = q.ReadAsync(Timeout(TimeSpan.FromSeconds(1.0)))
            match outcome with
            | Cancellation ->
                return ()
            | Failure e ->
                Console.WriteLine("Error: {0}", e.Message)
                return! consume ()
            | Success _ ->
                // do something with read value
                return! consume ()
        }
    consume ()