RpcTools.java
package eu.javaexperience.rpc;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import eu.javaexperience.arrays.ArrayTools;
import eu.javaexperience.asserts.AssertArgument;
import eu.javaexperience.collection.CollectionTools;
import eu.javaexperience.datareprez.DataObject;
import eu.javaexperience.datareprez.jsonImpl.DataObjectJsonImpl;
import eu.javaexperience.exceptions.OperationSuccessfullyEnded;
import eu.javaexperience.interfaces.simple.getBy.GetBy1;
import eu.javaexperience.interfaces.simple.getBy.GetBy2;
import eu.javaexperience.interfaces.simple.publish.SimplePublish1;
import eu.javaexperience.io.IOStream;
import eu.javaexperience.io.IOStreamServer;
import eu.javaexperience.reflect.Mirror;
import eu.javaexperience.rpc.bidirectional.BidirectionalRpcDefaultProtocol;
import eu.javaexperience.rpc.bidirectional.RpcClientProtocolHandler;
import eu.javaexperience.rpc.bulk.MultiplexedApiCall;
import eu.javaexperience.rpc.codegen.PhpRpcInterfaceGenerator;
import eu.javaexperience.rpc.codegen.RpcSourceBuilder;
import eu.javaexperience.rpc.discover.DiscoverRpc;
import eu.javaexperience.rpc.function.RpcFunctionParameter;
import eu.javaexperience.semantic.references.MayNull;
public class RpcTools
{
//TODO replace with discover API
public static final RpcFacility[] emptyRpcFacilityArray = new RpcFacility[0];
public static <C extends RpcRequest, F extends RpcFunction<C, ?>>
DataObject
callFunction
(
C ctx,
F function
)
{
return callFunction(ctx, null, function);
}
public static <C extends RpcRequest, F extends RpcFunction<C, ?>>
DataObject
callFunction
(
C ctx,
Object javaMethodThisParam,
F function
)
{
RpcProtocolHandler protocol = ctx.getProtocolHandler();
String functionName = protocol.getRequestFunctionName(ctx);
try
{
//TODO conflict
Object thisContext = javaMethodThisParam;//protocol.extractThisContext(ctx);
Object[] incoming = protocol.extractParameters(ctx);
RpcFunctionParameter[] ps = function.getParameterClasses();
Object[] callParam = new Object[Math.min(incoming.length, ps.length)];
for(int i=0;i<callParam.length;++i)
{
callParam[i] = ps[i].getCaster().cast(incoming[i]);
}
Object result = function.call(ctx, thisContext, functionName, callParam);
return protocol.createReturningValue(ctx, result);
}
catch(Throwable e)
{
e.printStackTrace();
return protocol.createException(ctx, e);
}
}
public static DataObject wrapReturningValue
(
RpcRequest ctx,
Object ret
)
{
RpcProtocolHandler protocol = ctx.getProtocolHandler();
return protocol.createReturningValue(ctx, ret);
}
public static DataObject wrapReturningValue
(
RpcProtocolHandler protocol,
Object ret
)
{
return protocol.createReturningValue(null, ret);
}
public static DataObject wrapException
(
RpcRequest ctx,
Throwable ret
)
{
RpcProtocolHandler protocol = ctx.getProtocolHandler();
return protocol.createException(ctx, ret);
}
public static <S extends RpcSession> RpcRequest createClientInvocation
(
S session,
long trace,
Object _this,
String method,
Object... args
)
{
BidirectionalRpcDefaultProtocol<S> PROTO = (BidirectionalRpcDefaultProtocol<S>) session.getDefaultRpcProtocolHandler();
RpcRequest req = PROTO.createClientRequest(session);
fillClientInvocation(req, trace, null, _this, method, args);
return req;
}
public static <S extends RpcSession> RpcRequest createClientNamespaceInvocation
(
S session,
long trace,
@MayNull String namespace,
Object _this,
String method,
Object... args
)
{
BidirectionalRpcDefaultProtocol<S> PROTO = (BidirectionalRpcDefaultProtocol<S>) session.getDefaultRpcProtocolHandler();
RpcRequest req = PROTO.createClientRequest(session);
fillClientInvocation(req, trace, namespace, _this, method, args);
return req;
}
public static <S extends RpcSession> void fillClientInvocation
(
RpcRequest req,
long trace,
String namspace,
Object _this,
String method,
Object[] args
)
{
RpcClientProtocolHandler PROTO = (RpcClientProtocolHandler) req.getProtocolHandler();
PROTO.putNamespace(req, namspace);
PROTO.putPacketTraceId(req, String.valueOf(trace));
PROTO.putThisParameter(req, _this);
PROTO.putRequestFunctionName(req, method);
PROTO.putParameters(req, args);
}
public static <SOCK extends IOStream, SESS extends RpcSession> SocketRpcServer<SOCK, SESS>
newServer
(
IOStreamServer<SOCK> serverSocket,
int concurrency,
RpcProtocolHandler proto,
final GetBy2<SESS, SOCK, RpcProtocolHandler> sessionCreator,
final GetBy2<DataObject, SESS, DataObject> requestHandler
)
{
AssertArgument.assertNotNull(sessionCreator, "session_creator");
AssertArgument.assertNotNull(requestHandler, "request_handler");
return new SocketRpcServer<SOCK, SESS>(serverSocket, concurrency, proto)
{
@Override
protected SESS init(SOCK socket)
{
return sessionCreator.getBy(socket, handler);
}
@Override
protected @MayNull DataObject handleRequest(SESS sess, DataObject request)
{
return requestHandler.getBy(sess, request);
}
};
}
public static <SOCK extends IOStream, SESS extends RpcSession> SocketRpcServer<SOCK, SESS> newParallelCallServer
(
IOStreamServer<SOCK> serverSocket,
int concurrency,
RpcProtocolHandler proto,
final GetBy2<SESS, SOCK, RpcProtocolHandler> sessionCreator,
final GetBy2<DataObject, SESS, DataObject> requestHandler
)
{
ThreadPoolExecutor exec =
new ThreadPoolExecutor(10, 300, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
return new SocketRpcServer<SOCK, SESS>
(
serverSocket,
concurrency,
proto
)
{
@Override
protected SESS init(SOCK socket)
{
return sessionCreator.getBy(socket, handler);
}
@Override
protected void responseRequest(SimplePublish1<DataObject> response, SESS sess, DataObject request, Object extraCtx)
{
exec.execute
(
()->
{
RpcSessionTools.setCurrentRpcSession(sess);
try
{
DataObject resp = handleRequest(sess, request);
if(null != resp)
{
response.publish(resp);
}
else
System.out.println("NULL response");
}
catch(Throwable t)
{
t.printStackTrace();
}
finally
{
RpcSessionTools.setCurrentRpcSession(null);
}
}
)
;
}
@Override
protected @MayNull DataObject handleRequest(SESS sess, DataObject request)
{
return requestHandler.getBy(sess, request);
}
@Override
public void close() throws IOException
{
super.close();
exec.shutdown();
}
};
}
public static abstract class RpcEndpointUnitRpcHandler<SESS extends RpcSession, REQ extends RpcRequest>
{
protected RpcProtocolHandler protocolHandler;
public abstract DataObject receive();
public abstract void send(DataObject response);
public abstract DataObject dispatch(REQ req);
public RpcEndpointUnitRpcHandler()
{
protocolHandler = new BidirectionalRpcDefaultProtocol<>(new DataObjectJsonImpl());
}
public SESS createSession()
{
return (SESS) new SimpleRpcSession(protocolHandler);
}
public void destroySession(SESS session)
{
}
public REQ createRequest(SESS sess, DataObject request)
{
return (REQ) new SimpleRpcRequest(sess, request);
}
public void execute()
{
SESS session = createSession();
RpcSessionTools.setCurrentRpcSession((RpcSession)session);
try
{
while(true)
{
try
{
DataObject req = receive();
if(null == req)
{
break;
}
DataObject ret = dispatch(createRequest(session, req));
if(null != ret)
{
send(ret);
}
}
catch(OperationSuccessfullyEnded skk)
{
continue;
}
}
}
finally
{
RpcSessionTools.setCurrentRpcSession(null);
}
}
}
public static <P extends RpcFunctionParameter, F extends RpcFunction<? extends RpcRequest, P>> String generatePhpRpcClass
(
String unitName,
Collection<F> functions
)
{
return generateRpcClassWithBuilder
(
PhpRpcInterfaceGenerator.BASIC_PHP_SOURCE_BUILDER,
unitName,
functions
);
}
public static <P extends RpcFunctionParameter, F extends RpcFunction<? extends RpcRequest, P>> String generateRpcClassWithBuilder
(
RpcSourceBuilder<RpcFunctionParameter, RpcFunction<RpcRequest, RpcFunctionParameter>> builder,
String unitName,
Collection<F> functions
)
{
return generateRpcClassWithBuilder(builder, unitName, functions, new HashMap<String, Object>());
}
public static <P extends RpcFunctionParameter, F extends RpcFunction<? extends RpcRequest, P>> String generateRpcClassWithBuilder
(
RpcSourceBuilder<RpcFunctionParameter, RpcFunction<RpcRequest, RpcFunctionParameter>> builder,
String unitName,
Collection<F> functions,
Map<String, Object> settings
)
{
return builder.buildRpcClientSource
(
unitName,
(Collection<RpcFunction<RpcRequest, RpcFunctionParameter>>) functions,
settings
);
}
public static String generateRpcClassesWithBuilder
(
RpcSourceBuilder<RpcFunctionParameter, RpcFunction<RpcRequest, RpcFunctionParameter>> builder,
Map<String, Object> settings,
Entry<String, JavaClassRpcFunctions<SimpleRpcRequest>>... rpcs
)
{
StringBuilder sb = new StringBuilder();
for(Entry<String, JavaClassRpcFunctions<SimpleRpcRequest>> rpc:rpcs)
{
sb.append(generateRpcClassWithBuilder(builder, rpc.getKey(), rpc.getValue().getFunctionList(), settings));
}
return sb.toString();
}
public static GetBy2<SimpleRpcSession, IOStream, RpcProtocolHandler> getSimpleSessionCreator()
{
return new GetBy2<SimpleRpcSession, IOStream, RpcProtocolHandler>()
{
@Override
public SimpleRpcSession getBy(IOStream a, RpcProtocolHandler b)
{
return new SimpleRpcSession(b);
}
};
}
public static Object extractReturnOrThrow(RpcRequest req) throws Throwable
{
RpcClientProtocolHandler<?> hand = (RpcClientProtocolHandler<?>) req.getProtocolHandler();
Throwable t = hand.extractException(req);
if(null != t)
{
throw t;
}
return hand.extractReturningValue(req);
}
public static <R extends RpcRequest> GetBy1<DataObject, R> createSimpleNamespaceDispatcher
(
final GetBy2<DataObject, R, String> _default,
final Collection<RpcFacility> rpcs
)
{
return createSimpleNamespaceDispatcher(_default, (RpcFacility[]) rpcs.toArray(JavaClassRpcFunctions.emptyRpcFacilityArray));
}
public static <R extends RpcRequest> GetBy1<DataObject, R> createSimpleNamespaceDispatcher
(
final GetBy2<DataObject, R, String> _default,
final RpcFacility... rpcs
)
{
return new GetBy1<DataObject, R>()
{
@Override
public DataObject getBy(R a)
{
String ns = a.getProtocolHandler().extractNamespace(a);
for(RpcFacility<R> rpc:rpcs)
{
if(Mirror.equals(ns, rpc.getRpcName()))
{
return rpc.dispatch(a);
}
}
if(null != _default)
{
return _default.getBy(a, ns);
}
return null;
}
};
}
public static List<RpcFacility> wrapApis(RpcFacility... apis)
{
ArrayList<RpcFacility> dst = new ArrayList<>();
CollectionTools.copyInto(apis, dst);
return Collections.unmodifiableList(dst);
}
public static GetBy1<DataObject, SimpleRpcRequest> createSimpleNamespaceDispatcherWithDiscoverApi(RpcFacility... instances)
{
DiscoverRpc discover = new DiscoverRpc(instances);
RpcFacility[] add = ArrayTools.arrayAppend(discover, instances);
return createSimpleNamespaceDispatcher(discover, add);
}
public static List<RpcFacility> addMultiplexer(List<RpcFacility> apis, GetBy1<RpcRequest, DataObject> requestCreator)
{
List<RpcFacility> rpc = new ArrayList<>();
CollectionTools.copyInto(apis, rpc);
rpc.add(new JavaClassRpcUnboundFunctionsInstance<RpcRequest>(new MultiplexedApiCall(apis, requestCreator), MultiplexedApiCall.class)
{
@Override
public String getRpcName()
{
return "MultiplexedApiCall";
}
});
return rpc;
}
public static void throwUnknownNamespace(String ns)
{
throw new RuntimeException("Unknown RPC namespace: "+ns);
}
}