过滤器示例
在大多数情况下,标签是一个简单而有用的设计来选择你想要的信息。 例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消费者将收到包含TAGA或TAGB或TAGC的消息。 但限制是一条消息只能有一个标签,而这对于复杂的情况可能不适用。 在这种情况下,您可以使用SQL表达式来过滤消息。
原则
SQL功能可以通过发送消息时放入的属性进行一些计算。 在RocketMQ定义的语法下,您可以实现一些有趣的逻辑。 这里是一个例子:
------------
| message |
|----------| a
>
5 AND b = 'abc'
| a = 10 | --------------------
>
Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a
>
5 AND b = 'abc'
| a = 1 | --------------------
>
Missed
| b = 'abc'|
| c = true |
------------
语法
RocketMQ只定义了一些基本的语法来支持这个功能。 你也可以很容易地扩展它。
- 数字比较的,比如: >, >=, <, <=,BETEEN, = ;
- 字符比较的,比如:=, <>, IN;
IS NULL或者IS NOT NULL- 逻辑
AND,OR,NOT;
常量类型是:
- 数字,如123,3.1415;
- 字符,如'abc',必须用单引号。
- NULL,特殊常量;
- 布尔值,TRUE或FALSE;
使用限制:
只有消费者可以通过SQL92选择消息。 界面是:
public void subscribe(final String topic, final MessageSelector messageSelector)
Producer example
You can put properties in message through methodputUserPropertywhen sending.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown();
Consumer example
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();