抢占先机
VMware 提供培训和认证,以加速您的进步。
了解更多大家好,Spring 爱好者们!在本期 Spring Tips 中,我们将研究阿里巴巴的 Apache RocketMQ。 我们之前已经在 Spring Tips 中讨论过阿里巴巴。 请查看之前的 Spring Tips,我们探索了一些 Spring Cloud Alibaba。
为了使用 Apache RocketMQ,您需要按照 RocketMQ 快速入门中的步骤操作。 本期 Spring Tips 介绍了 Apache RocketMQ,它最初是阿里巴巴内部开发和使用的技术,并在著名的中国销售节日 11/11 的熔炉中得到验证,有点像美国的“网络星期一”或“黑色星期五”。 有点像那样,但规模大得多。 2019 年,阿里巴巴(单独,没有其他电子商务引擎参与)在 24 小时内创造了近 400 亿美元的收入。 这需要发送数万亿条消息,并且这些消息能够扩展以满足需求。 RocketMQ 是他们唯一可以信任的东西。
运行 Apache RocketMQ 时,您需要使用 Java 8。(当然,在编写连接到 Apache RocketMQ 的 Spring 应用程序时,您可以使用任何版本的 Java。)我使用 SDK Manager(“SDKman” - sdk
)来切换到合适的 Java 版本。
sdk use java 8.0.242.hs-adpt
如果尚未安装,这将安装一个可用的版本。 完成后,您需要运行 NameServer。
${ROCKETMQ_HOME}/bin/mqnamesrv
然后你需要运行 Broker 本身。
${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876
如果要使用基于 SQL 的过滤,需要在 Broker 的配置 $ROCKETMQ_HOME/conf/broker.conf
中添加一个属性,然后告诉 RocketMQ 使用该配置。
enablePropertyFilter = true
我使用如下脚本来启动所有内容。
export JAVA_HOME=$HOME/.sdkman/candidates/java/8.0.242.hs-adpt
${ROCKETMQ_HOME}/bin/mqnamesrv &
${ROCKETMQ_HOME}/bin/mqbroker -n localhost:9876 -c ${ROCKETMQ_HOME}/conf/broker.conf
让我们看一个简单的生产者类,它使用 Spring Boot 自动配置和 RocketMQTemplate
。
为了使用它,你需要在 Spring Initializr 上创建一个新项目。我生成了一个使用最新 Java 版本的新项目,然后我确保包含 Lombok
。我们还需要 Apache RocketMQ 客户端和合适的 Spring Boot 自动配置
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
自动配置将创建与正在运行的 Apache RocketMQ Broker 的连接,该连接由某些属性告知。
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=greetings-producer-group
第一个属性 name-server
告诉应用程序 Apache RocketMQ 名称服务器的位置。 然后,名称服务器知道 Broker 的位置。 你还需要为生产者和消费者指定一个组。 在这里,我们使用 greetings-producer-group
。
package com.example.producer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.support.MessageBuilder;
import java.time.Instant;
@RequiredArgsConstructor
@SpringBootApplication
public class ProducerApplication {
@Bean
ApplicationListener<ApplicationReadyEvent> ready(RocketMQTemplate template) {
return event -> {
var now = Instant.now();
var destination = "greetings-topic";
for (var name : "Tammie,Kimly,Josh,Rob,Mario,Mia".split(",")) {
var payload = new Greeting("Hello @ " + name + " @ " + now.toString());
var messagePostProcessor = new MessagePostProcessor() {
@Override
public Message<?> postProcessMessage(Message<?> message) {
var headerValue = Character.toString(name.toLowerCase().charAt(0));
return MessageBuilder
.fromMessage(message)
.setHeader("letter", headerValue)
.build();
}
};
template.convertAndSend(destination, payload, messagePostProcessor);
}
};
}
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Greeting {
private String message;
}
我不知道是否还有比这更简单的了! 这是一个简单的 for 循环,处理每个名称,创建一个新的 Greeting
对象,然后使用 RocketMQTemplate
将有效负载发送到 Apache RocketMQ 主题 greetings-topic
。 在这里,我们使用了 RocketMQTemplate
对象的重载,它接受一个 MessagePostProcessor
。 MessagePostProcessor
是一个回调,我们可以在其中转换将要发送的 Spring Framework Message
对象。 在这个例子中,我们贡献了一个头部值 letter
,它包含名称的第一个字母。 我们将在消费者中使用它。
让我们看看消费者。 从 Spring Initializr 生成一个新的 Spring Boot 应用程序,并确保添加 Apache RocketMQ 自动配置。 你还需要在客户端的 application.properties
中指定名称服务器。
自动配置支持定义实现 RocketMQListener<T>
的 Bean,其中 T
是消费者将接收的有效负载的类型。 在这种情况下,有效负载是 Greeting
。
package com.example.consumer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;
import static org.apache.rocketmq.spring.annotation.SelectorType.SQL92;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
@Data
@AllArgsConstructor
@NoArgsConstructor
class Greeting {
private String message;
}
@Log4j2
@Service
@RocketMQMessageListener(
topic = "greetings-topic",
consumerGroup = "simple-group"
)
class SimpleConsumer implements RocketMQListener<Greeting> {
@Override
public void onMessage(Greeting greeting) {
log.info(greeting.toString());
}
}
在这个例子中,SimpleConsumer
只是记录来自 Apache RocketMQ 中的 greetings-topic
主题的所有传入消息。 在这里,消费者将处理主题上的每条消息。 让我们看看另一个很棒的功能 - 选择器 - 它允许我们有选择地处理传入的消息。 让我们用两个新的监听器替换现有的 RocketMQ 监听器。 每个监听器将使用与 SQL92 兼容的谓词来确定是否应该处理传入的消息。 一个监听器仅处理 letter
头部与 m
、k
或 t
匹配的消息。 另一个只匹配那些 letter
头部与 j
匹配的消息。
@Log4j2
@Service
@RocketMQMessageListener(
topic = "greetings-topic",
selectorExpression = " letter = 'm' or letter = 'k' or letter = 't' ",
selectorType = SQL92,
consumerGroup = "sql-consumer-group-mkt"
)
class MktSqlSelectorConsumer implements RocketMQListener<Greeting> {
@Override
public void onMessage(Greeting greeting) {
log.info("'m', 'k', 't': " + greeting.toString());
}
}
@Log4j2
@Service
@RocketMQMessageListener(
topic = "greetings-topic",
selectorExpression = " letter = 'j' ",
selectorType = SQL92,
consumerGroup = "sql-consumer-group-j"
)
class JSqlSelectorConsumer implements RocketMQListener<Greeting> {
@Override
public void onMessage(Greeting greeting) {
log.info("'j': " + greeting.toString());
}
}
还不错吧? 除了在 24 小时内处理数万亿条消息外,Apache RocketMQ 还支持很多其他事情! 它可以将长尾消息存储在磁盘上,而不会降低性能。 它支持消息的序列化(排序)、事务、批处理等。 它甚至支持预定的消息 - 仅在特定时间间隔后传递的消息。 不用说,我是 Apache RocketMQ 的忠实粉丝。