MultithreadingTools.java

package eu.javaexperience.multithread;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import eu.javaexperience.interfaces.simple.getBy.GetBy1;
import eu.javaexperience.interfaces.simple.publish.SimplePublish1;
import eu.javaexperience.multithread.notify.WaitForEvents;

public class MultithreadingTools
{
	
	/**
	 * Used for implement thread shutdown request.
	 * 
	 * Used (must be used) in ALL multithreading uniliys which cooperates with threads or thread pools,
	 * or manages any {@link Thread}.
	 * 
	 * clients: throw this exception to indicate for current thread (even if job is issed to a thread pool)
	 * that it must be shutdown.
	 *		desing hint: never catch Throwables, just Exceptions (RuntimeException is a descendant)
	 *		but if you have to, catch the throwable and if value is identically this value then throw again it.
	 *
	 *	
	 * maintainer: if this value is catched on the top level, shutdown the thread!
	 * 
	 * */
	public static final Error THREAD_SHUTDOWN_POISON = new Error();
	
	public static <E> Job<Queue<E>> processAll(final Job<E> job)
	{
		return new Job<Queue<E>>()
		{
			@Override
			public void exec(Queue<E> param) throws Throwable
			{
				E elem = null;
				while((elem = param.poll()) != null)
				{
					job.exec(elem);
				}
			}
		};
	}
	
	public static <E> Job<BlockingQueue<E>> processInfinite(final Job<E> job)
	{
		return new Job<BlockingQueue<E>>()
		{
			@Override
			public void exec(BlockingQueue<E> param) throws Throwable
			{
				E elem = null;
				while((elem = param.take()) != null)
				{
					job.exec(elem);
				}
			}
		};
	}

	public static void topLevelException(Throwable t)
	{
		System.err.println("=== TOP LEVEL UNCATCHED EXCEPTION");
		t.printStackTrace();
	}

	public static <T> void adhocMultiProcessAll
	(
		Queue<T> units,
		int concurrency,
		SimplePublish1<T> exec
	)
	{
		for(int i=0;i<concurrency;++i)
		{
			new Thread()
			{
				public void run()
				{
					T elem = null;
					while(null != (elem = units.poll()))
					{
						exec.publish(elem);
					}
				};
			}.start();
		}
	}

	public static <P,R> List<R> processAllConcurrently
	(
		int maxConcurrency,
		Collection<P> toProc,
		GetBy1<R, P> processor
	)
	{
		WaitForEvents w = new WaitForEvents(toProc.size());
		List<R> ret = new ArrayList<>();
		
		Queue<P> params = new LinkedBlockingQueue<>();
		params.addAll(toProc);
		
		int c = Math.min(maxConcurrency, toProc.size());
		for(int i=0;i<c;++i)
		{
			new Thread()
			{
				public void run()
				{
					P p = null;
					while(null != (p = params.poll()))
					{
						try
						{
							R proc = processor.getBy(p);
							synchronized(ret)
							{
								ret.add(proc);
							}
						}
						catch(Throwable t)
						{
							topLevelException(t);
						}
						finally
						{
							w.call();
						}
					}
				};
			}.start();
		}
		
		w.waitForAllEvent();
		return ret;
	}
}