FlyingMcdull

Spark事件监听详解

Spark中很多组件之间是靠事件消息实现通信的,之前分析了一下Spark中RPC机制,RPC和事件消息机制目的都是实现组件之间的通信,前者解决远程通信问题,而后者则是在本地较为高效的方式。Spark中大量采用事件监听这种方式,实现driver端的组件之间的通信。本文就来解释一下Spark中事件监听是如何实现的。本文代码全部出自于Spark-2.0.0

观察者模式和监听器

在设计模式中有一个观察者模式,该模式建立一种对象与对象之间的依赖关系,一个对象状态发生改变时立即通知其他对象,其他对象就据此作出相应的反应。其中发生改变的对象称之为观察目标(也有叫主题的),被通知的对象称之为观察者,可以有多个观察者注册到一个观察目标中,这些观察者之间没有联系,其数量可以根据需要增减。
observer-design-pattern

Spark中的事件监听

SparkListener

Spark中的事件监听机制,本质上其实就是观察者模式的实现,查看源码我们可以经常看到listener这种命名的类或对象,顾名思义,这就是监听器类或对象。下面就以SparkListener为例来解析事件监听是如何设计的。首先我们看SparkListener

1
2
3
4
5
6
7
abstract class SparkListener extends SparkListenerInterface {
//很多方法代码省略
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }
override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
//很多方法代码省略
}

为了简化分析,我们以executor的增删事件监听来做具体的分析。这里我们可以看到onExecutorAdded和onExecutorRemoved两个方法,很明显这是用来处理executor增删事件的方法。SparkListener继承自SparkListenerInterface接口,其内部就是这些方法的声明,代码省略。

监听器的结构

很明显,要使得类的对象能够实现对executor事件的响应,就必须继承SparkListener类。举例来讲,我们可以看到SaveExecutorInfo类(org.apache.spark.deploy.SaveExecutorInfo),是继承自SparkListener类的。

1
2
3
4
5
6
7
private[spark] class SaveExecutorInfo extends SparkListener {
val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
addedExecutorInfos(executor.executorId) = executor.executorInfo
}
}

其声明中重载了onExecutorAdded方法,搜集executor的信息。

监听器如何使用

看了上面的分析,问题来了,SaveExecutorInfo是如何能够简单地通过继承自一个SparkListener类,甚至后者没有任何消息接受和处理逻辑,来实现事件响应呢?还是举例说明,我们可以看一下如何使用SaveExecutorInfo对象的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class LogUrlsStandaloneSuite extends SparkFunSuite with LocalSparkContext {
//部分代码省略
val listener = new SaveExecutorInfo
sc.addSparkListener(listener) //将一个SaveExecutorInfo监听器对象添加到sc中
// Trigger a job so that executors get added
sc.parallelize(1 to 100, 4).map(_.toString).count()
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
// Browse to each URL to check that it's valid
info.logUrlMap.foreach { case (logType, logUrl) =>
val html = Source.fromURL(logUrl).mkString
assert(html.contains(s"$logType log page"))
}
}
//部分代码省略
}

监听器如何工作

这里可以看到,在声明了一个SaveExecutorInfo对象之后,需要将它添加到sc中,sc其实就是SparkContext对象,也就是一个Spark application唯一的入口。SparkContext中addSparkListener方法的代码如下:

1
2
3
4
5
6
7
8
/**
* :: DeveloperApi ::
* Register a listener to receive up-calls from events that happen during execution.
*/
@DeveloperApi
def addSparkListener(listener: SparkListenerInterface) {
listenerBus.addListener(listener)
}

这里的listenerBus,是一个监听器总线对象,其声明如下:

1
private[spark] val listenerBus = new LiveListenerBus

现在思路就变得很清晰了,SaveExecutorInfo对象是注册到LiveListenerBus对象中,然后通过LiveListenerBus对象来实现事件监听,其实这里我们通过取名就可以知道其设计思路,类似于计算机中的总线,设备都通过总线来传递消息,而LiveListenerBus就刚好充当了总线的角色,一个个SparkListener子类对象就是一个个的设备,它们可以接受来自总线的消息并作出相应的处理。

监听器总线如何传递消息

我们可以看一下LiveListenerBus类,该类实现了SparkListenerBus接口,直接看其入口start方法:

1
2
3
4
5
6
7
8
def start(sc: SparkContext): Unit = {
if (started.compareAndSet(false, true)) {
sparkContext = sc
listenerThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
}
}

直接启动了一个listenerThread:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private val listenerThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
LiveListenerBus.withinListenerThread.withValue(true) {
while (true) {
eventLock.acquire()
self.synchronized {
processingEvent = true
}
try {
val event = eventQueue.poll
if (event == null) {
// Get out of the while loop and shutdown the daemon thread
if (!stopped.get) {
throw new IllegalStateException("Polling `null` from eventQueue means" +
" the listener bus has been stopped. So `stopped` must be true")
}
return
}
postToAll(event)//将事件通知到所有注册的listener中
} finally {
self.synchronized {
processingEvent = false
}
}
}
}
}
}

可以看到这个listenerThread是一个守护线程,其核心逻辑就是不停地在一个事件队列eventQueue里取出事件,如果事件合法且LiverListenerBus没有被关停,就将事件通知给所有注册的listener中,postToAll方法在ListenerBus接口中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final def postToAll(event: E): Unit = {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.
val iter = listeners.iterator
while (iter.hasNext) {
val listener = iter.next()
try {
doPostEvent(listener, event)
} catch {
case NonFatal(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
}
}
}

思路很清晰,就是用迭代器遍历listener,逐个将消息发送。而doPostEvent方法是一个抽象方法,其具体实现要由继承自ListerBus的类负责,比如之前举例中的SparkListener,就有相应的SparkListenerBus接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private[spark] trait SparkListenerBus
extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
protected override def doPostEvent(
listener: SparkListenerInterface,
event: SparkListenerEvent): Unit = {
event match {
//部分代码省略
case executorAdded: SparkListenerExecutorAdded =>
listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
listener.onExecutorRemoved(executorRemoved)
//部分代码省略
case _ => listener.onOtherEvent(event)
}
}
}

这里对每个事件进行类型匹配,比如doPostEvent需要将executorAdded事件告知一个listener,对应地,这个listener就调用一下自己的onExecutorAdded方法,对该事件作出自己的反应,比如前面的SaveExecutorInfo就实现了onExecutorAdded方法,其内容就是保存一下executor的信息。到这里其实还有一个重要的问题被忽视,LiveListenerBus发给注册在它这里的listener的事件消息是从何而来的?细心一点就可以发现,LiveListenerBus中有一个post方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
def post(event: SparkListenerEvent): Unit = {
if (stopped.get) {
// Drop further events to make `listenerThread` exit ASAP
logError(s"$name has already stopped! Dropping event $event")
return
}
val eventAdded = eventQueue.offer(event)
if (eventAdded) {
eventLock.release()
} else {
onDropEvent(event)
}
}

其核心就是向事件队列里添加相应的事件。

小结

总结之,Spark中监听器的实现核心其实就是一个个需要对事件响应的监听器对象,注册到一个监听器总线,需要发送事件消息的组件将发生的事件消息提交到总线,然后总线将事件消息转发给一个个注册在它上面的监听器,最后监听器对事件进行响应。其实就是一个典型的观察者模式使用。