[置顶] pipeline-filter模式变体之尾循环
来源:程序员人生 发布时间:2014-11-09 09:39:28 阅读次数:3720次
pipeline-filter作为1种处理数据的模式(见【POSA】卷4)可以将利用的任务划分为几个自我完备的数据处理步骤,并连接到1个数据管道。本文介绍1种不太常见的pipeline-filter的变体――尾循环的pipeline-filter,固然这也是在特定的需求场景下才会出现的。
首先,我们来看1个常见的pipeline-filter的模式图:
模式的思路比较简单明了,就是对数据的处理步骤进行拆分。然后以统1的编程接口加上递归的方式,将它们串在1起。
最近在写的1个Message broker中处理消息通讯的时候也采取了这类模式来切分消息处理步骤。在发送消息的时候这类模式使用得非常的顺畅,因此很自然得在接收消息的时候一样采取了这类模式。我们可以先来简单来看1个发送消息的时候全部pipeline就是像下面这样:
关于名词的说明:本文中谈到的handler可以类比为filter,提到的handler-chain可以类比为pipeline,只是叫法不同。
如果不去多想,接收消息的pipeline也应当跟发送消息类似,只是produce-handler变成了consume-handler。但在代码实现的时候,rabbitmq-java-client的实现方式使得这类模式的应用有些受阻。正常情况下,我们的理解是1个消息的处理或1个消息集合的处理,会穿过1个pipeline,但官方提供的java-client对接收消息的实现是socket-blocking以等待消息推送到client的(push)方式。由于官方client的这类实现方式,使得外部封装的做法最好是将socket-blocking搬迁到1个独立的EventLoop-Thread上,然后获得到消息以后,解封送并以事件的方式对外提供消息,而客户端在该事件中实现自己的处理逻辑便可,也就是说是1种异步接收的方式,仔细想一想也确切应当是这类push的方式。
因而可知在接收消息时的pipeline还是很不同于发送消息的。对接收消息而言,filter分成两个部份,第1部份的多个filter只履行1次(主要是在真正开始接收消息之前,处理1些前置任务,比如权限检查,参数验证等);第2部份的多个filter却要不断得在另外1个EventLoop-Thread上循环履行(由于这些filter触及到对接收到的message进行处理)。所以,在接收消息时的示意图为:
其中,下面框起来的两个handler是在EventLoop-Thread上循环履行的。
明显,上面用于produce的那种pipeline-filter不能应对这类变化(既没法跨线程也没法在就其中的几个进行循环)。而此时不可能独立得为consume单独实现1套新的pipeline-filter(由于在pipeline-filter的基础设施上,我们已将produce,consume和request,response、publish,subscribe等都抽象为消息传输(carry))。因此,我们只能在同1套运行机制上,找到1种办法来满足所有的消息传输方式。
我们的做法是,实现1个“过渡handler”(此处的handler即为filter,只是取名不同而已),并实现handle方法。该handler用于将后续的逻辑过渡到1个独立的EventLoop-Thread上并启动EventLoop-Thread(把传递给当前handler的上下文和chain对象传递到事件处理线程上去),其后的所有handler都在该线程上循环履行。
其实现代码段以下:
public void handle(@NotNull MessageContext context,
@NotNull IHandlerChain chain) {
if (!context.isSync()) {
ReceiveEventLoop eventLoop = new ReceiveEventLoop();
eventLoop.setChain(chain);
eventLoop.setContext(context);
eventLoop.setChannelDestroyer(context.getDestroyer());
eventLoop.setCurrentConsumer((QueueingConsumer) context.getOtherParams().get("consumer"));
context.setReceiveEventLoop(eventLoop);
//repeat current handler
((MessageCarryHandlerChain) chain).setEnableRepeatBeforeNextHandler(true);
eventLoop.startEventLoop();
} else {
chain.handle(context);
}
}
如上,ReceiveEventLoop即为1个独立的EventLoop-Thread,启动以后,对pipeline发起的线程而言,它启动的本次调用链(handle方法的递归调用)已结束。因此主线程将会从该调用的触发点向下继续履行
而后续的filter在eventloop线程上独立运行:
public void run() {
try {
while (true) {
QueueingConsumer.Delivery delivery = this.currentConsumer.nextDelivery();
AMQP.BasicProperties properties = delivery.getProperties();
byte[] msgBody = delivery.getBody();
context.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//.....
this.context.setConsumedMsg(msg);
this.chain.handle(this.context);
}
} catch (InterruptedException e) {
logger.info("[run] close the consumer's message handler!");
} catch (IOException e) {
logger.error("[run] occurs a IOException : " + e.getMessage());
this.shutdown();
} catch (ConsumerCancelledException e) {
logger.info("[run] the consumer has been canceled ");
this.shutdown();
} catch (Exception e) {
logger.error("[run] occurs a Exception : " + e.getMessage());
this.shutdown();
}
logger.info("******** thread id " + this.getThreadID() + " quit from message receiver ********");
}
有1个问题:我们必须拿到对EventLoop-Thread的控制权(外部可以在任意时刻关闭该eventloop),而获得其控制权的关键代码就是上上段代码中的:
context.setReceiveEventLoop(eventLoop);
我们将当前EventLoop的实例包装成1个Thread类型的属性,并为其开放了相应的开、关方法,将其控制权丢给外部:
public void startEventLoop() {
this.currentThread.start();
}
public void shutdown() {
this.channelDestroyer.destroy(context.getChannel());
this.currentThread.interrupt();
}
然后在主线程发起接收消息的方法最后会返回1个IReceiverCloser接口的实例,在其接口方法close中调用shutdown,来对其进行关闭:
//launch pipeline
carry(ctx);
return new IReceiverCloser() {
@Override
public void close() {
synchronized (this) {
if (ctx.getReceiveEventLoop().isAlive()) {
ctx.getReceiveEventLoop().shutdown();
}
}
}
};
另外一个问题,handler-chain是如何知道从某个handler以后转入eventloop线程会开始循环履行?是如何实现的?它来自于第1段代码中的以下这句代码:
((MessageCarryHandlerChain) chain).setEnableRepeatBeforeNextHandler(true);
这句代码会在进入下1个handler之前设置1个“循环履行”的标志。下面看看,我们是如何来改造handlerchain来到达这个目的的,在MessageCarryHandlerChain(它实现了IHandlerChain接口)的实现中,有以下4个变量:
private List<AbstractHandler> handlerChain;
private int pos = 0;
private boolean enableRepeat = false;
private int repeatPos = ⑴;
- handlerChain:解析并顺序存储每一个handler
- pos:用来记录当前已履行到哪一个handler
- enableRepeat:用来标识是不是启用重复履行handler
- repeatPos:用于记录重复履行的handler的起始位置
在设置启用enableRepeat属性的时候,会记录当前的位置状态:
public void setEnableRepeatBeforeNextHandler(boolean enableRepeat) {
this.enableRepeat = enableRepeat;
if (this.enableRepeat) {
this.repeatPos = this.pos;
} else {
this.repeatPos = Integer.MIN_VALUE;
}
}
MessageCarryChain的handle实现,这也是handler串接的核心:
public void handle(MessageContext context) {
if (this.repeatPos != Integer.MIN_VALUE) {
if (this.pos < handlerChain.size()) {
AbstractHandler currentHandler = handlerChain.get(pos++);
currentHandler.handle(context, this);
} else if (this.enableRepeat) {
this.pos = this.repeatPos;
}
}
}
在处理第1条到来的消息时,对应到上面while(true)中的最后1句:
this.chain.handle(this.context);
调用会进入MessageCarryChain的handle方法,并履行:
if (this.pos < handlerChain.size()) {
}
判断分支,在其中会触发下1个handler的handle方法,并1直履行下去直到判断条件不成立以后会履行else逻辑,将之前保存的起始循环的handler的位置置为新的handler的位置。
因而,当下1次,事件循环在上面while(true)中收到新的消息时,会再次履行上面的if判断分支(由于在接收上1条消息时,已将pos重置了,所以If判断条件又重新成立)并以循环位置的handler作为起始,直到走到handlerChain中的最后1个以后,又将当前位置的pos重置为repeatPos保存的位置(注意repeatPos在第1次被设置以后不再改变),如此循环往复。也就在表面上构成了上图所示的尾循环。
为何说只是表面上构成了呢。由于在表述中,我故意“疏忽”了这样1个现实――pipeline-filter模式根本上还是利用了递归来实现的,递归会有个还原点,用于“保护现场”以后再“还原现场”。因此,上面MessageCarryChain中的handle代码段中的:
currentHandler.handle(context, this);
对每一个被履行的handler都是还原点,当第1轮handler履行完成(调用完这句:this.pos = this.repeatPos;)都会在还原点层层回退(履行还原点以后的该方法内部的剩余代码)。因此,在收到第2个消息时,实际上是触发了新1轮的handler-chain履行流程,只是由于pos在之前被置为chain中的循环起始位置,而不是从0开始而已。但对后面尾循环的handler而言,它们递归调用的本质没有改变,所以其实只是看起来在尾部“循环”而已。
说明:其实如果你回顾pipeline-filter模式的本质,它们是用来处理数据的。我这里不论是发送还是接收消息统筹处理了消息以外的1些逻辑。如果这里只处理消息,实际上是可以不用跨线程和尾循环的。我只是利用了这类模式,将消息通讯的各个环节进行拆分,组合,重用从而不可躲避得遇到了这个问题,如果回到纯洁的pipeline-filter模式,是不需要这么做的。
如果我没表述清楚的,请直接看代码:Messagebus-Consume
生活不易,码农辛苦
如果您觉得本网站对您的学习有所帮助,可以手机扫描二维码进行捐赠