I have implemented some methods to make the creation of multithreaded programs easier for me in the future.
The requirements were:
- Be able to add as many Tasks as I need to the pool
- Be able to restrict the number of concurrently running threads
- Be able to set the thread creation speed
- Be able to cancel the thread creation/currently running threads
This is the code I came up with. Note that I do not know the thread-safety conventions or the design patterns usually used in multithreaded programming in C# or any other language.
This is a first attempt at a topic I'm interested in. The point of the review is to see whether there are improvements I can make to my code, and to learn some new methods to make it tidier and easier to understand.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MTSimplified
{
/// <summary>
/// Thrown when an operation that is only valid before the pool has been started
/// (adding a task, starting the pool) is attempted after it was started.
/// </summary>
public class AlreadyStartedException : InvalidOperationException
{
    /// <summary>Creates the exception with a default, descriptive message.</summary>
    public AlreadyStartedException()
        : base("The operation is not allowed because the thread organizer has already been started.")
    {
    }

    /// <summary>Creates the exception with a caller-supplied message.</summary>
    /// <param name="message">Text describing the error.</param>
    public AlreadyStartedException(string message)
        : base(message)
    {
    }

    /// <summary>Creates the exception with a caller-supplied message and the exception that caused it.</summary>
    /// <param name="message">Text describing the error.</param>
    /// <param name="innerException">The exception that led to this one.</param>
    public AlreadyStartedException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
/// <summary>
/// Thrown when an operation that requires a started pool (stopping it)
/// is attempted before the pool was started.
/// </summary>
public class NotStartedException : InvalidOperationException
{
    /// <summary>Creates the exception with a default, descriptive message.</summary>
    public NotStartedException()
        : base("The operation is not allowed because the thread organizer has not been started.")
    {
    }

    /// <summary>Creates the exception with a caller-supplied message.</summary>
    /// <param name="message">Text describing the error.</param>
    public NotStartedException(string message)
        : base(message)
    {
    }

    /// <summary>Creates the exception with a caller-supplied message and the exception that caused it.</summary>
    /// <param name="message">Text describing the error.</param>
    /// <param name="innerException">The exception that led to this one.</param>
    public NotStartedException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
/// <summary>
/// Runs a fixed batch of tasks, each on its own dedicated <see cref="Thread"/>, while limiting
/// how many of those threads are alive at the same time and how quickly new ones are started.
/// </summary>
/// <remarks>
/// Usage: register work with <c>addTask</c> / <c>addCancellableTask</c>, call <c>startAll</c>
/// once, and optionally call <c>stopAll</c> to stop starting new threads and wait for the
/// running ones. All events are raised on background threads, never on the caller's thread
/// (except <c>onStopped</c>, which is raised on the thread that called <c>stopAll</c>).
/// An exception thrown by a task is not caught here; it propagates on the task's own thread.
/// </remarks>
class ThreadOrganizer
{
    /// <summary>Pause, in milliseconds, between checks for a free slot while the thread limit is reached.</summary>
    private const int ThrottlePollMilliseconds = 1;

    /// <summary>Guards the "start next thread" step against a concurrent <c>stopAll</c>.</summary>
    private readonly object _gate = new object();
    private readonly List<Thread> _threads = new List<Thread>();
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly int _maxThreads;
    private readonly int _creationInterval;

    private int _finishedThreads;   // incremented from worker threads, always via Interlocked
    private int _nextThreadIndex;   // index of the next thread to start; accessed under _gate
    private bool _keepCreating = true; // cleared by stopAll; accessed under _gate
    private bool _started;

    /* entire pool */

    /// <summary>Raised on the scheduling thread once every registered thread has been started and has finished.</summary>
    public event EventHandler onDone;

    /// <summary>Raised by <c>stopAll</c>, on the calling thread, once all started threads have finished.</summary>
    public event EventHandler onStopped;

    /* per thread */

    /// <summary>Raised on a worker thread immediately before it runs its task.</summary>
    public event EventHandler onThreadCreate;

    /// <summary>Raised on a worker thread immediately after its task returned.</summary>
    public event EventHandler onThreadFinish;

    /// <summary>Creates an empty, not yet started organizer.</summary>
    /// <param name="maxThreads">Maximum number of pool threads allowed to be alive at once; must be at least 1.</param>
    /// <param name="creationInterval">Pause, in milliseconds, after each thread start; must not be negative.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxThreads"/> is less than 1 or <paramref name="creationInterval"/> is negative.
    /// </exception>
    public ThreadOrganizer(int maxThreads, int creationInterval)
    {
        // A limit below 1 would make the scheduler wait forever for a slot that can never exist.
        if (maxThreads < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxThreads), maxThreads, "At least one concurrent thread must be allowed.");
        }

        // Thread.Sleep treats -1 as "forever" and rejects other negative values.
        if (creationInterval < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(creationInterval), creationInterval, "The creation interval cannot be negative.");
        }

        _maxThreads = maxThreads;
        _creationInterval = creationInterval;
    }

    /// <summary>Number of tasks whose function has returned so far.</summary>
    public int finishedCount
    {
        get { return Volatile.Read(ref _finishedThreads); }
    }

    /// <summary>Registers a task to be run on its own thread once the organizer is started.</summary>
    /// <param name="function">The work to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="function"/> is null.</exception>
    /// <exception cref="AlreadyStartedException">The organizer has already been started.</exception>
    public void addTask(Action function)
    {
        if (function == null)
        {
            throw new ArgumentNullException(nameof(function));
        }

        addCancellableTask(token => function());
    }

    /// <summary>
    /// Registers a task that receives the organizer's <see cref="CancellationToken"/>, so the task
    /// can observe a <c>stopAll</c> request and return early.
    /// </summary>
    /// <param name="function">The work to run; it is handed the token that <c>stopAll</c> cancels.</param>
    /// <exception cref="ArgumentNullException"><paramref name="function"/> is null.</exception>
    /// <exception cref="AlreadyStartedException">The organizer has already been started.</exception>
    public void addCancellableTask(Action<CancellationToken> function)
    {
        if (function == null)
        {
            throw new ArgumentNullException(nameof(function));
        }

        if (_started)
        {
            throw new AlreadyStartedException();
        }

        _threads.Add(new Thread(() => functionWrapper(function, _cts.Token)));
    }

    /// <summary>
    /// Begins starting the registered threads on a background task and returns immediately.
    /// </summary>
    /// <remarks>
    /// A thread is started whenever fewer than the configured maximum are alive, followed by a
    /// pause of the configured creation interval. When all threads have been started and have
    /// finished, <c>onDone</c> is raised; if <c>stopAll</c> intervenes before the scheduler has
    /// finished starting threads, <c>onDone</c> is not raised.
    /// </remarks>
    /// <exception cref="AlreadyStartedException">The organizer has already been started.</exception>
    public void startAll()
    {
        if (_started)
        {
            throw new AlreadyStartedException();
        }

        _started = true;

        Task.Run(() =>
        {
            while (true)
            {
                bool startedThread = false;

                // The stop flag is checked and the thread is started under the same lock that
                // stopAll takes, so no thread can be started after stopAll has taken its snapshot.
                lock (_gate)
                {
                    if (!_keepCreating)
                    {
                        return;
                    }

                    if (_nextThreadIndex == _threads.Count)
                    {
                        break;
                    }

                    if (_threads.Count(t => t.IsAlive) < _maxThreads)
                    {
                        _threads[_nextThreadIndex].Start();
                        _nextThreadIndex++;
                        startedThread = true;
                    }
                }

                // Sleep instead of spinning: the creation interval after a start, or a short
                // poll delay while waiting for a running thread to free a slot.
                Thread.Sleep(startedThread ? _creationInterval : ThrottlePollMilliseconds);
            }

            // Every thread has been started at this point, so all of them can be joined.
            joinThreads(_threads.Count);
            onDone?.Invoke(this, EventArgs.Empty);
        });
    }

    /// <summary>
    /// Stops starting new threads, signals cancellation, blocks until every started thread has
    /// finished, and then raises <c>onStopped</c>.
    /// </summary>
    /// <remarks>
    /// Tasks registered through <c>addTask</c> cannot observe the cancellation and simply run to
    /// completion. Calling this method from inside a pooled task blocks forever, because the
    /// calling thread would be waiting for itself.
    /// </remarks>
    /// <exception cref="NotStartedException">The organizer has not been started.</exception>
    public void stopAll()
    {
        if (!_started)
        {
            throw new NotStartedException();
        }

        int startedCount;
        lock (_gate)
        {
            _keepCreating = false;
            startedCount = _nextThreadIndex;
        }

        // Cancel outside the lock: callbacks registered on the token run synchronously here.
        _cts.Cancel();

        // Only threads that were actually started may be joined; Join throws on unstarted ones.
        joinThreads(startedCount);
        onStopped?.Invoke(this, EventArgs.Empty);
    }

    /* private methods */

    /// <summary>Body of every pool thread: raises the per-thread events around the task itself.</summary>
    private void functionWrapper(Action<CancellationToken> function, CancellationToken ct)
    {
        onThreadCreate?.Invoke(this, EventArgs.Empty);
        function(ct);
        // Several worker threads finish concurrently, so a plain ++ could lose updates.
        Interlocked.Increment(ref _finishedThreads);
        onThreadFinish?.Invoke(this, EventArgs.Empty);
    }

    /// <summary>Blocks until the first <paramref name="count"/> threads in the list have terminated.</summary>
    private void joinThreads(int count)
    {
        for (int index = 0; index < count; index++)
        {
            _threads[index].Join();
        }
    }
}
}
To test the class:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace MTSimplified
{
/// <summary>
/// Console demo for <c>ThreadOrganizer</c>: queues 200 one-second tasks with at most 10 running
/// at once and a 100 ms pause between thread starts, and prints a line for every pool event.
/// </summary>
class Program
{
    /// <summary>Number of tasks that have printed so far; shared by all worker threads.</summary>
    static int s_counter = 0;

    /// <summary>
    /// Starts the pool, waits for Enter, stops the pool, then waits for Enter again before exiting.
    /// </summary>
    static void Main(string[] args)
    {
        ThreadOrganizer threadO = new ThreadOrganizer(10, 100);
        threadO.onDone += (s, e) => done();
        threadO.onStopped += (s, e) => canceled();
        threadO.onThreadCreate += (s, e) => created();
        threadO.onThreadFinish += (s, e) => finished();

        for (int taskNumber = 0; taskNumber < 200; taskNumber++)
        {
            threadO.addTask(print);
        }

        threadO.startAll();

        // First Enter: stop the pool (blocks until the running tasks have finished).
        Console.ReadLine();
        threadO.stopAll();

        // Second Enter: keep the output visible without spinning a CPU core in an empty loop.
        Console.ReadLine();
    }

    /// <summary>Simulates one second of work, then prints this task's sequence number.</summary>
    public static void print()
    {
        Thread.Sleep(1000);
        // Several tasks run concurrently; an atomic increment keeps the numbers unique.
        Console.WriteLine(Interlocked.Increment(ref s_counter));
    }

    /// <summary>Handler for the per-thread "create" event.</summary>
    public static void created()
    {
        Console.WriteLine("created");
    }

    /// <summary>Handler for the per-thread "finish" event.</summary>
    public static void finished()
    {
        Console.WriteLine("finished");
    }

    /// <summary>Handler for the pool-wide "done" event.</summary>
    public static void done()
    {
        Console.WriteLine("Everything is done");
    }

    /// <summary>Handler for the pool-wide "stopped" event.</summary>
    public static void canceled()
    {
        Console.WriteLine("Everything is canceled");
    }
}
}