JavaRpcParallelClient.java

package eu.javaexperience.rpc.javaclient;

import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import eu.javaexperience.asserts.AssertArgument;
import eu.javaexperience.datareprez.DataObject;
import eu.javaexperience.datareprez.DataReceiver;
import eu.javaexperience.datareprez.DataReprezTools;
import eu.javaexperience.datareprez.DataSender;
import eu.javaexperience.interfaces.simple.SimpleGet;
import eu.javaexperience.interfaces.simple.getBy.GetBy1;
import eu.javaexperience.interfaces.simple.publish.SimplePublish1;
import eu.javaexperience.log.JavaExperienceLoggingFacility;
import eu.javaexperience.log.LogLevel;
import eu.javaexperience.log.Loggable;
import eu.javaexperience.log.Logger;
import eu.javaexperience.log.LoggingTools;
import eu.javaexperience.patterns.behavioral.mediator.EventMediator;
import eu.javaexperience.reflect.Mirror;
import eu.javaexperience.rpc.RpcProtocolHandler;
import eu.javaexperience.semantic.references.MayNull;

public class JavaRpcParallelClient
{
	protected static final Logger LOG = JavaExperienceLoggingFacility.getLogger(new Loggable("JavaRpcParallelClient"));
	
	protected EventMediator<DataObject> serverEvents = new EventMediator<>();
	protected RpcProtocolHandler proto;

	protected AtomicLong trasactionId = new AtomicLong();
	
	protected LinkedList<JavaClientRpcPendingRequest> pendingRequests = new LinkedList<>();
	
	SimplePublish1<DataObject> send;
	SimpleGet<DataObject> receive;
	
	protected Thread reader;
	protected boolean readResponses = false;
	
	public JavaRpcParallelClient(DataSender send, DataReceiver rec, RpcProtocolHandler proto)
	{
		this.send = o ->
		{
			try
			{
				send.send(o);
			}
			catch (IOException e)
			{
				Mirror.propagateAnyway(e);
			}
		};
		this.receive = ()->
		{
			try
			{
				return rec.receiveDataObject();
			}
			catch (IOException e)
			{
				Mirror.propagateAnyway(e);
				return null;
			}
		};
		this.proto = proto;
	}
	
	public JavaRpcParallelClient(SimplePublish1<DataObject> send, SimpleGet<DataObject> rec, RpcProtocolHandler proto)
	{
		this.send = send;
		this.receive = rec;
		this.proto = proto;
	}
	
	protected class JavaClientRpcPendingRequest implements Closeable
	{
		protected Object originalTid;
		protected final long tid;
		protected DataObject response = null;
		protected boolean revoked = false;
		
		public JavaClientRpcPendingRequest(DataObject req)
		{
			this.tid = trasactionId.incrementAndGet();
			originalTid = req.opt("t");
			req.putLong("t", tid);
		}
		
		@Override
		public void close() throws IOException
		{
			revoked = true;
			revokePendingRequest(this);
		}
		
		@Override
		protected void finalize() throws Throwable
		{
			//safety net
			close();
		}
		
		protected Semaphore lock = new Semaphore(0);
		
		public synchronized boolean isResponsed()
		{
			return null != response;
		}
		
		public synchronized boolean isRevoked()
		{
			return null != response && revoked;
		}
		
		public boolean waitResponse(long timeout, TimeUnit unit) throws InterruptedException
		{
			AssertArgument.assertTrue(!isRevoked(), "Request has been revoked");
			lock.tryAcquire(timeout, unit);
			AssertArgument.assertTrue(!isRevoked(), "Request has been revoked");
			return isResponsed();
		}
		
		public DataObject ensureResponse(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
		{
			return ensureResponse(timeout, unit, null);
		}
		
		public DataObject ensureResponse(long timeout, TimeUnit unit, String errAppendMsg) throws InterruptedException, TimeoutException
		{
			if(!waitResponse(timeout, unit))
			{
				throw new TimeoutException("Request (`"+tid+"`) not responded within "+timeout+" "+unit+" for the request. "+errAppendMsg);
			}
			
			return response;
		}
		
		protected void receiveResponse(DataObject data)
		{
			AssertArgument.assertNotNull(data, "data");
			if(null != originalTid)
			{
				DataReprezTools.put(data, "t", originalTid);
			}
			synchronized(this)
			{
				response = data;
			}
			lock.release(Integer.MAX_VALUE);
		}
		
		public boolean isResponseMatches(DataObject a)
		{
			return tid == a.optLong("t", -1);
		}

		public void revoke()
		{
			revoked = true;
			lock.release(Integer.MAX_VALUE);
		}
	}
	
	public EventMediator<DataObject> getServerEventMediator()
	{
		return serverEvents;
	}
	
	protected void sendPacket(DataObject req)
	{
		if(LOG.mayLog(LogLevel.TRACE))
		{
			LoggingTools.tryLogFormat(LOG, LogLevel.TRACE, "Sending packet `%s`", req);
		}
		synchronized (send)
		{
			send.publish(req);
		}
	}
	
	protected GetBy1<DataObject, DataObject> transactionHandler = (req)->
	{
		JavaClientRpcPendingRequest p = new JavaClientRpcPendingRequest(req);
		synchronized(pendingRequests)
		{
			pendingRequests.addFirst(p);
		}
		
		//actually send the package
		sendPacket(req);
		
		try
		{
			p.lock.acquire();
		}
		catch (InterruptedException e)
		{
			Mirror.propagateAnyway(e);
		}
		
		if(p.isRevoked())
		{
			throw new RuntimeException("Request has been revoked.");
		}
		
		if(!p.isResponsed())
		{
			throw new RuntimeException("Response error: null response returned.");
		}
		
		return p.response;
	};
	
	public void publishResponse(DataObject resp)
	{
		JavaClientRpcPendingRequest target = null;
		synchronized(pendingRequests)
		{
			for(JavaClientRpcPendingRequest p:pendingRequests)
			{
				if(p.isResponseMatches(resp))
				{
					pendingRequests.remove(p);
					target = p;
					break;
				}
			}
		}
		
		if(null != target)
		{
			if(LOG.mayLog(LogLevel.TRACE))
			{
				LoggingTools.tryLogFormat(LOG, LogLevel.TRACE, "Answer request `%s` with response `%s`", target, resp);
			}
			target.receiveResponse(resp);
		}
		else
		{
			if(LOG.mayLog(LogLevel.TRACE))
			{
				LoggingTools.tryLogFormat(LOG, LogLevel.TRACE, "Server event received: `%s`", resp);
			}
			synchronized(serverEvents)
			{
				serverEvents.dispatchEvent(resp);
			}
		}
	}
	
	
	/**
	 * You can call this function to receiving server events without calling any
	 * client functions.
	 * */
	public void readResponse()
	{
		DataObject rec = null;
		synchronized(receive)
		{
			rec = receive.get();
		}
		AssertArgument.assertNotNull(rec, "Received packet");
		publishResponse(rec);
	}
	
	public synchronized void startPacketRead()
	{
		if(null == reader)
		{
			readResponses = true;
			reader = new Thread()
			{
				@Override
				public void run()
				{
					while(readResponses)
					{
						readResponse();
					}
				}
				
			};
			reader.start();
		}
		else
		{
			throw new RuntimeException("Packet reader thread already started");
		}
	}
	
	public synchronized void stopPacketRead()
	{
		if(null != reader)
		{
			readResponses = false;
			reader.interrupt();
			reader = null;
		}
		else
		{
			throw new RuntimeException("Packet reader thread not started");
		}
	}
	
	protected boolean revokePendingRequest(JavaClientRpcPendingRequest req)
	{
		synchronized(pendingRequests)
		{
			boolean ret = pendingRequests.remove(req);
			if(ret)
			{
				req.revoke();
			}
			return ret;
		}
	}
	
	public <T> T createApiObject(Class<T> cls, @MayNull String namespace)
	{
		return JavaRpcClientTools.createApiWithTransactionHandler(cls, transactionHandler, namespace, proto);
	}
	
	/**
	 * Revokes all pending request without closing the communication lines.
	 * */
	public void shudown()
	{
		synchronized(pendingRequests)
		{
			for(JavaClientRpcPendingRequest p:pendingRequests)
			{
				p.revoke();
			}
		}
	}
}