Spark中很多组件之间是靠事件消息实现通信的,之前分析了一下Spark中RPC机制,RPC和事件消息机制目的都是实现组件之间的通信,前者解决远程通信问题,而后者则是在本地较为高效的方式。Spark中大量采用事件监听这种方式,实现driver端的组件之间的通信。本文就来解释一下Spark中事件监听是如何实现的。本文代码全部出自于Spark-2.0.0。
观察者模式和监听器
在设计模式中有一个观察者模式,该模式建立一种对象与对象之间的依赖关系,一个对象状态发生改变时立即通知其他对象,其他对象就据此作出相应的反应。其中发生改变的对象称之为观察目标(也有叫主题的),被通知的对象称之为观察者,可以有多个观察者注册到一个观察目标中,这些观察者之间没有联系,其数量可以根据需要增减。
Spark中的事件监听
SparkListener
Spark中的事件监听机制,本质上其实就是观察者模式的实现,查看源码我们可以经常看到listener这种命名的类或对象,顾名思义,这就是监听器类或对象。下面就以SparkListener为例来解析事件监听是如何设计的。首先我们看SparkListener
为了简化分析,我们以executor的增删事件监听来做具体的分析。这里我们可以看到onExecutorAdded和onExecutorRemoved两个方法,很明显这是用来处理executor增删事件的方法。SparkListener继承自SparkListenerInterface接口,其内部就是这些方法的声明,代码省略。
监听器的结构
很明显,要使得类的对象能够实现对executor事件的响应,就必须继承SparkListener类。举例来讲,我们可以看到SaveExecutorInfo类(org.apache.spark.deploy.SaveExecutorInfo),是继承自SparkListener类的。
其声明中重载了onExecutorAdded方法,搜集executor的信息。
监听器如何使用
看了上面的分析,问题来了,SaveExecutorInfo是如何能够简单地通过继承自一个SparkListener类,甚至后者没有任何消息接受和处理逻辑,来实现事件响应呢?还是举例说明,我们可以看一下如何使用SaveExecutorInfo对象的。
监听器如何工作
这里可以看到,在声明了一个SaveExecutorInfo对象之后,需要将它添加到sc中,sc其实就是SparkContext对象,也就是一个Spark application唯一的入口。SparkContext中addSparkListener方法的代码如下:
这里的listenerBus,是一个监听器总线对象,其声明如下:
现在思路就变得很清晰了,SaveExecutorInfo对象是注册到LiveListenerBus对象中,然后通过LiveListenerBus对象来实现事件监听,其实这里我们通过取名就可以知道其设计思路,类似于计算机中的总线,设备都通过总线来传递消息,而LiveListenerBus就刚好充当了总线的角色,一个个SparkListener子类对象就是一个个的设备,它们可以接受来自总线的消息并作出相应的处理。
监听器总线如何传递消息
我们可以看一下LiveListenerBus类,该类实现了SparkListenerBus接口,直接看其入口start方法:
直接启动了一个listenerThread:
可以看到这个listenerThread是一个守护线程,其核心逻辑就是不停地在一个事件队列eventQueue里取出事件,如果事件合法且LiverListenerBus没有被关停,就将事件通知给所有注册的listener中,postToAll方法在ListenerBus接口中实现:
思路很清晰,就是用迭代器遍历listener,逐个将消息发送。而doPostEvent方法是一个抽象方法,其具体实现要由继承自ListerBus的类负责,比如之前举例中的SparkListener,就有相应的SparkListenerBus接口:
这里对每个事件进行类型匹配,比如doPostEvent需要将executorAdded事件告知一个listener,对应地,这个listener就调用一下自己的onExecutorAdded方法,对该事件作出自己的反应,比如前面的SaveExecutorInfo就实现了onExecutorAdded方法,其内容就是保存一下executor的信息。到这里其实还有一个重要的问题被忽视,LiveListenerBus发给注册在它这里的listener的事件消息是从何而来的?细心一点就可以发现,LiveListenerBus中有一个post方法:
其核心就是向事件队列里添加相应的事件。
小结
总结之,Spark中监听器的实现核心其实就是一个个需要对事件响应的监听器对象,注册到一个监听器总线,需要发送事件消息的组件将发生的事件消息提交到总线,然后总线将事件消息转发给一个个注册在它上面的监听器,最后监听器对事件进行响应。其实就是一个典型的观察者模式使用。