// ScalableBlockingJobExecutor.java
package eu.javaexperience.multithread;
import eu.javaexperience.asserts.AssertArgument;
import eu.javaexperience.interfaces.simple.SimpleGet;
import eu.javaexperience.log.JavaExperienceLoggingFacility;
import eu.javaexperience.log.LogLevel;
import eu.javaexperience.log.Loggable;
import eu.javaexperience.log.Logger;
import eu.javaexperience.log.LoggingTools;
/**
 * Base class that ties together one {@link BlockingJob}, a {@link ScalableThreadpool}
 * and a {@link ScalableThreadpoolManageStrategy} that decides how many workers the
 * pool should have.
 *
 * <p>The job itself is produced by the subclass through {@link #initalizeJob()} when
 * {@link #start()} is called; the pool is given a supplier that returns that job.
 *
 * @param <T> type parameter of the {@link BlockingJob} handled by this executor
 */
public abstract class ScalableBlockingJobExecutor<T>
{
	/** Strategy consulted by {@link #start()} and {@link #manageLoad()}. */
	protected ScalableThreadpoolManageStrategy poolStrategy;

	/** Logger shared by all executors; used by the default strategy to report rescaling. */
	protected static Logger LOG = JavaExperienceLoggingFacility.getLogger(new Loggable("ScalableBlockingJobExecutor"));

	/**
	 * The managed pool. It is constructed with a supplier that returns the current
	 * value of {@link #blockingJob}, which stays {@code null} until {@link #start()}
	 * has assigned it.
	 */
	// Raw constructor call kept as in the original: the constructor signature of
	// ScalableThreadpool is not visible from this file.
	protected ScalableThreadpool<SimpleGet<BlockingJob<T>>> threadPool = new ScalableThreadpool
	(
		new SimpleGet<BlockingJob<T>>()
		{
			@Override
			public BlockingJob<T> get()
			{
				return blockingJob;
			}
		}
	);

	/** The job returned by {@link #initalizeJob()}; assigned in {@link #start()}. */
	protected BlockingJob<T> blockingJob;

	/**
	 * Creates an executor that uses the given scaling strategy.
	 *
	 * @param poolStrategy strategy to use; passed through
	 *        {@code AssertArgument.assertNotNull} (note that the field is assigned
	 *        before the check runs)
	 */
	public ScalableBlockingJobExecutor(ScalableThreadpoolManageStrategy poolStrategy)
	{
		AssertArgument.assertNotNull(this.poolStrategy = poolStrategy, "poolStrategy");
	}

	/**
	 * Creates an executor that uses the strategy returned by {@link #getDefault(int)}.
	 *
	 * @param initialWorketCount initial (and minimum) worker count handed to the
	 *        default strategy
	 */
	public ScalableBlockingJobExecutor(int initialWorketCount)
	{
		poolStrategy = getDefault(initialWorketCount);
	}

	/**
	 * Returns the underlying thread pool.
	 *
	 * @return the pool managed by this executor
	 * @deprecated marked deprecated in the original source; no replacement is named there.
	 */
	@Deprecated
	public ScalableThreadpool<SimpleGet<BlockingJob<T>>> getThreadPool()
	{
		return threadPool;
	}

	/**
	 * Returns the scaling strategy currently in use.
	 *
	 * @return the current strategy
	 */
	public ScalableThreadpoolManageStrategy getStrategy()
	{
		return poolStrategy;
	}

	/**
	 * Supplies the job this executor runs. Called once per {@link #start()} call.
	 *
	 * @return the job; {@link #start()} passes the result through
	 *         {@code AssertArgument.assertNotNull}
	 */
	protected abstract BlockingJob<T> initalizeJob();

	/**
	 * Obtains the job from {@link #initalizeJob()}, lets the strategy initialize the
	 * pool and then starts the pool.
	 */
	public void start()
	{
		AssertArgument.assertNotNull(blockingJob = initalizeJob(), "blockingJob");
		poolStrategy.initialize(threadPool);
		threadPool.start();
	}

	/**
	 * Asks the current strategy to re-evaluate the pool size.
	 */
	protected void manageLoad()
	{
		poolStrategy.manageLoad(threadPool);
	}

	/**
	 * Replaces the scaling strategy. Unlike the constructor, no null check is
	 * performed here.
	 *
	 * @param poolStrategy the new strategy
	 */
	public void setPoolStrategy(ScalableThreadpoolManageStrategy poolStrategy)
	{
		this.poolStrategy = poolStrategy;
	}

	/**
	 * Creates the default strategy with a shrink threshold of 0.15, a grow
	 * threshold of 0.75 and a target load of 0.4.
	 *
	 * @param initialWorkerCount initial and minimum number of workers
	 * @return a new strategy instance
	 */
	public static ScalableThreadpoolManageStrategy getDefault(final int initialWorkerCount)
	{
		return getDefault(initialWorkerCount, 0.15, 0.75, 0.4);
	}

	/**
	 * Creates a load based strategy. "Load" is the number of working workers
	 * divided by the total number of workers, both as reported by the pool.
	 *
	 * <p>When the load is at or above {@code growLoad}, or at or below
	 * {@code shrinkLoad}, the pool is resized to {@code busy / targetLoad} workers
	 * (truncated to an int), but never below {@code initialWorkerCount}.
	 *
	 * @param initialWorkerCount worker count set by {@code initialize} and used as
	 *        the lower bound when resizing
	 * @param shrinkLoad load at or below which the pool is resized
	 * @param growLoad load at or above which the pool is resized
	 * @param targetLoad load the resized pool should have with the current number
	 *        of busy workers; the value is not validated here
	 * @return a new strategy instance
	 */
	public static ScalableThreadpoolManageStrategy getDefault(final int initialWorkerCount, final double shrinkLoad, final double growLoad, final double targetLoad)
	{
		return new ScalableThreadpoolManageStrategy()
		{
			/**
			 * Checks whether the current load is outside the (shrinkLoad, growLoad)
			 * band and, if so, delegates to {@link #handle(ScalableThreadpool)}.
			 * No locking is done around the reads.
			 */
			@Override
			public void manageLoad(ScalableThreadpool<?> pool)
			{
				double num = pool.getNumberOfWorkers();
				double busy = pool.getNumberOfWorkingWorkers();
				// With zero workers and zero busy workers this is NaN; both
				// comparisons below are then false and nothing is rescaled.
				double load = busy/num;
				if(load >= growLoad || load <= shrinkLoad)
				{
					handle(pool);
				}
			}

			/**
			 * Re-reads the pool counters, computes the new worker count and applies
			 * it if it differs from the current one.
			 */
			public void handle(ScalableThreadpool<?> pool)
			{
				// The counters are read again rather than passed in, so the values
				// may differ from the ones manageLoad() just saw.
				int num = pool.getNumberOfWorkers();
				int busy = pool.getNumberOfWorkingWorkers();
				double load = ((double)busy)/num;

				int newCount = num;
				if(load >= growLoad || load <= shrinkLoad)
				{
					// Size the pool so that the currently busy workers make up
					// targetLoad of it.
					newCount = (int) (busy * (1.0/targetLoad));
				}

				// Never go below the configured minimum.
				if(newCount < initialWorkerCount)
				{
					newCount = initialWorkerCount;
				}

				if(num == newCount)
				{
					return;
				}

				LoggingTools.tryLogFormat(LOG, LogLevel.INFO, "Scaling pool: threads: %d, busy: %d, newThreadCount: %d", num, busy, newCount);
				try
				{
					pool.setWorkerCount(newCount);
				}
				catch (InterruptedException e)
				{
					LoggingTools.tryLogSimple(LOG, LogLevel.WARNING, e);
					// Catching InterruptedException clears the interrupt status;
					// restore it so code further up the stack can still observe
					// that this thread was asked to stop.
					Thread.currentThread().interrupt();
				}
			}

			/**
			 * Sets the pool to the configured initial worker count.
			 */
			@Override
			public void initialize(ScalableThreadpool<?> pool)
			{
				pool.setNumberOfWorkers(initialWorkerCount);
			}
		};
	}

	/**
	 * Stops the underlying thread pool.
	 */
	public void stop()
	{
		threadPool.stop();
	}
}