ActiveMQ源码解析(四):聊聊消息的可靠传输机制和事务控制


ActiveMQSession session = connection.createSession(true,Session.CLIENT_ACKNOWLEDGE);

public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException { //检查session状态,如果session已关闭则抛出状态异常 checkClosed(); //检查destination类型,如果不符合要求就转变成ActiveMQDestination if (destination == null) { if (info.getDestination() == null) { throw new UnsupportedOperationException("A destination must be specified."); } throw new InvalidDestinationException("Don't understand null destinations"); } ActiveMQDestination dest; if (destination.equals(info.getDestination())) { dest = (ActiveMQDestination)destination; } else if (info.getDestination() == null) { dest = ActiveMQDestination.transform(destination); } else { throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName()); } if (dest == null) { throw new JMSException("No destination specified"); } if (transformer != null) { //把各种不同的message转换成ActiveMQMessage Message transformedMessage = transformer.producerTransform(session, this, message); if (transformedMessage != null) { message = transformedMessage; } } if (producerWindow != null) { try { producerWindow.waitForSpace(); } catch (InterruptedException e) { throw new JMSException("Send aborted due to thread interrupt."); } } //发送消息到broker中的topic this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete); //消息计数 stats.onMessage(); }


protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { //检查session状态如果closed抛出状态异常 checkClosed(); if (destination.isTemporary() && connection.isDeleted(destination)) { throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); } //竞争锁(互斥信号量),如果1个session的多个producer发送消息这里会保证有序性 synchronized (sendMutex) { // tell the Broker we are about to start a new transaction //告知broker开始1个新事务,只有session的确认模式是SESSION_TRANSACTED时事务上下网才会开启事务 doStartTransaction(); //从事务上下文中获得事务id TransactionId txid = transactionContext.getTransactionId(); long sequenceNumber = producer.getMessageSequence(); //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 //在jms协议头中设置传输模式即消息是不是需要持久化 message.setJMSDeliveryMode(deliveryMode); long expiration = 0L; //检查producer中的message是不是过期 if (!producer.getDisableMessageTimestamp()) { //message获得时间戳 long timeStamp = System.currentTimeMillis(); message.setJMSTimestamp(timeStamp); //设置过期时间 if (timeToLive > 0) { expiration = timeToLive + timeStamp; } } //设置消息过期时间 message.setJMSExpiration(expiration); //设置消息优先级 message.setJMSPriority(priority); //设置消息是非重发的 message.setJMSRedelivered(false); // transform to our own message format here //将消息转化成ActiveMQMessage,message针对不同的数据格式有很多种,比如map message,blob message ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); //设置目的地,这里是1个topic msg.setDestination(destination); //设置消息id msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); // Set the message id. //如果消息是经过转换的,那末原消息更新新的id if (msg != message) { message.setJMSMessageID(msg.getMessageId().toString()); // Make sure the JMS destination is set on the foreign messages too. 
//设置目的地 message.setJMSDestination(destination); } //clear the brokerPath in case we are re-sending this message //清除brokerpath msg.setBrokerPath(null); //设置事务id msg.setTransactionId(txid); // if (connection.isCopyMessageOnSend()) { msg = (ActiveMQMessage)msg.copy(); } //设置连接器 msg.setConnection(connection); //把消息的属性和消息体都设置为只读,避免被修改 msg.onSend(); //生产者id msg.setProducerId(msg.getMessageId().getProducerId()); if (LOG.isTraceEnabled()) { LOG.trace(getSessionId() + " sending message: " + msg); } //如果onComplete没设置,且发送超时时间小于0,且消息不需要反馈,且连接器不是同步发送模式,且(消息非持久化或连接器是异步发送模式或存在事务id的情况下)异步发送,否则同步发送 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { //异步发送走transport的oneway通道 this.connection.asyncSendPacket(msg); if (producerWindow != null) { // Since we defer lots of the marshaling till we hit the // wire, this might not // provide and accurate size. We may change over to doing // more aggressive marshaling, // to get more accurate sizes.. this is more important once // users start using producer window // flow control. int size = msg.getSize(); producerWindow.increaseUsage(size); } } else { if (sendTimeout > 0 && onComplete==null) { //同步发送走transport的request和asyncrequest通道 this.connection.syncSendPacket(msg,sendTimeout); }else { this.connection.syncSendPacket(msg, onComplete); } } } }



public void setMessageListener(MessageListener listener) throws JMSException { checkClosed(); if (info.getPrefetchSize() == 0) { throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); } if (listener != null) { boolean wasRunning = session.isRunning(); //停止session线程 if (wasRunning) { session.stop(); } this.messageListener.set(listener); //session重新分发未消费的message session.redispatch(this, unconsumedMessages); //开启session线程 if (wasRunning) { session.start(); } } else { this.messageListener.set(null); } }
/**
 * Drains the executor's queue (without waiting) and delivers each dispatched message to
 * {@code messageListener}.
 *
 * <p>For every dequeued dispatch this method:
 * <ol>
 *   <li>sends an early ack and skips delivery if the message is expired or a duplicate;</li>
 *   <li>installs an empty acknowledge callback in client/individual acknowledge mode;</li>
 *   <li>notifies {@code deliveryListener} before and after delivery, when one is set;</li>
 *   <li>inside a transaction, registers a synchronization that sends the ack before the
 *       transaction ends and handles redelivery (or a poison ack) after a rollback;</li>
 *   <li>outside a transaction, sends the standard ack right after the listener returns;</li>
 *   <li>finally waits on {@code executor.waitForQueueRestart()} before the next iteration.</li>
 * </ol>
 */
public void run() {
    MessageDispatch messageDispatch;
    while ((messageDispatch = executor.dequeueNoWait()) != null) {
        final MessageDispatch md = messageDispatch;
        final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();

        MessageAck earlyAck = null;
        // Expired messages get an EXPIRED ack; duplicates get a poison ack carrying the cause.
        if (message.isExpired()) {
            earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
            earlyAck.setFirstMessageId(message.getMessageId());
        } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
            LOG.debug("{} got duplicate: {}", this, message.getMessageId());
            earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
            earlyAck.setFirstMessageId(md.getMessage().getMessageId());
            earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
        }
        // If the message was expired or a duplicate, send the early ack and move on to the
        // next dispatch without delivering it.
        if (earlyAck != null) {
            try {
                asyncSendPacket(earlyAck);
            } catch (Throwable t) {
                LOG.error("error dispatching ack: {} ", earlyAck, t);
                connection.onClientInternalException(t);
            } finally {
                // NOTE(review): 'continue' inside 'finally' also discards anything thrown
                // from the catch block above; the loop always proceeds to the next dispatch.
                continue;
            }
        }

        // In CLIENT_ACKNOWLEDGE or INDIVIDUAL_ACKNOWLEDGE mode install an empty (no-op)
        // acknowledge callback on the message.
        if (isClientAcknowledge()||isIndividualAcknowledge()) {
            message.setAcknowledgeCallback(new Callback() {
                @Override
                public void execute() throws Exception {
                }
            });
        }

        // Notify the delivery listener before the message is delivered.
        if (deliveryListener != null) {
            deliveryListener.beforeDelivery(this, message);
        }

        // Assign the delivery sequence id and remember the broker sequence id of this message.
        md.setDeliverySequenceId(getNextDeliveryId());
        lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();

        final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);

        // Set when afterDelivery() fails; read later by the scheduled redelivery task.
        final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
        /*
         * The redelivery guard is to allow the endpoint lifecycle to complete before the message is dispatched.
         * We don't want afterDelivery being called after the redelivery as it may cause some weird stuff.
         */
        synchronized (redeliveryGuard) {
            try {
                ack.setFirstMessageId(md.getMessage().getMessageId());
                // Start a transaction if the session is transacted (per the original note).
                doStartTransaction();
                ack.setTransactionId(getTransactionContext().getTransactionId());
                if (ack.getTransactionId() != null) {
                    // In a transaction: register an anonymous synchronization that sends the ack
                    // before the transaction ends and handles redelivery after a rollback.
                    getTransactionContext().addSynchronization(new Synchronization() {

                        // Snapshot of the clear-requests counter taken when this synchronization
                        // is created; a larger value at rollback time means a failover happened.
                        final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());

                        @Override
                        public void beforeEnd() throws Exception {
                            // validate our consumer so we don't push stale acks that get ignored
                            if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
                                LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
                                throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
                            }
                            LOG.trace("beforeEnd ack {}", ack);
                            sendAck(ack);
                        }

                        @Override
                        public void afterRollback() throws Exception {
                            LOG.trace("rollback {}", ack, new Throwable("here"));
                            // ensure we don't filter this as a duplicate
                            connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());

                            // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
                            if (clearRequestsCounter.get() > clearRequestCount) {
                                LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
                                return;
                            }

                            // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
                            if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
                                LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
                                return;
                            }

                            RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                            int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                            if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
                                // We need to NACK the messages so that they get sent to the DLQ.
                                // Acknowledge the last message.
                                // (This local 'ack' shadows the outer standard ack on purpose.)
                                MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                                ack.setFirstMessageId(md.getMessage().getMessageId());
                                ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
                                asyncSendPacket(ack);
                            } else {
                                // Redelivery limit not reached: send a REDELIVERED ack and
                                // schedule a local redelivery.
                                MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
                                ack.setFirstMessageId(md.getMessage().getMessageId());
                                asyncSendPacket(ack);

                                // Figure out how long we should wait to resend this message:
                                // start from the initial delay and advance it once per previous redelivery.
                                long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                                for (int i = 0; i < redeliveryCounter; i++) {
                                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                                }

                                /*
                                 * If we are a blocking redelivery (i.e. non-blocking redelivery is NOT enabled)
                                 * then we need to stop the executor to avoid more messages being delivered;
                                 * once the message is redelivered we can restart it.
                                 */
                                if (!connection.isNonBlockingRedelivery()) {
                                    LOG.debug("Blocking session until re-delivery...");
                                    executor.stop();
                                }

                                connection.getScheduler().executeAfterDelay(new Runnable() {

                                    @Override
                                    public void run() {
                                        /*
                                         * wait for the first delivery to be complete, i.e. after delivery has been called.
                                         */
                                        synchronized (redeliveryGuard) {
                                            /*
                                             * If it's non blocking then we can just dispatch in a new session.
                                             */
                                            if (connection.isNonBlockingRedelivery()) {
                                                ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
                                            } else {
                                                /*
                                                 * If there has been an error thrown during afterDelivery then the
                                                 * endpoint will be marked as dead so redelivery will fail (and eventually
                                                 * the session marked as stale), in this case we can only call dispatch
                                                 * which will create a new session with a new endpoint.
                                                 */
                                                if (afterDeliveryError.get()) {
                                                    ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
                                                } else {
                                                    // Put the message back at the head of the queue and
                                                    // restart the executor that was stopped above.
                                                    executor.executeFirst(md);
                                                    executor.start();
                                                }
                                            }
                                        }
                                    }
                                }, redeliveryDelay);
                            }
                            md.getMessage().onMessageRolledBack();
                        }
                    });
                }

                LOG.trace("{} onMessage({})", this, message.getMessageId());
                // Invoke the application's message listener.
                messageListener.onMessage(message);

            } catch (Throwable e) {
                LOG.error("error dispatching message: ", e);

                // A problem while invoking the MessageListener does not
                // in general indicate a problem with the connection to the broker, i.e.
                // it will usually be sufficient to let the afterDelivery() method either
                // commit or roll back in order to deal with the exception.
                // However, we notify any registered client internal exception listener
                // of the problem.
                connection.onClientInternalException(e);
            } finally {
                // Outside a transaction the standard ack is sent immediately, even when the
                // listener threw; inside a transaction it is sent by beforeEnd() instead.
                if (ack.getTransactionId() == null) {
                    try {
                        asyncSendPacket(ack);
                    } catch (Throwable e) {
                        connection.onClientInternalException(e);
                    }
                }
            }

            // Notify the delivery listener after the message has been delivered.
            if (deliveryListener != null) {
                try {
                    deliveryListener.afterDelivery(this, message);
                } catch (Throwable t) {
                    LOG.debug("Unable to call after delivery", t);
                    // Record the failure for the scheduled redelivery task, then abort this run.
                    afterDeliveryError.set(true);
                    throw new RuntimeException(t);
                }
            }
        }
        /*
         * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
         * It also needs to be outside the redelivery guard.
         */
        try {
            executor.waitForQueueRestart();
        } catch (InterruptedException ex) {
            // NOTE(review): the interrupt status is not restored here; the exception is only
            // reported to the connection's internal exception handling.
            connection.onClientInternalException(ex);
        }
    }
}



      消费者和broker通讯最终实现消息确认,消息确认机制一共有5种,4种jms的和1种activemq补充的,AUTO_ACKNOWLEDGE(自动确认)、CLIENT_ACKNOWLEDGE(客户确认)、DUPS_OK_ACKNOWLEDGE(批量确认)、SESSION_TRANSACTED(事务确认)、INDIVIDUAL_ACKNOWLEDGE(单条确认),consumer在不同的模式下会发不同的命令到broker,broker再根据不同的命令进行操作,如果consumer正常发送ack命令给broker,broker会从topic移除消息并销毁,如果未从消费者接收到确认命令,broker会将消息转移到dlq队列(dead letter queue),并根据delivery mode进行重试或报异常。


       消息事务是在生产者producer到broker或broker到consumer过程中同一个session中产生的,保证几条消息在发送过程中的原子性。可以在connection的createSession方法中指定一个布尔值开启,如果消息确认机制是事务确认,那么在发送message的过程中session就会开启事务(实际上是broker端的事务),不用用户显式调用 beginTransaction,这时候所有通过session发送的消息都被缓存下来,用户调用session.commit时会发送所有消息,当发送出现异常时用户可以调用rollback进行回滚操作,只有在开启事务状态下有效。  最后附上一张别人画的activemq消息处理的流转图。
