RMI(Remote Method Invocation,远程方法调用)是一种机制,能够让某个 Java 虚拟机上的对象调用另一个 Java 虚拟机中的对象上的方法。可以通过这种方式调用的任何对象都必须实现远程接口。调用这样一个对象的方法时,其参数会被编组("marshalled")后从本地虚拟机发送到远程虚拟机,并在远程虚拟机上被解组("unmarshalled")。方法执行结束时,远程虚拟机会将结果编组后发送回调用方的虚拟机。如果方法调用导致抛出异常,该异常也会被传回给调用方。RMI 主要是为 Java 分布式应用而设计的,但由于它在数据传输时采用了序列化,并且没有对数据做必要的过滤,所以导致了一系列安全问题。
// Creates a registry for port 1099 and keeps the returned Registry reference.
Registry registry = LocateRegistry.createRegistry(1099);
/**
 * Creates a registry for the given port.
 *
 * @param port port handed to the {@code RegistryImpl} constructor
 * @return the newly constructed {@code RegistryImpl}
 * @throws RemoteException if constructing the registry fails
 */
public static Registry createRegistry(int port) throws RemoteException {
    Registry created = new RegistryImpl(port);
    return created;
}
/**
 * Builds the registry and exports it on {@code port}.
 *
 * <p>When the port is the default registry port and a security manager is
 * installed, the export runs inside {@code doPrivileged} limited to a
 * {@code SocketPermission} for {@code localhost:port}. Otherwise the export
 * is performed directly.
 *
 * @param port port the registry is exported on
 * @throws RemoteException if the export fails
 */
public RegistryImpl(int port) throws RemoteException {
    boolean needsPrivilegedExport =
        port == Registry.REGISTRY_PORT && System.getSecurityManager() != null;

    if (!needsPrivilegedExport) {
        setup(new UnicastServerRef(new LiveRef(id, port)));
        return;
    }

    // grant permission for default port only.
    PrivilegedExceptionAction<Void> exportAction = () -> {
        setup(new UnicastServerRef(new LiveRef(id, port)));
        return null;
    };
    try {
        AccessController.doPrivileged(
            exportAction, null, new SocketPermission("localhost:"+port, "listen,accept"));
    } catch (PrivilegedActionException wrapped) {
        // Unwrap: the action only declares RemoteException.
        throw (RemoteException) wrapped.getException();
    }
}
/**
 * Builds a live reference flagged as local, using the endpoint returned by
 * {@code TCPEndpoint.getLocalEndpoint(port)}.
 *
 * @param objID object identifier to store
 * @param port  port used to obtain the local endpoint
 */
public LiveRef(ObjID objID, int port) {
    this(objID, TCPEndpoint.getLocalEndpoint(port), true);
}

/**
 * Stores the object id, endpoint and local flag exactly as given.
 *
 * @param objID    object identifier to store
 * @param endpoint endpoint to store
 * @param isLocal  value stored in the {@code isLocal} field
 */
public LiveRef(ObjID objID, Endpoint endpoint, boolean isLocal) {
    this.id = objID;
    this.ep = endpoint;
    this.isLocal = isLocal;
}
public static TCPEndpoint getLocalEndpoint(int port) { return getLocalEndpoint(port, null, null); } public static TCPEndpoint getLocalEndpoint(int port, RMIClientSocketFactory csf, RMIServerSocketFactory ssf) { TCPEndpoint ep = null; synchronized (localEndpoints) { TCPEndpoint endpointKey = new TCPEndpoint(null, port, csf, ssf); LinkedList<TCPEndpoint> epList = localEndpoints.get(endpointKey); String localHost = resampleLocalHost(); if (epList == null) { ep = new TCPEndpoint(localHost, port, csf, ssf); epList = new LinkedList<TCPEndpoint>(); epList.add(ep); ep.listenPort = port; ep.transport = new TCPTransport(epList); localEndpoints.put(endpointKey, epList); if (TCPTransport.tcpLog.isLoggable(Log.BRIEF)) { TCPTransport.tcpLog.log(Log.BRIEF, "created local endpoint for socket factory " + ssf + " on port " + port); } } else { synchronized (epList) { ep = epList.getLast(); String lastHost =; int lastPort = ep.port; TCPTransport lastTransport = ep.transport; // assert (localHost == null ^ lastHost != null) if (localHost != null && !localHost.equals(lastHost)) { if (lastPort != 0) { epList.clear(); } ep = new TCPEndpoint(localHost, lastPort, csf, ssf); ep.listenPort = port; ep.transport = lastTransport; epList.add(ep); } } } } return ep; }
/**
 * Server-side reference constructor: hands the live reference to the
 * superclass constructor.
 *
 * @param ref live reference passed to {@code super}
 */
public UnicastServerRef(LiveRef ref) {
    super(ref);
}

/**
 * Stores the given live reference in the {@code ref} field.
 *
 * @param liveRef live reference to keep
 */
public UnicastRef(LiveRef liveRef) {
    this.ref = liveRef;
}
/**
 * Records {@code uref} as this object's reference and exports this object
 * through it with {@code permanent = true}.
 *
 * @param uref server reference to store and export through
 * @throws RemoteException if the export fails
 */
private void setup(UnicastServerRef uref) throws RemoteException {
    // The field is assigned before exporting, as in the original order.
    this.ref = uref;
    uref.exportObject(this, null, true);
}
可以看到这里把UnicastServerRef赋给了ref变量,现在大致的对象关系是RegistryImpl <- UnicastServerRef <- LiveRef <- TCPEndpoint <- TCPTransport
/**
 * Exports {@code impl} through this server reference.
 *
 * <p>A stub is created for the implementation class, a skeleton is set up
 * when that stub is a {@code RemoteStub}, and the implementation, this
 * dispatcher and the stub are wrapped in a {@code Target} that is passed on
 * to {@code ref.exportObject}.
 *
 * @param impl      remote object implementation being exported
 * @param data      not used by the code shown here
 * @param permanent forwarded to the {@code Target} constructor
 * @return the stub created for {@code impl}
 * @throws RemoteException if stub creation or the export fails
 */
public Remote exportObject(Remote impl, Object data, boolean permanent)
        throws RemoteException {
    final Class<?> implClass = impl.getClass();

    final Remote stub;
    try {
        stub = Util.createProxy(implClass, getClientRef(), forceStubUse);
    } catch (IllegalArgumentException cause) {
        throw new ExportException(
            "remote object implements illegal remote interface", cause);
    }

    // A skeleton is only looked up when the stub is a RemoteStub instance.
    if (stub instanceof RemoteStub) {
        setSkeleton(impl);
    }

    Target exported = new Target(impl, this, stub, ref.getObjID(), permanent);
    ref.exportObject(exported);

    hashToMethod_Map = hashToMethod_Maps.get(implClass);
    return stub;
}
/**
 * Creates the client-side stand-in for a remote implementation class.
 *
 * <p>A stub from {@code createStub} is returned when {@code forceStubUse}
 * is set, or when stub classes are not ignored and one exists for the remote
 * class. Otherwise a dynamic proxy is built that implements the remote
 * interfaces of {@code implClass} and delegates to a
 * {@code RemoteObjectInvocationHandler} wrapping {@code clientRef}.
 *
 * @param implClass    implementation class of the remote object
 * @param clientRef    reference the stub or proxy will invoke through
 * @param forceStubUse when true, always use {@code createStub}
 * @return the stub or dynamic proxy
 * @throws StubNotFoundException if no remote class is found for
 *         {@code implClass} or the proxy cannot be created
 */
public static Remote createProxy(Class<?> implClass,
                                 RemoteRef clientRef,
                                 boolean forceStubUse)
        throws StubNotFoundException {
    final Class<?> remoteClass;
    try {
        remoteClass = getRemoteClass(implClass);
    } catch (ClassNotFoundException notRemote) {
        throw new StubNotFoundException(
            "object does not implement a remote interface: " +
            implClass.getName());
    }

    // Same short-circuit order as the original condition: stubClassExists
    // is evaluated only when neither flag has already decided the outcome.
    boolean useStubClass =
        forceStubUse || (!ignoreStubClasses && stubClassExists(remoteClass));
    if (useStubClass) {
        return createStub(remoteClass, clientRef);
    }

    final ClassLoader loader = implClass.getClassLoader();
    final Class<?>[] interfaces = getRemoteInterfaces(implClass);
    final InvocationHandler handler =
        new RemoteObjectInvocationHandler(clientRef);

    /* REMIND: private remote interfaces? */

    try {
        PrivilegedAction<Remote> newProxy =
            () -> (Remote) Proxy.newProxyInstance(loader, interfaces, handler);
        return AccessController.doPrivileged(newProxy);
    } catch (IllegalArgumentException cause) {
        throw new StubNotFoundException("unable to create proxy", cause);
    }
}
/**
 * Assigns the skeleton for {@code impl} to {@code skel}, unless its class
 * is already recorded in {@code withoutSkeletons}.
 *
 * <p>If no skeleton is found, the class is recorded in
 * {@code withoutSkeletons} so the lookup is not retried.
 *
 * @param impl remote object implementation
 * @throws RemoteException declared by the signature
 */
public void setSkeleton(Remote impl) throws RemoteException {
    if (withoutSkeletons.containsKey(impl.getClass())) {
        return;
    }

    try {
        skel = Util.createSkeleton(impl);
    } catch (SkeletonNotFoundException notFound) {
        // Remember the miss; the exception is deliberately not propagated.
        withoutSkeletons.put(impl.getClass(), null);
    }
}
然后再把前面创建的RegistryImpl, UnicastServerRef, RegistryImpl_Stub对象封装到Target对象中,继续调用LiveRef.exportObject() -> TCPEndpoint.exportObject() -> TCPTransport.exportObject(),最终来到TCPTransport的exportObject方法。
/**
 * Exports {@code target} on this transport.
 *
 * <p>While holding this object's lock, {@code listen()} is called and the
 * export count is incremented. The superclass export then runs; if it does
 * not complete normally, the export count is decremented again.
 *
 * @param target target being exported
 * @throws RemoteException if listening or the export fails
 */
public void exportObject(Target target) throws RemoteException {
    synchronized (this) {
        listen();
        exportCount++;
    }

    boolean exported = false;
    try {
        super.exportObject(target);
        exported = true;
    } finally {
        // Roll the counter back on any abrupt completion.
        if (!exported) {
            synchronized (this) {
                decrementExportCount();
            }
        }
    }
}
// Requests a Registry reference for port 1099, passing an empty host string.
LocateRegistry.getRegistry("", 1099);
public static Registry getRegistry(String host, int port) throws RemoteException { return getRegistry(host, port, null); } public static Registry getRegistry(String host, int port, RMIClientSocketFactory csf) throws RemoteException { Registry registry = null; if (port <= 0) port = Registry.REGISTRY_PORT; if (host == null || host.length() == 0) { try { host =; } catch (Exception e) { host = ""; } } LiveRef liveRef = new LiveRef(new ObjID(ObjID.REGISTRY_ID), new TCPEndpoint(host, port, csf, null), false); RemoteRef ref = (csf == null) ? new UnicastRef(liveRef) : new UnicastRef2(liveRef); return (Registry) Util.createProxy(RegistryImpl.class, ref, false); }
/**
 * Creates and exports this object, delegating to
 * {@link #UnicastRemoteObject(int)} with port 0.
 *
 * @throws RemoteException if the export fails
 */
protected UnicastRemoteObject() throws RemoteException {
    this(0);
}

/**
 * Records the port and exports this object on it.
 *
 * @param port port stored in the {@code port} field and used for the export
 * @throws RemoteException if the export fails
 */
protected UnicastRemoteObject(int port) throws RemoteException {
    this.port = port;
    exportObject((Remote) this, port);
}

/**
 * Exports {@code obj} through a new {@code UnicastServerRef} for {@code port}.
 *
 * @param obj  remote object to export
 * @param port port handed to the {@code UnicastServerRef} constructor
 * @return the stub returned by the export
 * @throws RemoteException if the export fails
 */
public static Remote exportObject(Remote obj, int port) throws RemoteException {
    UnicastServerRef serverRef = new UnicastServerRef(port);
    return exportObject(obj, serverRef);
}

/**
 * Exports {@code obj} through {@code sref} with {@code permanent = false}.
 *
 * @param obj  remote object to export
 * @param sref server reference performing the export
 * @return the stub returned by {@code sref.exportObject}
 * @throws RemoteException if the export fails
 */
private static Remote exportObject(Remote obj, UnicastServerRef sref)
        throws RemoteException {
    // if obj extends UnicastRemoteObject, set its ref.
    if (obj instanceof UnicastRemoteObject) {
        UnicastRemoteObject self = (UnicastRemoteObject) obj;
        self.ref = sref;
    }
    return sref.exportObject(obj, null, false);
}
/**
 * Exports {@code impl} through this server reference.
 *
 * <p>A stub is created for the implementation class, a skeleton is set up
 * when that stub is a {@code RemoteStub}, and the implementation, this
 * dispatcher and the stub are wrapped in a {@code Target} that is passed on
 * to {@code ref.exportObject}.
 *
 * @param impl      remote object implementation being exported
 * @param data      not used by the code shown here
 * @param permanent forwarded to the {@code Target} constructor
 * @return the stub created for {@code impl}
 * @throws RemoteException if stub creation or the export fails
 */
public Remote exportObject(Remote impl, Object data, boolean permanent)
        throws RemoteException {
    final Class<?> implClass = impl.getClass();

    final Remote stub;
    try {
        stub = Util.createProxy(implClass, getClientRef(), forceStubUse);
    } catch (IllegalArgumentException cause) {
        throw new ExportException(
            "remote object implements illegal remote interface", cause);
    }

    // A skeleton is only looked up when the stub is a RemoteStub instance.
    if (stub instanceof RemoteStub) {
        setSkeleton(impl);
    }

    Target exported = new Target(impl, this, stub, ref.getObjID(), permanent);
    ref.exportObject(exported);

    hashToMethod_Map = hashToMethod_Maps.get(implClass);
    return stub;
}
// Fragment: builds a dynamic proxy that implements the remote interfaces of implClass and delegates to a
// RemoteObjectInvocationHandler wrapping clientRef; an IllegalArgumentException is rethrown as StubNotFoundException.
final ClassLoader loader = implClass.getClassLoader(); final Class<?>[] interfaces = getRemoteInterfaces(implClass); final InvocationHandler handler = new RemoteObjectInvocationHandler(clientRef); try { return AccessController.doPrivileged(new PrivilegedAction<Remote>() { public Remote run() { return (Remote) Proxy.newProxyInstance(loader, interfaces, handler); }}); } catch (IllegalArgumentException e) { throw new StubNotFoundException("unable to create proxy", e); }
/**
 * Collects the remote interfaces of {@code remoteClass} into an array.
 *
 * @param remoteClass class whose hierarchy is scanned
 * @return the interfaces gathered by the recursive overload, in the order
 *         they were added
 */
private static Class<?>[] getRemoteInterfaces(Class<?> remoteClass) {
    ArrayList<Class<?>> collected = new ArrayList<>();
    getRemoteInterfaces(collected, remoteClass);

    Class<?>[] result = new Class<?>[collected.size()];
    return collected.toArray(result);
}
/**
 * Appends to {@code list} every interface assignable to {@code Remote}
 * that is directly implemented by {@code cl} or one of its superclasses,
 * visiting superclasses first and skipping interfaces already in the list.
 *
 * <p>Each method of a newly found interface is passed to
 * {@code checkMethod} before the interface is added.
 *
 * @param list accumulator receiving the interfaces
 * @param cl   class to inspect
 */
private static void getRemoteInterfaces(ArrayList<Class<?>> list, Class<?> cl) {
    Class<?> parent = cl.getSuperclass();
    if (parent != null) {
        getRemoteInterfaces(list, parent);
    }

    for (Class<?> candidate : cl.getInterfaces()) {
        boolean isRemote = Remote.class.isAssignableFrom(candidate);
        if (!isRemote || list.contains(candidate)) {
            continue;
        }
        // Check all methods first; the interface is added only afterwards.
        for (Method declared : candidate.getMethods()) {
            checkMethod(declared);
        }
        list.add(candidate);
    }
}
/**
 * Binds {@code name} to {@code obj} in this registry's {@code bindings}.
 *
 * <p>{@code checkAccess("Registry.bind")} runs first. The lookup and insert
 * happen while holding the lock on {@code bindings}.
 *
 * @param name name to bind
 * @param obj  remote reference to associate with the name
 * @throws AlreadyBoundException if a non-null binding exists for {@code name}
 * @throws RemoteException declared by the signature
 * @throws AccessException declared by the signature
 */
public void bind(String name, Remote obj)
        throws RemoteException, AlreadyBoundException, AccessException {
    checkAccess("Registry.bind");

    synchronized (bindings) {
        Remote existing = bindings.get(name);
        if (existing != null) {
            throw new AlreadyBoundException(name);
        }
        bindings.put(name, obj);
    }
}
LocateRegistry.createRegistry(1099)返回一个RegistryImpl对象,然后其属性ref <- UnicastServerRef <- LiveRef <- TCPEndpoint <- TCPTransport ,skel <- RegistryImpl_Skel,还创建一个RegistryImpl_Stub被封装到Target对象中最后存入静态变量ObjectTable.objTable。
// Stub-side lookup (excerpt with elisions): opens a call via super.ref.newCall with operation number 2 and the
// interface hash, writes the name with writeObject, runs super.ref.invoke, then reads the returned Remote with readObject.
public Remote lookup(String var1) throws AccessException, NotBoundException, RemoteException { try { RemoteCall var2 = super.ref.newCall(this, operations, 2, 4905912898345647071L); try { ObjectOutput var3 = var2.getOutputStream(); var3.writeObject(var1); } catch (IOException var18) { throw new MarshalException("error marshalling arguments", var18); } super.ref.invoke(var2); Remote var23; try { ObjectInput var6 = var2.getInputStream(); var23 = (Remote)var6.readObject(); } ... return var23; } ... }
大致看一下,整个函数里面主要执行了两个方法:super.ref.newCall()和super.ref.invoke(),其中newCall的返回值作为invoke方法的参数;之后还有一次readObject调用,其他都是try catch相关的东西。
// newCall (excerpt with elisions): obtains a connection from the ref's channel, wraps it in a StreamRemoteCall carrying
// the object id, opnum and hash, and marshals custom call data; on RemoteException the connection is freed with
// reuse = false and the exception is rethrown.
public RemoteCall newCall(RemoteObject obj, Operation[] ops, int opnum,long hash) throws RemoteException { Connection conn = ref.getChannel().newConnection(); try { ... RemoteCall call = new StreamRemoteCall(conn, ref.getObjID(), opnum, hash); try { marshalCustomCallData(call.getOutputStream()); } catch (IOException e) { throw new MarshalException("error marshaling " + "custom call data"); } return call; } catch (RemoteException e) { ref.getChannel().free(conn, false); throw e; } }
// executeCall (excerpt with elisions): releases the output stream, reads the reply's op byte and return type, and for
// ExceptionalReturn deserializes the object sent by the peer with in.readObject().
public void executeCall() throws Exception { byte returnType; try { ... releaseOutputStream(); DataInputStream rd = new DataInputStream(conn.getInputStream()); byte op = rd.readByte(); ... getInputStream(); returnType = in.readByte(); in.readID(); // id for DGC acknowledgement } catch (UnmarshalException e) { ... } // read return value switch (returnType) { case TransportConstants.NormalReturn: break; case TransportConstants.ExceptionalReturn: Object ex; try { ex = in.readObject(); } catch (Exception e) { throw new UnmarshalException("Error unmarshaling return", e); } ... default: ... } }
// listen (excerpt with elisions): asserts the caller holds this object's lock; when no server socket exists yet, creates
// one from the endpoint and starts a thread named "TCP Accept-<port>" that runs an AcceptLoop on it.
private void listen() throws RemoteException { assert Thread.holdsLock(this); TCPEndpoint ep = getEndpoint(); int port = ep.getPort(); if (server == null) { try { server = ep.newServerSocket(); Thread t = AccessController.doPrivileged( new NewThreadAction(new AcceptLoop(server), "TCP Accept-" + port, true)); t.start(); } ... } else { ... } }
// Accept loop (excerpt with elisions): run() delegates to executeAcceptLoop() and closes the server socket afterwards;
// each accepted socket is submitted to connectionThreadPool as a ConnectionHandler together with the client's host address.
public void run() { try { executeAcceptLoop(); } finally { try { serverSocket.close(); } catch (IOException e) { } } } private void executeAcceptLoop() { while (true) { Socket socket = null; try { socket = serverSocket.accept(); InetAddress clientAddr = socket.getInetAddress(); String clientHost = (clientAddr != null ? clientAddr.getHostAddress() : ""); try { connectionThreadPool.execute( new ConnectionHandler(socket, clientHost)); } catch (RejectedExecutionException e) { ... } } catch (Throwable t) { ... } ... } } }
// run (excerpt with elisions): remembers the current thread's name, invokes run0() inside doPrivileged with NOPERMS_ACC,
// and restores the thread name in finally.
public void run() { Thread t = Thread.currentThread(); String name = t.getName(); try { ... AccessController.doPrivileged((PrivilegedAction<Void>)() -> { run0(); return null; }, NOPERMS_ACC); } finally { t.setName(name); } }
// run0 (excerpt; the method continues beyond this snippet): reads a 4-byte magic and a 2-byte version from the socket,
// closes the socket and returns when they do not match TransportConstants.Magic / Version, then reads the protocol byte.
private void run0() { TCPEndpoint endpoint = getEndpoint(); int port = endpoint.getPort(); ... try { InputStream sockIn = socket.getInputStream(); InputStream bufIn = sockIn.markSupported() ? sockIn : new BufferedInputStream(sockIn); // Read magic (or HTTP wrapper) bufIn.mark(4); DataInputStream in = new DataInputStream(bufIn); int magic = in.readInt(); if (magic == POST) { ...//一些http请求的处理 } short version = in.readShort(); if (magic != TransportConstants.Magic || version != TransportConstants.Version) { closeSocket(socket); return; } OutputStream sockOut = socket.getOutputStream(); BufferedOutputStream bufOut = new BufferedOutputStream(sockOut); DataOutputStream out = new DataOutputStream(bufOut); int remotePort = socket.getPort(); ... TCPEndpoint ep; TCPChannel ch; TCPConnection conn; // send ack (or nack) for protocol byte protocol = in.readByte();
// The three header fields read from the input stream, in order: magic (int), version (short), protocol (byte).
int magic = in.readInt(); short version = in.readShort(); byte protocol = in.readByte();
// Protocol switch (excerpt with elisions): for StreamProtocol it writes ProtocolAck plus the remote host and port, reads
// the client's host and port, builds a TCPConnection and passes it to handleMessages(conn, true); an unrecognised
// protocol gets ProtocolNack.
switch (protocol) { case TransportConstants.SingleOpProtocol: ... case TransportConstants.StreamProtocol: // send ack out.writeByte(TransportConstants.ProtocolAck); ... out.writeUTF(remoteHost); out.writeInt(remotePort); out.flush(); String clientHost = in.readUTF(); int clientPort = in.readInt(); ep = new TCPEndpoint(remoteHost, socket.getLocalPort(), endpoint.getClientSocketFactory(), endpoint.getServerSocketFactory()); ch = new TCPChannel(TCPTransport.this, ep); conn = new TCPConnection(ch, socket, bufIn, bufOut); // read input messages handleMessages(conn, true); break; case TransportConstants.MultiplexProtocol: ... default: // protocol not understood, send nack and close socket out.writeByte(TransportConstants.ProtocolNack); out.flush(); break; }
/**
 * Reads transport operations from {@code conn} and services them
 * (excerpt: "..." marks code elided from the original method).
 *
 * <p>For each operation byte: {@code Call} is wrapped in a
 * {@code StreamRemoteCall} and passed to {@code serviceCall}; {@code Ping}
 * is answered with {@code PingAck}; {@code DGCAck} is forwarded to
 * {@code DGCAckHandler}; anything else raises an {@code IOException}.
 *
 * @param conn       connection to read from
 * @param persistent when true, keeps looping after each handled operation
 */
void handleMessages(Connection conn, boolean persistent) {
    int port = getEndpoint().getPort();

    try {
        DataInputStream in = new DataInputStream(conn.getInputStream());
        do {
            // The call was missing in the excerpt ("int op =;").
            int op = in.read();     // transport op
            ...
            switch (op) {
            case TransportConstants.Call:
                // service incoming RMI call
                RemoteCall call = new StreamRemoteCall(conn);
                if (serviceCall(call) == false)
                    return;
                break;

            case TransportConstants.Ping:
                // send ack for ping
                DataOutputStream out =
                    new DataOutputStream(conn.getOutputStream());
                out.writeByte(TransportConstants.PingAck);
                conn.releaseOutputStream();
                break;

            case TransportConstants.DGCAck:
                // The argument was missing in the excerpt ("received(;").
                DGCAckHandler.received(UID.read(in));
                break;

            default:
                throw new IOException("unknown transport op " + op);
            }
        } while (persistent);
    } ...
}
/**
 * Services one incoming remote call (excerpt: "..." marks code elided from
 * the original method).
 *
 * <p>The target is looked up in {@code ObjectTable} by object id and
 * transport, and the call is handed to the target's dispatcher inside
 * {@code doPrivileged} using the target's access control context. The
 * thread's context class loader is switched to the target's for the
 * duration of the dispatch and restored afterwards.
 *
 * @param call the incoming remote call
 * @return {@code false} if the dispatcher threw an {@code IOException},
 *         {@code true} otherwise (for the paths visible in this excerpt)
 */
public boolean serviceCall(final RemoteCall call) {
    try {
        /* read object id */
        final Remote impl;
        ObjID id;
        ...
        Transport transport = id.equals(dgcID) ? null : this;
        Target target =
            ObjectTable.getTarget(new ObjectEndpoint(id, transport));
        ...
        final Dispatcher disp = target.getDispatcher();
        target.incrementCallCount();
        try {
            /* call the dispatcher */
            transportLog.log(Log.VERBOSE, "call dispatcher");

            final AccessControlContext acc =
                target.getAccessControlContext();
            ClassLoader ccl = target.getContextClassLoader();

            ClassLoader savedCcl = Thread.currentThread().getContextClassLoader();

            try {
                setContextClassLoader(ccl);
                currentTransport.set(this);
                try {
                    // The type names were missing in the excerpt
                    // ("new<Void>()" and "catch ( pae)").
                    java.security.AccessController.doPrivileged(
                        new java.security.PrivilegedExceptionAction<Void>() {
                        public Void run() throws IOException {
                            checkAcceptPermission(acc);
                            disp.dispatch(impl, call);
                            return null;
                        }
                    }, acc);
                } catch (java.security.PrivilegedActionException pae) {
                    // The action only declares IOException, so unwrap it.
                    throw (IOException) pae.getException();
                }
            } finally {
                setContextClassLoader(savedCcl);
                currentTransport.set(null);
            }

        } catch (IOException ex) {
            transportLog.log(Log.BRIEF,"exception thrown by dispatcher: ", ex);
            return false;
        } finally {
            target.decrementCallCount();
        }

    } catch (RemoteException e) {
        ...
    }

    return true;
}
前面我们在创建代理对象的时候每个stub最后都被封装到了Target对象中最后保存到了静态对象ObjectTable.objTable中。这个函数里面开始根据id,transport获取了RegistryImpl_Stub对应的target,然后下面获取dispatcher,实际上就是UnicastServerRef对象。下面设置了一些值和异常处理,然后调用了disp.dispatch(impl, call),impl是从target中获取的,call是前面传递的函数参数。
public void dispatch(Remote obj, RemoteCall call) throws IOException { int num; long op; try { // read remote call header ObjectInput in;e try { in = call.getInputStream(); num = in.readInt(); if (num >= 0) { if (skel != null) { oldDispatch(obj, call, num); return; } else { throw new UnmarshalException("skeleton class not found but required " + "for client version"); } } op = in.readLong(); } catch (Exception readEx) { throw new UnmarshalException("error unmarshalling call header", readEx); } ....
// oldDispatch (excerpt with elisions): reads the interface hash from the call header, logs the call, unmarshals custom
// call data and forwards to skel.dispatch(obj, call, op, hash); when the skeleton's class is assignable to DGCImpl_Skel
// the input stream is switched to useCodebaseOnly(). Both streams are released in finally.
public void oldDispatch(Remote obj, RemoteCall call, int op) throws IOException { long hash; // hash for matching stub with skeleton try { // read remote call header ObjectInput in; try { in = call.getInputStream(); try { Class<?> clazz = Class.forName("sun.rmi.transport.DGCImpl_Skel"); if (clazz.isAssignableFrom(skel.getClass())) { ((MarshalInputStream)in).useCodebaseOnly(); } } catch (ClassNotFoundException ignore) { } hash = in.readLong(); } catch (Exception readEx) { throw new UnmarshalException("error unmarshalling call header", readEx); } logCall(obj, skel.getOperations()[op]); unmarshalCustomCallData(in); // dispatch to skeleton for remote object skel.dispatch(obj, call, op, hash); } catch (Throwable e) { ... } finally { call.releaseInputStream(); // in case skeleton doesn't call.releaseOutputStream(); } }
这个函数里面先对skel等做了一些判断,然后记录调用日志,最后调用了skel.dispatch(obj, call, op, hash)。
// Skeleton dispatch (excerpt with elisions): rejects a mismatching interface hash; for operation 2 it deserializes the
// String argument with readObject(), calls lookup on the RegistryImpl and writes the result to the result stream.
public void dispatch(Remote var1, RemoteCall var2, int var3, long var4) throws Exception { if (var4 != 4905912898345647071L) { throw new SkeletonMismatchException("interface hash mismatch"); } else { RegistryImpl var6 = (RegistryImpl)var1; String var7; Remote var8; ObjectInput var10; ObjectInput var11; switch(var3) { case 0: ... case 1: ... case 2: try { var10 = var2.getInputStream(); var7 = (String)var10.readObject(); } catch (IOException var89) { throw new UnmarshalException("error unmarshalling arguments", var89); } catch (ClassNotFoundException var90) { throw new UnmarshalException("error unmarshalling arguments", var90); } finally { var2.releaseInputStream(); } var8 = var6.lookup(var7); try { ObjectOutput var9 = var2.getResultStream(true); var9.writeObject(var8); break; } catch (IOException var88) { throw new MarshalException("error marshalling return", var88); } case 3: ... case 4: ... default: throw new UnmarshalException("invalid method number"); } } }
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (! Proxy.isProxyClass(proxy.getClass())) { throw new IllegalArgumentException("not a proxy"); } if (Proxy.getInvocationHandler(proxy) != this) { throw new IllegalArgumentException("handler mismatch"); } if (method.getDeclaringClass() == Object.class) { return invokeObjectMethod(proxy, method, args); } else if ("finalize".equals(method.getName()) && method.getParameterCount() == 0 && !allowFinalizeInvocation) { return null; // ignore } else { return invokeRemoteMethod(proxy, method, args); } }
可以看到前面先做了一些校验,然后判断被调用的方法是否是在Object类中声明的(如hashCode,toString等),这类方法直接在本地调用即可。其他的方法就调用invokeRemoteMethod(proxy, method, args)实现远程调用。跟进invokeRemoteMethod()方法可以看到它主要调用了UnicastRef.invoke()方法,继续跟进。
// invoke (excerpt with elisions): opens a connection, creates a StreamRemoteCall with operation number -1 and the given
// opnum value, marshals each parameter with marshalValue, runs call.executeCall(), then (unless the return type is void)
// unmarshals the return value and frees the connection for reuse.
public Object invoke(Remote obj,Method method,Object[] params,long opnum) throws Exception { ... Connection conn = ref.getChannel().newConnection(); RemoteCall call = null; boolean reuse = true; boolean alreadyFreed = false; try { ... // create call context call = new StreamRemoteCall(conn, ref.getObjID(), -1, opnum); // marshal parameters try { ObjectOutput out = call.getOutputStream(); marshalCustomCallData(out); Class<?>[] types = method.getParameterTypes(); for (int i = 0; i < types.length; i++) { marshalValue(types[i], params[i], out); } } ... // unmarshal return call.executeCall(); try { Class<?> rtype = method.getReturnType(); if (rtype == void.class) return null; ObjectInput in = call.getInputStream(); Object returnValue = unmarshalValue(rtype, in); alreadyFreed = true; clientRefLog.log(Log.BRIEF, "free connection (reuse = true)"); ref.getChannel().free(conn, true); return returnValue; } ... }