国内最全IT社区平台 联系我们 | 收藏本站
华晨云阿里云优惠2
您当前位置:首页 > 服务器 > Spark 定制版:010~Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考

Spark 定制版:010~Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考

来源:程序员人生   发布时间:2016-06-06 08:06:31 阅读次数:2429次

本讲内容:

a. 数据接收架构设计模式
b. 数据接收源码完全研究

注:本讲内容基于Spark 1.6.1版本(在2016年5月来讲是Spark最新版本)讲授。

上节回顾

上1讲中,我们给大家具体分析了Receiver启动的方式及其启动设计带来的多个问题:

a. 如果有多个InputDStream,那就要启动多个Receiver,每一个Receiver也就相当于分片partition,那我启动Receiver的时候理想的情况下是在不同的机器上启动Receiver,但是Spark Core的角度来看就是利用程序,感觉不到Receiver的特殊性,所以就会依照正常的Job启动的方式来处理,极有可能在1个Executor上启动多个Receiver;这样的话便可能致使负载不均衡

b. 有可能启动Receiver失败,只要集群存在,Receiver就不应当启动失败

c. 从运行进程中看,1个Reveiver就是1个partition的话,Reveiver的启动伴随1个Task启动,如果Task启动失败,以Task启动的Receiver也会失败

由此,我们通过源码分析,完全解析了Spark Streaming是如何解决这些问题的:

a. Spark使用1个Job启动1个Receiver.最大程度的保证了负载均衡

b. Spark Streaming已指定每一个Receiver运行在那些Executor上,在Receiver运行之前就指定了运行的地方

c. 如果Receiver启动失败,此时其实不是Job失败,在内部会重新启动Receiver

开讲

本讲我们主要给大家介绍Spark Streaming在接收数据的全生命周期贯通;

a. 当有Spark Streaming有利用程序的时候Spark Streaming会延续不断的接收数据

b. 1般Receiver和Driver不在1个进程中的,所以接收到数据以后要不断的汇报给Driver

c. Spark Streaming要接收数据肯定要使用消息循环器,循环器不断的接收到数据以后,然后将数据存储起来,再将存储完的数据汇报给Driver

d. Sparkstreaming接收数据的全部流程类似于MVC模式,M就是Receiver,V就是Driver,C就是ReceiverSupervisor

e. ReceiverSupervisor是控制器,Receiver的启动是靠ReceiverTracker启动的,Receiver接收到数据以后是靠ReceiverSupervisor存储数据的。然后Driver就取得元数据也就是界面,通过界面来操作底层的数据,这个元数据就相当于指针

Spark Streaming接收数据流程以下:

这里写图片描述

接收数据的时候肯定有1个循环器不断的接收数据,接收到数据肯定也有存储器,存储过以后向Driver汇报。接收数据和存储数据固然要分为两个不同的模块。

这里写图片描述

ReceiverSupervisorImpl是receiver的监控器,同时负责receiver的写操作 这个方法需要传入1个Iterator,实时上里边就只有1个Receiver

取得receiver,这个receiver是根据数据输入来源InputDstream取得的receiver。以SocketInputDstream为例,它的receiver就是SocketReceiver.这里的receiver只是1个援用,并没有被实例化。作为1个参数传入ReceiverSupervisorImpl

这里写图片描述

为了启动Receiver启动了1个spark作业,每个Receiver的启动都会有1个作业来负责,Receiver是1个1个的启动的如果是将所有的Receiver作为1个作业的不同task来启动会有很多弱点

a. Reciver启动可能失败进而致使利用程序失败
b. 运行的进程中会有任务倾斜的问题,将所有的Receiver作为1个作业的不同task来运行是采取的spark core的调度方式,在很不幸的情况下会出现所有Receiver运行在1个节点上,Receiver要不断的接收数据,需要消耗很多资源,就会致使这个节点负载特别大。

将每一个Receiver都作为1个job来运行就会最大可能的负载均衡,不过这样也有可能失败,失败以后不会重试job,而是重新schedule提交1个新的job来运行

Receiver,并且不会在之前运行的executor上启动,只要sparkstreaming程序不停止,假设Receiver出故障就会不停止的进行重新echedule并启动,确保Receiver1定会启动还有很重要的1点是,当重新启动1个Receiver时,是用1个线程池在新的线程中启动的

这里写图片描述

ReceiverSupervisorImpl负责处理Receiver接收到的数据,处理以后汇报给ReceiverTracker,所以ReceiverSupervisorImpl内部有和ReceiverTracker进行通讯的endpoint。这个负责向ReceiverTracker发送消息。

private val trackerEndpoint = RpcUtils.makeDriverRef(“ReceiverTracker”, env.conf,env.rpcEnv)

这个负责接收ReceiverTracker发送的消息,CleanupOldBlocks是用来清除运行完的每一个batch的Blocks,UpdateRateLimit是用来随时调剂限流(限流实际上是限的数据存储的速度)

这里写图片描述

ReceiverSupervisor的start方法

这里写图片描述

在onStart中启动的是BlockGenerator,BlockGenerator是把接收到的1条1条的数据生成block存储起来,1个BlockGenerator只服务于1个Receiver。所以BlockGenerator要在Receiver启动之前启动

这里写图片描述

BlockGenerator种有1个定时器。这个定时器每隔1定(默许是200ms,和设定的batchduration无关)的时间就履行以下方法。这个方法就是把接收到的数据1条1条的放入到这个buffer缓存中,再把这个buffer依照1定的时间或尺寸合并成block。除定时器之外还有1条线程不停的把生成的block交给blockmanager存储起来。

这里写图片描述

下面来看startReceiver方法

这里写图片描述

在启动Receiver之前还要向ReceiverTracker要求是不是可以启动Receiver。当返回是true才会启动。ReceiverTracker接收到汇报的信息就把注册Receiver的信息。

这里写图片描述

Receiver的启动只是调用receiver.onStart(),Receiver就在work节点上运行了

以SocketReceiver为例我看看它的onStart方法

这里写图片描述

备注:
1、DT大数据梦工厂微信公众号DT_Spark
2、Spark大神级专家:王家林
3、新浪微博: http://www.weibo.com/ilovepains

生活不易,码农辛苦
如果您觉得本网站对您的学习有所帮助,可以手机扫描二维码进行捐赠
程序员人生
------分隔线----------------------------
分享到:
------分隔线----------------------------
关闭
程序员人生