I am writing a cloud application (mostly as a hobby, for learning) that requires a very fast cache, queue, and messaging system. I looked at a few different options from Microsoft (hosting is on Azure), and all of them seemed slow relative to my needs. Then I tried Redis, and the speed was right where I needed it to be.
I also want to keep the number of components I depend on to a minimum: if I ever need to move from Azure to bare metal, I can always host my own Redis.
I decided, for both learning and for sport, to write a queue system that could work as an AtMostOnce or AtLeastOnce queue and that would be reliable across system failures. This class should also be able to run on multiple machines (in this case WorkerRoles) and be instantiated either by IoC or manually.
The following is what I have so far, before I take care of some things I have not implemented yet (CancellationTokens and a shared ConnectionMultiplexer, for instance). The code does work: I have tested it on 3 different WorkerRole instances, including crashes and reboots. My concerns are aimed more at problems that I don't see, performance issues, and my general lack of experience. Feel free to tell me if I'm doing anything wrong, but note that I know there are packages out there already. I just like to do things myself.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using StackExchange.Redis;
namespace CitySurvival.WorkerCommon
{
/// <summary>
/// Needed: (2) Redis queues 1 for new messages, 1 for currently processing messages
/// Needed: processing messages list is FILO
///
/// The queues will only contain the key to the message in redis, which is stored as
/// a single entity for quick lookup
///
/// jobQueue -- processingQueue
/// job:1 job:2
///
/// job:1 (job to do index 1)
/// job:2 (job to do index 2)
///
/// Finish method, will LREM key, and Remove Key from database
///
/// ON adding a new job, send a Publish to say a new job is added
///
/// ON taking a job, RPOPLPUSH from jobQueue to processingQueue
///
/// Checking for failed jobs, expiration time 10 seconds (this should be long enough
/// to process anything)
/// If job stays in processingQueue for longer than timeout, RPOPLPUSH to jobQueue
///
/// TODO: CancellationTokens (bring in with Autofac for a global token, or use a Factory param)
/// TODO: Get ConnectionMultiplexer from Constructor, or Factory
/// </summary>
public class RedisJobQueue
{
public delegate RedisJobQueue Factory(string jobName);
private IConnectionMultiplexer ConnectionMultiplexer => _lazyConnection.Value;
private readonly Lazy<IConnectionMultiplexer> _lazyConnection = new Lazy<IConnectionMultiplexer>(() => StackExchange.Redis.ConnectionMultiplexer.Connect("ConnectionString"));
private readonly string _jobQueue;
private readonly string _processingQueue;
private readonly string _subChannel;
private readonly string _jobName;
private Task _managementTask;
private bool _receiving;
public event EventHandler<JobReceivedEventArgs> OnJobReceived;
public RedisJobQueue(/*ConnectionMultiplexer multiplexer, */string jobName)
{
//_connectionMultiplexer = multiplexer;
_jobQueue = jobName + ":jobs";
_processingQueue = jobName + ":process";
_subChannel = jobName + ":channel";
_jobName = jobName;
}
private IDatabase Database => ConnectionMultiplexer.GetDatabase();
/// <summary>
/// When a job is finished, remove it from the processingQueue and from the
/// cache database.
/// </summary>
/// <param name="key"></param>
/// <param name="failed">Operation failed, requeue for another attempt</param>
public async Task Finish(string key, bool failed = false)
{
var db = Database;
await db.ListRemoveAsync(_processingQueue, key);
if (failed)
{
// How many times to fail before dead
if (await db.HashExistsAsync(key, "failedcount"))
{
var count = await db.HashGetAsync(key, "failedcount");
if (count.IsInteger)
{
if ((int) count >= 10)
{
// for now, delete the key, later we might integrate a dead message
// queue
await db.KeyDeleteAsync(key);
return;
}
}
}
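// Use the async overloads so this async method never blocks on Redis
await db.HashIncrementAsync(key, "failedcount");
await db.HashDeleteAsync(key, "active");
await db.ListRightPushAsync(_jobQueue, key);
await ConnectionMultiplexer.GetSubscriber().PublishAsync(_subChannel, "");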
}
else
{
// Job was successfully run, remove the key
await db.KeyDeleteAsync(key);
}
}
/// <summary>
/// Do we consume messages from the queue
/// </summary>
/// <returns></returns>
public RedisJobQueue AsConsumer()
{
var sub = ConnectionMultiplexer.GetSubscriber();
sub.Subscribe(_subChannel, (channel, value) => HandleNewJobs());
// Assume on starting that we have jobs waiting to be handled
HandleNewJobs();
return this;
}
/// <summary>
/// Runs a Task every 10 seconds to see if any remaining items are in
/// processing queue
/// </summary>
/// <returns></returns>
public RedisJobQueue AsManager()
{
// Task.Run unwraps the async lambda, so _managementTask tracks the loop itself
_managementTask = Task.Run(async () =>
{
while (true)
{
await Task.Delay(10000);
var timeToKill = (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds - 10000;
var db = Database;
RedisValue[] values = await db.ListRangeAsync(_processingQueue);
foreach (var value in values)
{
// Requeue any job whose "active" timestamp is older than the timeout
var activeTime = (double)await db.HashGetAsync((string)value, "active");
if (activeTime < timeToKill)
{
await Finish(value, true);
}
}
}
});
return this;
}
/// <summary>
/// Move key from JobQueue to processingQueue, get key value from cache.
///
/// Also set the active field. Indicates when job was retrieved so we can monitor
/// its time.
/// </summary>
/// <returns></returns>
private Dictionary<RedisValue, RedisValue> GetJob()
{
Dictionary<RedisValue, RedisValue> value;
while (true)
{
string key = Database.ListRightPopLeftPush(_jobQueue, _processingQueue);
// If key is null, then nothing was there to get, so no value is available
if (string.IsNullOrEmpty(key))
{
value = new Dictionary<RedisValue, RedisValue>();
break;
}
Database.HashSet(key, "active", (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds);
value = Database.HashGetAll(key).ToDictionary();
if (value.Count == 0)
{
Database.ListRemove(_processingQueue, key);
continue;
}
value.Add("key", key);
break;
}
return value;
}
/// <summary>
/// Move key from JobQueue to processingQueue, get key value from cache.
///
/// Also set the active field. Indicates when job was retrieved so we can monitor
/// its time.
/// </summary>
/// <returns></returns>
private async Task<Dictionary<RedisValue, RedisValue>> GetJobAsync()
{
var db = Database;
Dictionary<RedisValue, RedisValue> value;
while (true)
{
string key = await db.ListRightPopLeftPushAsync(_jobQueue, _processingQueue);
// If key is null, then nothing was there to get, so no value is available
if (string.IsNullOrEmpty(key))
{
value = new Dictionary<RedisValue, RedisValue>();
break;
}
await db.HashSetAsync(key, "active", (DateTime.UtcNow - new DateTime(1970, 1, 1)).TotalMilliseconds);
value = (await db.HashGetAllAsync(key)).ToDictionary();
// if Count is 0, remove it and check for the next job
if (value.Count == 0)
{
await db.ListRemoveAsync(_processingQueue, key);
continue;
}
value.Add("key", key);
break;
}
return value;
}
/// <summary>
/// We have received an indicator that new jobs are available
/// We process until we are out of jobs.
/// </summary>
private async void HandleNewJobs()
{
if (_receiving)
{
Trace.WriteLine("Already Receiving Jobs...");
return;
}
_receiving = true;
try
{
Trace.WriteLine("Trying to get jobs...");
var job = await GetJobAsync();
// If a valid job cannot be found, it will return an empty Dictionary
while (job.Count != 0)
{
// Fire the Event
OnJobReceived?.Invoke(this, new JobReceivedEventArgs(job, job["key"]));
// Get a new job if there is one
job = await GetJobAsync();
}
}
finally
{
// Always clear the flag, even if a handler or a Redis call throws
_receiving = false;
}
}
/// <summary>
/// Add a job to the Queue
/// </summary>
/// <param name="job"></param>
public void AddJob(RedisValue job)
{
if (job.IsNullOrEmpty) return;
var id = Database.StringIncrement(_jobName + ":jobid");
var key = _jobName + ":" + id;
Database.HashSet(key, "payload", job);
Database.ListLeftPush(_jobQueue, key);
ConnectionMultiplexer.GetSubscriber().Publish(_subChannel, "");
Trace.WriteLine("Added Job");
}
/// <summary>
/// Add a job to the Queue (async)
/// </summary>
/// <param name="job"></param>
public async Task AddJobAsync(RedisValue job)
{
if (job.IsNullOrEmpty) return;
var id = await Database.StringIncrementAsync(_jobName + ":jobid");
var key = _jobName + ":" + id;
await Database.HashSetAsync(key, "payload", job);
await Database.ListLeftPushAsync(_jobQueue, key);
await ConnectionMultiplexer.GetSubscriber().PublishAsync(_subChannel, "");
Trace.WriteLine("Added Job");
}
}
}
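To make the two-list design in the summary comment easier to reason about without a Redis server, here is a minimal in-memory sketch of the same flow. It is only a model under my own assumptions: the class name InMemoryReliableQueue and all of its members are made up for illustration, it is not thread-safe, and it stands in for LPUSH, RPOPLPUSH, and LREM rather than calling them.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the jobQueue / processingQueue pair.
// Illustrative only: single-threaded, no persistence, no retry limit.
public class InMemoryReliableQueue
{
    private readonly LinkedList<string> _jobs = new LinkedList<string>();
    private readonly LinkedList<string> _processing = new LinkedList<string>();
    private readonly Dictionary<string, long> _activeSince = new Dictionary<string, long>();

    public int Pending => _jobs.Count;
    public int InFlight => _processing.Count;

    // Models LPUSH onto the job list.
    public void Add(string key) => _jobs.AddFirst(key);

    // Models RPOPLPUSH jobQueue -> processingQueue, then stamps the "active" time.
    // Returns null when there is nothing to take.
    public string Take(long nowMs)
    {
        if (_jobs.Count == 0) return null;
        var key = _jobs.Last.Value;
        _jobs.RemoveLast();
        _processing.AddFirst(key);
        _activeSince[key] = nowMs;
        return key;
    }

    // Models LREM on the processing list after a successful run.
    public void Finish(string key)
    {
        _processing.Remove(key);
        _activeSince.Remove(key);
    }

    // Moves every job that has been in flight longer than timeoutMs back to the
    // tail of the job list, so it is the next one taken. Returns how many moved.
    public int RequeueExpired(long nowMs, long timeoutMs)
    {
        var expired = _processing.Where(k => nowMs - _activeSince[k] > timeoutMs).ToList();
        foreach (var key in expired)
        {
            _processing.Remove(key);
            _activeSince.Remove(key);
            _jobs.AddLast(key);
        }
        return expired.Count;
    }
}
```

A job that is taken but never finished (a crashed worker) stays in the processing list until RequeueExpired is called with a time past the timeout, which is what gives the at-least-once behaviour: the job may be delivered twice, but it is not lost.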
New Code
I have added and edited a few items from the original code, and have also incorporated some of the changes listed below. The new code can be found on GitHub in this gist: GitHub Gist for Redis Job/Message Queue