FlyingMcdull

Spark RPC解读

Spark-1.6以后RPC默认使用Netty替代Akka,在Netty上加了一层封装,为实现对Spark的定制开发,必须对RPC实现方式有比较清晰的了解,本文解读Spark RPC实现。本文代码全部出自于Spark-2.0.0

背景介绍

Akka是一个异步的消息框架,所谓的异步,简言之就是消息发送方发送出消息,不用阻塞等待结果,接收方处理完返回结果即可。Akka支持百万级的消息传递,特别适合复杂的大规模分布式系统。Akka基于Actor模型,提供用于创建可扩展,弹性,快速响应的应用程序的平台。Actor封装了状态和行为的对象,不同的Actor之间可通过消息交换实现通信,每个Actor都有自己的消息收件箱。Akka可以简化并发场景下的开发,其异步,高性能的事件驱动模型,轻量级的事件处理可大大方便用于开发复杂的分布式系统。早期Spark大量采用Akka作为RPC。Netty也是一个知名的高性能,异步消息框架,Spark早期便使用它解决大文件传输问题,用来克服Akka的短板。根据社区的说法,因为很多Spark用户饱受Akka复杂依赖关系的困扰,所以后来干脆就直接用Netty代替了Akka。因为Akka和Netty基础知识不是本文的重点,相关知识可自行查阅。

Spark-1.6+中的RPC

Spark-1.6以前的版本中RPC是采用Akka实现,我们以master和worker之间的通信为例解释Akka RPC的工作模式。Spark-1.6以后,RPC默认采用Netty实现。RpcEndpoint注册到RpcEnv上然后才能接收消息,RpcEnv处理从RpcEndpointRef或者其他远程节点的消息,然后回发给相应的RpcEndpoint。Rpc组件之间的关系见下图:
spark-rpc-rpcenv

master实现

master的定义

我们看master类的定义

1
2
3
4
5
6
7
8
private[deploy] class Master(
override val rpcEnv: RpcEnv,
address: RpcAddress,
webUiPort: Int,
val securityMgr: SecurityManager,
val conf: SparkConf)
extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
...

可以看到master类继承自ThreadSafeRpcEndpoint,进一步定位可以发现ThreadSafeRpcEndpoint是继承自RpcEndpoint特质。

master启动

再看master的启动方法:

1
2
3
4
5
6
7
8
9
10
11
12
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}

RpcEndpoint特质

master的启动会创建一个RpcEnv并将自己注册到其中。继续看RpcEndpoint特质的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private[spark] trait RpcEndpoint {
//当前RpcEndpoint注册到的RpcEnv主子,可以类比为Akka中的actorSystem
val rpcEnv: RpcEnv
//直接用来发送消息的RpcEndpointRef,可以类比为Akka中的actorRef
final def self: RpcEndpointRef = {
require(rpcEnv != null, "rpcEnv has not been initialized")
rpcEnv.endpointRef(this)
}
//处理来自RpcEndpointRef.send或者RpcCallContext.reply的消息
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException(self + " does not implement 'receive'")
}
//处理来自RpcEndpointRef.ask的消息,会有相应的回复
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}
//篇幅限制,其余onError,onConnected,onDisconnected,onNetworkError,
//onStart,onStop,stop方法此处省略
}

RpcEnv抽象类

我们来看看RpcEnv的具体内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private[spark] abstract class RpcEnv(conf: SparkConf) {
private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
//返回endpointRef
private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
//返回RpcEnv监听的地址
def address: RpcAddress
//注册一个RpcEndpoint到RpcEnv并返回RpcEndpointRef
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
//通过uri异步地查询RpcEndpointRef
def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
//通过uri查询RpcEndpointRef,这种方式会产生阻塞
def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
}
//通过address和endpointName查询RpcEndpointRef,这种方式会产生阻塞
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
}
//关掉endpoint
def stop(endpoint: RpcEndpointRef): Unit
//关掉RpcEnv
def shutdown(): Unit
//等待结束
def awaitTermination(): Unit
//没有RpcEnv的话RpcEndpointRef是无法被反序列化的,这里是反序列化逻辑
def deserialize[T](deserializationAction: () => T): T
//返回文件server实例
def fileServer: RpcEnvFileServer
//开一个针对给定URI的channel用来下载文件
def openChannel(uri: String): ReadableByteChannel
}

另外RpcEnv有一个伴生对象,实现了create方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
private[spark] object RpcEnv {
def create(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean = false): RpcEnv = {
val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
new NettyRpcEnvFactory().create(config)
}
}

这就是在master启动方法中的create具体实现,可以看到调用了Netty工厂方法NettyRpcEnvFactory,该方法是对Netty的具体封装。

master中消息处理

上文可以看到,在RpcEndpoint中最核心的便是receive和receiveAndReply方法,定义了消息处理的核心逻辑,master中也有相应的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
override def receive: PartialFunction[Any, Unit] = {
case ElectedLeader =>
case CompleteRecovery =>
case RevokedLeadership =>
case RegisterApplication(description, driver) =>
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
case DriverStateChanged(driverId, state, exception) =>
case Heartbeat(workerId, worker) =>
case MasterChangeAcknowledged(appId) =>
case WorkerSchedulerStateResponse(workerId, executors, driverIds) =>
case WorkerLatestState(workerId, executors, driverIds) =>
case UnregisterApplication(applicationId) =>
case CheckForWorkerTimeOut =>
}

这里定义了master一系列的消息处理逻辑,而receiveAndReply中,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
case RequestSubmitDriver(description) =>
case RequestKillDriver(driverId) =>
case RequestDriverStatus(driverId) =>
case RequestMasterState =>
case BoundPortsRequest =>
case RequestExecutors(appId, requestedTotal) =>
case KillExecutors(appId, executorIds) =>
}

定义了对需要回复的消息组的处理逻辑。

worker实现

worker的定义

我们看看worker类的定义:

1
2
3
4
5
6
7
8
9
10
11
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
memory: Int,
masterRpcAddresses: Array[RpcAddress],
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
extends ThreadSafeRpcEndpoint with Logging {

和master同样继承自ThreadSafeRpcEndpoint特质,也是一个Endpoint类型。

worker启动方法

有了前面master的分析,按照相同的逻辑,我们直接看worker的启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {
// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
}

自然而然的,和master启动方法基本如出一辙,但是问题出在,worker是如何和master建立联系的?其实细心一点你就可以发现,在master消息处理中,receiveAndReply方法第一个处理的消息便是RegisterWorker,也就是处理来自worker的注册消息,那么事情就变得简单了,worker是通过将自己注册到master的RpcEnv中,然后实现通信的。

worker注册到master RpcEnv

定位到worker中发送RegisterWorker消息到master Endpoint的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
workerId, host, port, self, cores, memory, workerWebUiUrl))
.onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
Utils.tryLogNonFatalError {
handleRegisterResponse(msg)
}
case Failure(e) =>
logError(s"Cannot register with master: ${masterEndpoint.address}", e)
System.exit(1)
}(ThreadUtils.sameThread)
}

其中,masterEndpoint.ask是核心,发送了一个RegisterWorker消息到masterEndpoint并期待对方的RegisterWorkerResponse,对response做出相应的处理。这样worker就成功和master建立了连接,它们之间可以互相发送消息进行通信。ps:其实worker注册到master的步骤有一点复杂,涉及到容错等问题,具体的实现这里暂时不做讨论。

worker到master的通信

worker和master之间是一个主从关系,worker注册到master之后,master就可以通过消息传递实现对worker的管理,在worker中有一个方法:

1
2
3
4
5
6
7
8
private def sendToMaster(message: Any): Unit = {
master match {
case Some(masterRef) => masterRef.send(message)
case None =>
logWarning(
s"Dropping $message because the connection to master has not yet been established")
}
}

一目了然,就是干的发送消息到master的活儿,在worker中很多地方都用到这个方法,比如handleExecutorStateChanged(executorStateChanged: ExecutorStateChanged)方法中,sendToMaster(executorStateChanged)就向masterRef发送了executorStateChanged消息,前文中master中的recevie方法中,就有一个对ExecutorStateChanged消息的处理逻辑。

master到worker的通信

同样的,master要对worker实现管理也是通过发送消息实现的,比如launchExecutor(worker: WorkerInfo, exec: ExecutorDesc)方法中:

1
2
3
4
5
6
7
8
9
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
//向worker发送LaunchExecutor消息
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

master向worker发送了LaunchExecutor消息告诉worker应该启动executor了,而worker中的receive方法中对LaunchExecutor消息进行处理并完成master交代给自己的任务。

小结

复杂的分布式系统中RPC是最重要的一个模块,得益于前人的工作,Akka亦或Netty都为我们提供了完备的RPC实现框架。通过这些框架,我们可以不必去关心RPC实现方式,设计出复杂的分布式系统。本文只是本人在Spark学习过程中的一个记录,分布式系统远远不止这么简单,学习之路依旧漫长。