在生产者通过channel的basicPublish方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看Channel接口,会发现存在3个重载的basicPublish方法
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;他们共有的参数分别是:
当mandatory标志位设置为true时,如果exchange根据本身类型和消息routingKey没法找到1个适合的queue存储消息,那末broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息抛弃;通俗的讲,mandatory标志告知broker代理服务器最少将消息route到1个队列中,否则就将消息return给发送者;
下面我们通过几个实例测试下mandatory标志的作用:
测试1:设置mandatory标志,且exchange未绑定队列
public class ProducerTest { public static void main(String[] args) { String exchangeName = "confirmExchange"; String queueName = "confirmQueue"; String routingKey = "confirmRoutingKey"; String bindingKey = "confirmBindingKey"; int count = 3; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("172.16.151.74"); factory.setUsername("test"); factory.setPassword("test"); factory.setPort(5672); //创建生产者 Sender producer = new Sender(factory, count, exchangeName, routingKey); producer.run(); } } class Sender { private ConnectionFactory factory; private int count; private String exchangeName; private String routingKey; public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) { this.factory = factory; this.count = count; this.exchangeName = exchangeName; this.routingKey = routingKey; } public void run() { try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //创建exchange channel.exchangeDeclare(exchangeName, "direct", true, false, null); //发送持久化消息 for(int i = 0;i < count;i++) { //第1个参数是exchangeName(默许情况下代理服务器端是存在1个""名字的exchange的,因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话我们需要将该参数设置成创建的exchange的名字),第2个参数是路由键 channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes()); } } catch (Exception e) { e.printStackTrace(); } } }
第45行我们将basicPublish的第3个参数mandatory设置成了true,表示开启了mandatory标志,但我们没有为当前exchange绑定任何队列;
通过wireshark抓包看到下面输出:
可以看到最后履行了basic.return方法,将发布者发出的消息返还给了发布者,查看协议的Arguments参数部份可以看到,Reply-Text字段值为:NO_ROUTE,表示消息并没有路由到适合的队列中;
那末我们该怎样获得到没有被正确路由到适合队列的消息呢?这时候候可以通过为channel信道设置ReturnListener监听器来实现,具体实现代码见下:
public class ProducerTest { public static void main(String[] args) { String exchangeName = "confirmExchange"; String queueName = "confirmQueue"; String routingKey = "confirmRoutingKey"; String bindingKey = "confirmBindingKey"; int count = 3; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("172.16.151.74"); factory.setUsername("test"); factory.setPassword("test"); factory.setPort(5672); //创建生产者 Sender producer = new Sender(factory, count, exchangeName, routingKey); producer.run(); } } class Sender { private ConnectionFactory factory; private int count; private String exchangeName; private String routingKey; public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) { this.factory = factory; this.count = count; this.exchangeName = exchangeName; this.routingKey = routingKey; } public void run() { try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //创建exchange channel.exchangeDeclare(exchangeName, "direct", true, false, null); //发送持久化消息 for(int i = 0;i < count;i++) { //第1个参数是exchangeName(默许情况下代理服务器端是存在1个""名字的exchange的, //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话 //我们需要将该参数设置成创建的exchange的名字),第2个参数是路由键 channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes()); } channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5) throws IOException { //此处便是履行Basic.Return以后回调的地方 String message = new String(arg5); System.out.println("Basic.Return返回的结果: "+message); } }); } catch (Exception e) { e.printStackTrace(); } } }在设置了ReturnListener监听器以后,broker(代理服务器)发出basic.return方法以后,就会回调第52行的handleReturn方法,在这个方法里面我们就能够进行消息的重新发布操作啦;
测试2:设置mandatory标志,且为exchange绑定队列(路由键和绑定键1致)
public class ProducerTest { public static void main(String[] args) { String exchangeName = "confirmExchange"; String queueName = "confirmQueue"; String routingKey = "confirmRoutingKey"; String bindingKey = "confirmRoutingKey"; //String bindingKey = "confirmBindingKey"; int count = 3; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("172.16.151.74"); factory.setUsername("test"); factory.setPassword("test"); factory.setPort(5672); //创建生产者 Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey); producer.run(); } } class Sender { private ConnectionFactory factory; private int count; private String exchangeName; private String queueName; private String routingKey; private String bindingKey; public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) { this.factory = factory; this.count = count; this.exchangeName = exchangeName; this.queueName = queueName; this.routingKey = routingKey; this.bindingKey = bindingKey; } public void run() { try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //创建exchange channel.exchangeDeclare(exchangeName, "direct", true, false, null); //创建队列 channel.queueDeclare(queueName, true, false, false, null); //绑定exchange和queue channel.queueBind(queueName, exchangeName, bindingKey); //发送持久化消息 for(int i = 0;i < count;i++) { //第1个参数是exchangeName(默许情况下代理服务器端是存在1个""名字的exchange的, //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话 //我们需要将该参数设置成创建的exchange的名字),第2个参数是路由键 channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes()); } channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5) throws IOException { //此处便是履行Basic.Return以后回调的地方 String message = new String(arg5); System.out.println("Basic.Return返回的结果: "+message); } }); } catch (Exception e) { e.printStackTrace(); } } }
通过抓包发现其实不会有basic.return方法被调用,查看RabbitMQ管理界面发现消息已到达了队列;
测试3:设置mandatory标志,且exchange绑定队列(路由键和绑定键不1致)
代码就是把测试2中第6行注释,第7行注释打开,注意到此时的routingKey和bindingKey是不1致的,此时我们运行程序,同时抓包得到下面截图:
注意1点,我们发送了3条消息,那末相应的应当履行3次basic.return,其中第1次和第2次basic.return显示在1行上了,第3次是单唯一行,不要误认为只履行了两次,从协议的具体返回内容里我们一样看到了Reply-Text字段值是NO_ROUTE,这类现象在测试1中已见过了;
到此,我们明白了mandatory标志的作用:在消息没有被路由到适合队列情况下会将消息返还给消息发布者,同时我们测试了哪些情况下消息不会到达适合的队列,测试1演示的是创建了exchange但是没有为他绑定队列致使的消息未到达适合队列,测试3演示的是创建了exchange同时创建了queue,但是在将二者绑定的时候,使用的bindingKey和消息发布者使用的rountingKey不1致致使的消息未到达适合队列;
参考资料:
RabbitMQ(2) AMQP协议mandatory和immediate标志位区分
RabbitMQ之mandatory