I am studying multi-threading and have tried to implement a thread pool. Please provide feedback, mostly from multi-threading point of view. I have implemented two pools, one for adding tasks in queue and the other for taking and executing the requests.
PoolManager.java
package com.learn.threading.threadpool;
public class PoolManager {
ServiceThreadPool servicePool;
RequestThreadPool requestPool;
public static void main(String[] args) {
PoolManager poolManager = new PoolManager();
poolManager.start();
try{
Thread.sleep(4000);
}catch(InterruptedException ex){
ex.printStackTrace();
}finally {
poolManager.stop();
}
}
public void start(){
Queue queue = new Queue(10);
servicePool = new ServiceThreadPool(10,queue);
requestPool = new RequestThreadPool(1000,queue);
servicePool.start();
requestPool.start();
}
public void stop(){
requestPool.stop();
servicePool.stop();
}
}
ServiceThreadPool.java
package com.learn.threading.threadpool;
public class ServiceThreadPool {
private int numOfThreads;
private Processor[] threads;
public ServiceThreadPool(int numOFthreads, Queue queue) {
this.numOfThreads = numOFthreads;
threads = new Processor[numOFthreads];
for(int i = 0; i < numOFthreads; i++){
threads[i] = new Processor(queue, "Processor_"+i+" ");
}
}
public void start(){
for(int i = 0; i < numOfThreads; i++){
threads[i].start();
}
}
public void stop(){
for(int i = 0; i < numOfThreads; i++){
threads[i].interrupt();
}
}
}
RequestThreadPool.java
package com.learn.threading.threadpool;
public class RequestThreadPool {
private int numOfThreads;
private Request[] threads;
public RequestThreadPool(int numOFthreads, Queue queue) {
this.numOfThreads = numOFthreads;
threads = new Request[numOFthreads];
for(int i = 0; i < numOFthreads; i++){
threads[i] = new Request(queue,"Request_"+i+" ");
}
}
public void start(){
for(int i = 0; i < numOfThreads; i++){
threads[i].start();
}
}
public void stop(){
for(int i = 0; i < numOfThreads; i++){
threads[i].interrupt();
}
}
}
Request.java
package com.learn.threading.threadpool;
public class Request extends Thread {
private Queue queue;
private String name;
volatile private static int unique = 1;
public Request(Queue queue, String name) {
this.queue = queue;
this.name = name;
}
public void run() {
while (!isInterrupted()) {
try {
Task task = new Task(unique++);
synchronized (queue) {
while (queue.isFull()) {
queue.wait();
}
queue.add(task);
System.out.println(task +" added in Queue by "+name);
queue.notifyAll();
}
} catch (InterruptedException ex) {
break;
}
}
}
}
Processor.java
package com.learn.threading.threadpool;
public class Processor extends Thread {
private volatile Queue queue;
private String name;
public Processor(Queue queue, String name) {
this.queue = queue;
this.name = name;
}
@Override
public void run() {
while (!isInterrupted()) {
try {
Runnable task;
synchronized (queue) {
while (queue.isEmpty()) {
queue.wait();
}
task = queue.remove();
System.out.println(task +" taken from Queue by "+name);
queue.notifyAll();
}
task.run();
} catch (InterruptedException ex) {
break;
}
}
}
}
Queue.java
package com.learn.threading.threadpool;
public class Queue {
Runnable[] requests;
int maxSize;
int size;
public Queue(int maxSize){
this.maxSize = maxSize;
requests = new Task[maxSize];
}
public void add(Runnable task){
requests[size++] = task;
}
public Runnable remove(){
Runnable task = requests[size-1];
requests[size-1] = null;
size--;
return task;
}
public boolean isFull(){
return size == maxSize;
}
public boolean isEmpty(){
return size == 0;
}
}
Task.java
package com.learn.threading.threadpool;
public class Task implements Runnable {
private int input;
public Task(int input){
this.input = input;
}
public void run() {
int result = 0;
int temp = input;
while(temp > 1){
result = result+temp;
temp = temp-1;
}
System.out.println("sum of 1st "+input+" numbers is "+result);
}
public String toString(){
return "Task "+input;
}
}