ScalableThreadpool.java

package eu.javaexperience.multithread;

import java.util.Collection;
import java.util.Vector;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import eu.javaexperience.asserts.AssertArgument;
import eu.javaexperience.interfaces.simple.SimpleGet;

public class ScalableThreadpool<T>
{
	protected final SimpleGet<BlockingJob<T>> blockingJobFactory;
	public ScalableThreadpool(SimpleGet<BlockingJob<T>> blockingJobFactory)
	{
		AssertArgument.assertNotNull(this.blockingJobFactory = blockingJobFactory, "blockingJobFactory");
	}
	
	protected Vector<Thread> workers = new Vector<>();
	
	protected AtomicInteger numberOfWorkers = new AtomicInteger();	
	
	public int getNumberOfWorkers()
	{
		return numberOfWorkers.get();
	}
	
	public void setNumberOfWorkers(int initialWorkerCount)
	{
		numberOfWorkers.set(initialWorkerCount);
	}
	
	protected AtomicInteger waitingWorkers = new AtomicInteger();
	
	public int getNumberOfWaitingWorkers()
	{
		return waitingWorkers.get();
	}
	
	protected AtomicInteger workingWorkers = new AtomicInteger();
	
	public int getNumberOfWorkingWorkers()
	{
		return workingWorkers.get();
	}

	public synchronized void setWorkerCount(int newCount) throws InterruptedException
	{
		int current = numberOfWorkers.get();
		numberOfWorkers.set(newCount);
		if(current == newCount)
		{
			return;
		}
		if(newCount <= 0)
		{
			cleanShutdown();
			return;
		}
		else if(newCount < current)
		{
			shrinkWorkers();
			return;
		}
		else
		//if(newCount > current)
		{
			growthWorkers();
		}
	}
	
	protected final AtomicReference<Semaphore> semaphoreForShrink = new AtomicReference<>();
	
	protected synchronized void shrinkWorkers()
	{
		Semaphore sem = new Semaphore(numberOfWorkers.get());
		//return only on successfully shrink
		semaphoreForShrink.set(sem);
		semaphoreForShrink.set(null);
	}
	
	protected synchronized void growthWorkers()
	{
		int plus = numberOfWorkers.get() - workers.size();
		for(int i=0;i<plus;++i)
		{
			Thread t = createNewWorker();
			workers.add(t);
			t.start();
		}
	}
	
	public synchronized void start()
	{
		growthWorkers();
	}
	
	public synchronized void cleanShutdown()
	{
		numberOfWorkers.set(0);
		shrinkWorkers();
	}

	protected Thread createNewWorker()
	{
		return new Thread()
		{
			protected final BlockingJob<T> job = blockingJobFactory.get();
			
			@Override
			public void run()
			{
				while(alive)
				{
					Semaphore sem = semaphoreForShrink.get();
					try
					{
						if(null != sem)
						{
							if(!sem.tryAcquire())
							{
								//This thread is out of new pool size, so it should be shuwdown.
								workers.remove(this);
								/*synchronized (sem)
								{
									sem.notifyAll();
								}*/
								return;
							}
						}
						
			 			/*T ret = strategy.accept(SturmWaffe.this, srv);
						Q q = strategy.wrap(SturmWaffe.this, ret);
						q.*/
						T ret = null;
						try
						{
							waitingWorkers.incrementAndGet();
							ret = job.acceptJob();
						}
						catch(Throwable t)
						{
							if(t == MultithreadingTools.THREAD_SHUTDOWN_POISON)
							{
								return;
							}
							
							MultithreadingTools.topLevelException(t);
							continue;
						}
						finally
						{
							waitingWorkers.decrementAndGet();
						}
						
						try
						{
							workingWorkers.incrementAndGet();
							job.exec(ret);
						}
						catch(Throwable t)
						{
							if(t == MultithreadingTools.THREAD_SHUTDOWN_POISON)
							{
								return;
							}
							
							MultithreadingTools.topLevelException(t);
						}
						finally
						{
							workingWorkers.decrementAndGet();
						}
						
					}
					finally
					{
						if(null != sem)
							sem.release();
					}
				}
			}
		};
	}

	@Deprecated
	public void fillThreads(Collection<Thread> poolThreads)
	{
		poolThreads.addAll(workers);
	}
	
	protected boolean alive = true;
	
	public void stop()
	{
		alive = false;
		cleanShutdown();
		for(Thread t:workers)
		{
			t.interrupt();
		}
	}
}