扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这篇文章主要介绍“Hadoop中RPC机制分析Server端”,在日常操作中,相信很多人在Hadoop中RPC机制分析Server端问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Hadoop中RPC机制分析Server端”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
创新新互联,凭借10多年的成都做网站、网站设计、外贸营销网站建设经验,本着真心·诚心服务的企业理念服务于成都中小企业设计网站有1000多家案例。做网站建设,选创新互联。
1. Server.Listener
RPC Client 端的 RPC 请求发送到 Server 端后, 首先由 Server.Listener 接收
Server.Listener 类继承自 Thread 类, 监听了 OP_READ 和 OP_ACCEPT 事件
Server.Listener 接收 RPC 请求, 在 Server.Listener.doRead() 方法中读取数据, 在 doRead() 方法中又调用了Server.Connection.readAndProcess() 方法,
最后会调用 Server.Connection.processRpcRequest() 方法, 源码如下:
private void processRpcRequest(RpcRequestHeaderProto header, DataInputStream dis) throws WrappedRpcServerException, InterruptedException { ... Writable rpcRequest; // 从成员变量dis中反序列化出Client端发送来的RPC请求( WritableRpcEngine.Invocation对象 ) try { //Read the rpc request rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf); rpcRequest.readFields(dis); } catch (Throwable t) { // includes runtime exception from newInstance ... } // 构造Server端Server.Call实例对象 Call call = new Call(header.getCallId(), header.getRetryCount(), rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header .getClientId().toByteArray()); // 将Server.Call实例对象放入调用队列中 callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count }
调用队列 callQueue 是 Server 的成员变量, Server.Listener 和 Server.Handler 是典型的生产者, 消费者模型,
Server.Listener( 生产者 )的doRead()方法最终调用Server.Connection.processRpcRequest() 方法,
而Server.Handler( 消费者 )处理RPC请求
2. Server.Handler 继承 Thread 类, 其主要工作是处理 callQueue 中的调用, 都在 run() 方法中完成. 在 run() 的主循环中, 每次处理一个从 callQueue 中出队的请求, Server.call() 是一个抽象方法, 实际是调用了 RPC.Server.call()方法, 最后通过 WritableRPCEngine.call() 方法完成 Server 端方法调用
/** Handles queued calls . */ private class Handler extends Thread { ... @Override public void run() { ... ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); while (running) { ... final Call call = callQueue.take(); // 获取一个RPC调用请求 ... Writable value = null; value = call.connection.user.doAs(new PrivilegedExceptionAction() { @Override public Writable run() throws Exception { // 调用RPC.Server.call()方法 // call.rpcKind : RPC调用请求的类型, 一般为Writable // call.connection.protocolName : RPC协议接口的类名 // call.rpcRequest : Invocation实例对象, 包括方法名, 参数列表, 参数列表的Class对象数组 // call.timestamp : 调用时间戳 return call(call.rpcKind, call.connection.protocolName, call.rpcRequest, call.timestamp); } }); } ... } }
RPC.Server.call() 方法如下:
@Override public Writable call(RPC.RpcKind rpcKind, String protocol, Writable rpcRequest, long receiveTime) throws Exception { return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, receiveTime); }
最后通过 WritableRPCEngine.call() 方法完成 Server 端方法调用, 代码如下:
@Override public Writable call(org.apache.hadoop.ipc.RPC.Server server, String protocolName, Writable rpcRequest, long receivedTime) throws IOException, RPC.VersionMismatch { Invocation call = (Invocation)rpcRequest; // 将RPC请求强制转成WritableRpcEngine.Invocation对象 ... long clientVersion = call.getProtocolVersion(); final String protoName; ProtoClassProtoImpl protocolImpl; // Server端RPC协议接口的实现类的实例对象 ... // Invoke the protocol method try { ... // 获取RPC请求中调用的方法对象Method Method method = protocolImpl.protocolClass.getMethod(call.getMethodName(), call.getParameterClasses()); method.setAccessible(true); ... // 在Server端RPC协议接口的实现类的实例对象 protocolImpl 上调用具体的方法 Object value = method.invoke(protocolImpl.protocolImpl, call.getParameters()); ... // 调用正常结束, 返回调用结果 return new ObjectWritable(method.getReturnType(), value); } catch (InvocationTargetException e) { // 调用出现异常, 用IOException包装异常, 最后抛出该异常 Throwable target = e.getTargetException(); if (target instanceof IOException) { throw (IOException)target; } else { IOException ioe = new IOException(target.toString()); ioe.setStackTrace(target.getStackTrace()); throw ioe; } } catch (Throwable e) { ... } } }
在 WritableRpcEngine.call() 方法中, 传入的 rpcRequest 会被强制转换成 WritableRpcEngine.Invocation 类型的对象 call , 并通过 call 这个对象包含的方法名(getMethodName()方法)和参数列表的 Class对象数组(getParameterClasses())获取 Method 对象, 最终通过 Method 对象的invoke() 方法, 调用实现类的实例对象 protocolImpl 上的方法, 完成 Hadoop 的远程过程调用
好了, 现在 Server 端的具体方法已经被调用了, 调用结果分两种情况:
1) 调用正常结束, 则将方法的返回值和调用结果封装成一个 ObjectWritable 类型的对象, 并返回
2) 调用出现异常, 抛出 IOException 类型的异常
3. Server.Responder
这个类的功能: 发送 Hadoop 远程过程调用的应答给 Client 端, Server.Responder 类继承自 Thread 类, 监听了 OP_WRITE 事件, 即通道可写. 具体细节写不下去了
到此,关于“Hadoop中RPC机制分析Server端”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流