码迷,mamicode.com
首页 > 其他好文 > 详细

Spark-RPC理解

时间:2018-12-10 22:35:40      阅读:221      评论:0      收藏:0      [点我收藏+]

标签:实现类   lock   receive   本质   类型   直接   private   基本   RoCE   

基本架构

Akka Actor式RPC架构

  1. Spark采用的是AkkaActor架构实现RPC,但是实际使用过程为了兼容不同节点之间的文件下载,采用Netty来实现Actor功能。
  2. Spark RPC由三部分组成:
  • RpcEnv RPC的执行上下文,等同于ActorSystem,用于管理RpcEndpoint和RpcEndpointRef
  • RpcEndpoint RPC通信实体的抽象,等同于Actor,用于接收客户端发送来的请求,方法主要有receive,onConnected, onDisconnnected, onStart, onStop, onError等
  • RpcEndpointRef RPC通信实体的引用,等同于ActorRef,在客户端被调用,用来向服务端请求,主要方法是ask和askWithRetry

    核心组件

    Dispatcher

  • InboxMessage: 外部发送过来的消息(onStart, onStop, RPCMessage, OneWayMessage...)
  • EndpointData: 包装(RpcEndpoint, NettyRpcEndpointRef, Inbox(InboxMessge队列))
  • MessageLoop: 通过线程池调度,读取阻塞队列中是否有消息,有的话就直接读取,否则阻塞
  • Inbox来源(消息来源):
  • [x] - 注册RpcEndpoint(会生成OnStart消息)
  • [x] - 去注册RpcEndpoint(会生成onStop消息)
  • [x] - postMessage, 投递消息给指定的RpcEndpoint
  • [x] - 停止Dispatcher

    TransportClientFactory

    RPC客户端的工厂类,用于批量生成TransportClient
  • ClientPool,ClientFactory内部通过<sokectAddress, ClientPool> 建立套接字(Socket网络连接)与ClientPool(TransportClient)的关联,同时通过object与TransportClient建立1V1的锁关联关系;即对于一个socket,会有多个TransportClient与其关联,spark通过每一个TransportClient使用不同的lock(object),来进行并行,本质还是一个利用线程池(连接缓冲池)的思想
    其类型定义为
class TransClientFactory {
    ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;
}
class ClientPool {
    TransportClient[] clients;
    Object[] locks;
}
  • TransportClient
    包含5种发送消息的方法: fetchChunk, stream, sendRPC, sendRPCSyns, send
  • TransportClientBootstrap 由TransportContext传入,启动加载(如 SAAL和加密认证之类的启动操作)

    TransportContext

    通过createClientFactory创建TransportClientFactory,间接通过createClient创建TransportClient; 通过createServer创建TransServer实例
  • TransportConf 配置稳健加载
  • RpcHandler,是一个abstract类,实现类为NettyRpcHandler,internalReceive负责将ByteBuffer转换成RequestMessage; postMessage用于投递消息, 然后交由对应的RPCEndpoint处理
val msgDispatch = internalReceive(client, message)
dispatcher.postMessage(msgDispatch, callback)
  • NettyStreamMessage: 提供文件服务能力

    NettyRPCEnv

  • timeoutScheduler 超时请求的调度器,使用的ScheduleredExcutorService
  • clientConnectExecutor
  • outboxes: 在send()时在messages中add消息,然后调用drainOutbox()循环遍历发送messages中所有消息;drainOutbox()在没有client时会调用launchuConnectTask()创建TransportClient
private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
class Outbox {
    nettyEnv;   //所在环境
    address;    //远端NettyRpcEnv地址
    messages;   //向外发送的消息列表
    client; // TransportClient
    connectFuture; //连接任务的Future引用
    stopped; //是否停止
    draining;   //Outbox正有线程处理消息
}
  • RPC客户端发送请求流程
  1. 调用NettyRpcEndpointRef的send/ask方法向RpcEndpoint发送消息;

a) 如果是同一节点,直接使用Dispatcher的postLocalMessage和postOneWayMessage,直接将消息放入EndpointData的Inbox中;

b) 如果发送方在远处,将消息封装成OutboxMessage,放入远端RpcEndpoint对应的Outbox的messages列表中;

  1. Outbox的drainOutbox循环从messages获取OutboxMessage,调用TransportClient向远端发送消息;
  2. 与远端的TransportServer建立连接之后,经Netty管道,NettyRpcHandler处理,投递到远端的Dispatcher的EndpointData的Inbox中进行处理

    TransportServer

    -TransportRequestHandler:主要是handle()方法,该方法根据request的类型,调用不同的 processXX()方法进行处理
processFetchRequest 处理获取块请求
processRPCRequest 处理RPC请求
processStreamRequest 处理Stream请求
processOneWayMessage 处理无需回复的请求

RPC服务端实现

  • TransportServer

    要点总结

  1. Spark RPC是用Netty实现了数据流传输,以及Actor这种RPC框架的,其中NettyRpcEnv相当于ActorySysm, RpcEndpoint相当于Actor(远端的服务,或者说接口,注册在服务端), RpcEndpointRef相当于ActorRef(服务引用,在客户端使用),双方通信通过Message这个载体;
  2. 客户端发送消息时,通过<address, Outbox[messages, client]>这种结构,向address不断地发送消息;
  3. 服务端通过NettyRpcHandler进行消息的receive,转换成InboxMessage,放入Dispatcher中,Dispatcher使用messageLoop循环遍历Inbox,取出InboxMessage,根据消息路由,调用相应方法进行处理,即路由功能

Spark-RPC理解

标签:实现类   lock   receive   本质   类型   直接   private   基本   RoCE   

原文地址:https://www.cnblogs.com/fkpj/p/10099544.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!