SocketRpcServer.java
package eu.javaexperience.rpc;
import java.io.Closeable;
import java.io.IOException;
import eu.javaexperience.asserts.AssertArgument;
import eu.javaexperience.datareprez.DataCommon;
import eu.javaexperience.datareprez.DataObject;
import eu.javaexperience.datareprez.DataReceiver;
import eu.javaexperience.datareprez.DataSender;
import eu.javaexperience.datareprez.jsonImpl.DataObjectJsonImpl;
import eu.javaexperience.exceptions.OperationSuccessfullyEnded;
import eu.javaexperience.interfaces.simple.publish.SimplePublish1;
import eu.javaexperience.io.IOStream;
import eu.javaexperience.io.IOStreamServer;
import eu.javaexperience.io.IOTools;
import eu.javaexperience.reflect.Mirror;
import eu.javaexperience.semantic.references.MayNull;
import eu.javaexperience.server.AbstractServer;
public abstract class SocketRpcServer<SOCK extends IOStream, SESS extends RpcSession> extends AbstractServer<SOCK> implements Closeable
{
protected final RpcProtocolHandler handler;
public SocketRpcServer
(
IOStreamServer<SOCK> srv,
int initialWorkerCount,
RpcProtocolHandler handler
)
{
super(srv, initialWorkerCount);
AssertArgument.assertNotNull(this.handler = handler, "rpc protocolhandler");
}
public SocketRpcServer
(
IOStreamServer<SOCK> srv,
int initialWorkerCount
)
{
this(srv, initialWorkerCount, new RpcDefaultProtocol(new DataObjectJsonImpl()));
}
protected Object createExtraContext(SOCK sock, SESS session)
{
return null;
}
protected void destoryExtraContext(Object o)
{
}
@Override
protected void execute(SOCK sock)
{
manageLoad();
DataCommon prototype = handler.getDefaultCommunicationProtocolPrototype();
DataReceiver rec = null;
SESS session = null;
Object ctx = null;
try
{
DataSender ds = prototype.newDataSender(sock.getOutputStream());
SimplePublish1<DataObject> response = resp->
{
synchronized (ds)
{
try
{
ds.send(resp);
}
catch (IOException e)
{
Mirror.propagateAnyway(e);
}
}
};
rec = prototype.newDataReceiver(sock.getInputStream());
session = init(sock);
RpcSessionTools.setCurrentRpcSession((RpcSession)session);
ctx = createExtraContext(sock, session);
while(!sock.isClosed())
{
try
{
DataObject req = rec.receiveDataObject();
if(null == req)
{
break;
}
responseRequest(response, session, req, ctx);
}
catch(OperationSuccessfullyEnded skk)
{
continue;
}
}
}
catch(Throwable e)
{
onException(e);
}
finally
{
try
{
if(null != session)
{
session.destroy();
}
}
catch(Exception e)
{}
RpcSessionTools.setCurrentRpcSession(null);
IOTools.silentClose(sock);
}
manageLoad();
}
protected void onException(Throwable t)
{
if(!(t instanceof IOException))
{
t.printStackTrace();
}
}
protected void responseRequest(SimplePublish1<DataObject> response, SESS sess, DataObject request, Object extraCtx)
{
DataObject resp = handleRequest(sess, request);
if(null != resp)
{
response.publish(resp);
}
}
protected abstract SESS init(SOCK socket);
/**
* At this level we don't care about calling conventions: theres no magic,
* preserved key for method name and not specified argument passing method
* applied.
* if null returned we don't send any answer to the endpoint, just start
* receiving a new call.
*
* no catch or any error handling method applied, if exception throwed
* (other than {@link OperationSuccessfullyEnded}) the connection will be
* terminated.
* */
protected abstract @MayNull DataObject handleRequest(SESS sess, DataObject request);
@Override
public void close() throws IOException
{
srv.close();
}
}