使用 Spring 集成和处理 Apache Kafka 数据管道

工程 | Josh Long | 2015 年 4 月 15 日 | ...

应用程序产生的数据比以往任何时候都多,而一个巨大的挑战(甚至在分析之前)就是首先容纳负载。 Apache 的 Kafka 解决了这一挑战。它最初由 LinkedIn 设计,并在 2011 年开源。该项目的目的是提供一个统一的、高吞吐量、低延迟的平台来处理实时数据馈送。该设计深受事务日志的影响。它是一个消息系统,类似于传统的 RabbitMQ、ActiveMQ、MQSeries 等消息系统,但它非常适合日志聚合、持久消息、快速(数百兆字节/秒!)读写,并且可以容纳众多客户端。当然,这使其非常适合云规模架构!

Kafka 为许多大型生产系统提供支持。LinkedIn 使用它来处理活动数据和运营指标,以支持 LinkedIn 新闻提要和 LinkedIn 今日,以及进入 Hadoop 的离线分析。Twitter 将其用作其流处理基础架构的一部分。Kafka 支持 Foursquare 的在线到在线和在线到离线消息传递。它用于将 Foursquare 监控和生产系统与基于 Hadoop 的离线基础设施集成。Square 使用 Kafka 作为总线,将所有系统事件通过 Square 的各个数据中心移动。这包括指标、日志、自定义事件等。在消费者方面,它输出到 Splunk、Graphite 或 Esper 等实时警报。Netflix 每天使用它处理 3000-6000 亿条消息。它还被 Airbnb、Mozilla、高盛、Tumblr、雅虎、PayPal、Coursera、Urban Airship、Hotels.com 以及其他众多大型网络明星使用。显然,它在一些强大的系统中发挥着重要作用!

安装 Apache Kafka

有许多不同的方法可以安装 Apache Kafka。如果您使用的是 OSX 并且使用的是 Homebrew,则可以简单地使用 brew install kafka。您还可以从 Apache 下载最新版本。我下载了 kafka_2.10-0.8.2.1.tgz,解压缩它,然后在其中您会发现 Apache Zookeeper 以及 Kafka 的发行版,因此无需其他任何操作。我将 Apache Kafka 安装在我的 $HOME 目录下的另一个目录 bin 中,然后我创建了一个环境变量 KAFKA_HOME,它指向 $HOME/bin/kafka

首先启动 Apache Zookeeper,指定它所需的配置文件位置

$KAFKA_HOME/bin/zookeeper-server-start.sh  $KAFKA_HOME/config/zookeeper.properties

Apache Kafka 发行版同时包含 Zookeeper 和 Kafka 的默认配置文件,这使得入门变得很容易。在更高级的使用场景中,您需要自定义这些文件。

然后启动 Apache Kafka。它也需要一个配置文件,如下所示

$KAFKA_HOME/bin/kafka-server-start.sh  $KAFKA_HOME/config/server.properties

server.properties 文件包含(除其他外)连接到 Apache Zookeeper(zookeeper.connect)的默认值、通过套接字发送的数据量、默认的分区数量以及代理 ID(broker.id - 在集群中必须唯一)。

同一目录中还有其他脚本可用于发送和接收虚拟数据,这在确定一切正常运行方面非常方便!

现在 Apache Kafka 已经启动并运行,让我们看看如何在应用程序中使用它。

一些高级概念..

Kafka 代理集群由一个或多个服务器组成,每个服务器可能运行一个或多个代理进程。Apache Kafka 被设计为高可用性;没有节点。所有节点都是可互换的。数据从一个节点复制到另一个节点以确保在发生故障时数据仍然可用。

在 Kafka 中,主题是一个类别,类似于 JMS 目标或 AMQP 交换和队列。主题被分区,消息生产者决定将消息发送到主题的哪个分区。分区中的每条消息都分配一个唯一的序列 ID,即其偏移量。更多分区允许更大的并行消费,但这也会导致代理上的更多文件。

生产者将消息发送到 Apache Kafka 代理主题并指定每个生产消息的分区。消息生产可以是同步的或异步的。生产者还指定他们想要的复制保证。

消费者监听主题上的消息并处理发布的消息流。正如您在使用其他消息系统时所期望的那样,这通常(并且很有用地!)是异步的。

Spring XD 和许多其他分布式系统一样,Apache Kafka 使用 Apache Zookeeper 来协调集群信息。Apache Zookeeper 提供了一个共享的层次命名空间(称为znodes),节点可以共享该命名空间以了解集群拓扑和可用性(这也是 Spring Cloud 将来支持它的另一个原因..)。

Zookeeper 在您与 Apache Kafka 的交互中非常重要。例如,Apache Kafka 具有两种不同的 API 用作消费者。更高级别的 API 更易于入门,它处理处理分区等所有细微差别。它需要引用一个 Zookeeper 实例来维护协调状态。

现在让我们转向使用 Spring 与 Apache Kafka 集成。

使用 Spring Integration 与 Apache Kafka 集成

最近发布的 用于 Apache Kafka 的 Spring Integration 1.1 功能非常强大,并提供了用于处理低级 Apache Kafka API 和高级别 API 的入站适配器。

目前,适配器首先使用 XML 配置,尽管适配器的 Spring Integration Java 配置 DSL 正在开发中,并且里程碑版本已可用。我们现在将同时介绍这两种方法。

为了使所有这些示例都能正常工作,我添加了 libs-milestone-local Maven 存储库 并使用了以下依赖项

  • org.apache.kafka:kafka_2.10:0.8.1.1
  • org.springframework.boot:spring-boot-starter-integration:1.2.3.RELEASE
  • org.springframework.boot:spring-boot-starter:1.2.3.RELEASE
  • org.springframework.integration:spring-integration-kafka:1.1.1.RELEASE
  • org.springframework.integration:spring-integration-java-dsl:1.1.0.M1

使用 Spring Integration XML DSL 与 Apache Kafka 集成

首先,让我们看看如何使用 Spring Integration 出站适配器将 Message<T> 实例从 Spring Integration 流发送到外部 Apache Kafka 实例。该示例相当简单:名为 inputToKafka 的 Spring Integration channel 充当管道,将 Message<T> 消息转发到出站适配器 kafkaOutboundChannelAdapter。适配器本身可以从 kafka:producer-context 元素中指定的默认值或适配器本地配置覆盖中获取其配置。给定的 kafka:producer-context 元素中可能存在一个或多个配置。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:channel id="inputToKafka">
        <int:queue/>
    </int:channel>

    <int-kafka:outbound-channel-adapter
            id="kafkaOutboundChannelAdapter"
            kafka-producer-context-ref="kafkaProducerContext"
            channel="inputToKafka">
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>

    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>

    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration broker-list="localhost:9092"
                                              topic="event-stream"
                                              compression-codec="default"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

</beans>

这是来自 Spring Boot 应用程序的 Java 代码,通过将消息发送到传入的 inputToKafka MessageChannel 来触发使用出站适配器发送消息。

package xml;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.ImportResource;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
@EnableIntegration
@ImportResource("/xml/outbound-kafka-integration.xml")
public class DemoApplication {

    private Log log = LogFactory.getLog(getClass());

    @Bean
    @DependsOn("kafkaOutboundChannelAdapter")
    CommandLineRunner kickOff(@Qualifier("inputToKafka") MessageChannel in) {
        return args -> {
            for (int i = 0; i < 1000; i++) {
                in.send(new GenericMessage<>("#" + i));
                log.info("sending message #" + i);
            }
        };
    }

    public static void main(String args[]) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

使用新的 Apache Kafka Spring Integration Java 配置 DSL

在 Spring Integration 1.1 发布后不久,Spring Integration 明星 Artem Bilan 开始 添加 Spring Integration Java 配置 DSL 模拟,结果令人惊叹!它尚未 GA(您现在需要添加 libs-milestone 存储库),但我鼓励您尝试一下并进行测试。它对我来说运行良好,Spring Integration 团队始终渴望在任何时候获得早期反馈!这是一个示例,它演示了从两个不同的 IntegrationFlow 发送消息和消费消息。生产者类似于上面的 XML 示例。

本例的新增内容是轮询消费者。它是以批处理为中心的,并且会以固定的时间间隔拉取所有它看到的消息。在我们的代码中,接收到的消息将是一个映射,其键为主题,值为另一个映射,该映射包含分区 ID 和读取的批次(在本例中为 10 条记录)的记录。还有一种基于MessageListenerContainer的替代方案,可以按消息到达的顺序处理消息。

package jc;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec;
import org.springframework.integration.dsl.kafka.Kafka;
import org.springframework.integration.dsl.kafka.KafkaHighLevelConsumerMessageSourceSpec;
import org.springframework.integration.dsl.kafka.KafkaProducerMessageHandlerSpec;
import org.springframework.integration.dsl.support.Consumer;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * Demonstrates using the Spring Integration Apache Kafka Java Configuration DSL.
 * Thanks to Spring Integration ninja <a href="https://springjava.cn/team/artembilan">Artem Bilan</a>
 * for getting the Java Configuration DSL working so quickly!
 *
 * @author Josh Long
 */
@EnableIntegration
@SpringBootApplication
public class DemoApplication {

  public static final String TEST_TOPIC_ID = "event-stream";

  @Component
  public static class KafkaConfig {

    @Value("${kafka.topic:" + TEST_TOPIC_ID + "}")
    private String topic;

    @Value("${kafka.address:localhost:9092}")
    private String brokerAddress;

    @Value("${zookeeper.address:localhost:2181}")
    private String zookeeperAddress;

    KafkaConfig() {
    }

    public KafkaConfig(String t, String b, String zk) {
        this.topic = t;
        this.brokerAddress = b;
        this.zookeeperAddress = zk;
    }

    public String getTopic() {
        return topic;
    }

    public String getBrokerAddress() {
        return brokerAddress;
    }

    public String getZookeeperAddress() {
        return zookeeperAddress;
    }
  }

  @Configuration
  public static class ProducerConfiguration {

    @Autowired
    private KafkaConfig kafkaConfig;

    private static final String OUTBOUND_ID = "outbound";

    private Log log = LogFactory.getLog(getClass());

    @Bean
    @DependsOn(OUTBOUND_ID)
    CommandLineRunner kickOff( 
           @Qualifier(OUTBOUND_ID + ".input") MessageChannel in) {
        return args -> {
            for (int i = 0; i < 1000; i++) {
                in.send(new GenericMessage<>("#" + i));
                log.info("sending message #" + i);
            }
        };
    }

    @Bean(name = OUTBOUND_ID)
    IntegrationFlow producer() {

      log.info("starting producer flow..");
      return flowDefinition -> {

        Consumer<KafkaProducerMessageHandlerSpec.ProducerMetadataSpec> spec =
          (KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata)->
            metadata.async(true)
              .batchNumMessages(10)
              .valueClassType(String.class)
              .<String>valueEncoder(String::getBytes);

        KafkaProducerMessageHandlerSpec messageHandlerSpec =
          Kafka.outboundChannelAdapter(
               props -> props.put("queue.buffering.max.ms", "15000"))
            .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .addProducer(this.kafkaConfig.getTopic(), 
                this.kafkaConfig.getBrokerAddress(), spec);
        flowDefinition
            .handle(messageHandlerSpec);
      };
    }
  }

  @Configuration
  public static class ConsumerConfiguration {

    @Autowired
    private KafkaConfig kafkaConfig;

    private Log log = LogFactory.getLog(getClass());

    @Bean
    IntegrationFlow consumer() {

      log.info("starting consumer..");

      KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka.inboundChannelAdapter(
          new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
            .consumerProperties(props ->
                props.put("auto.offset.reset", "smallest")
                     .put("auto.commit.interval.ms", "100"))
            .addConsumer("myGroup", metadata -> metadata.consumerTimeout(100)
              .topicStreamMap(m -> m.put(this.kafkaConfig.getTopic(), 1))
              .maxMessages(10)
              .valueDecoder(String::new));

      Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));

      return IntegrationFlows
        .from(messageSourceSpec, endpointConfigurer)
        .<Map<String, List<String>>>handle((payload, headers) -> {
            payload.entrySet().forEach(e -> log.info(e.getKey() + '=' + e.getValue()));
            return null;
        })
        .get();
    }
  }

  public static void main(String[] args) {
      SpringApplication.run(DemoApplication.class, args);
  }
}

此示例大量使用了 Java 8 lambda 表达式。

生产者花费一些时间来确定在单个发送操作中将发送多少条消息,键和值如何编码(毕竟 Kafka 只知道byte[]数组)以及是否应同步或异步发送消息。在下一行,我们配置了出站适配器本身,然后定义了一个IntegrationFlow,以便所有消息都通过 Kafka 出站适配器发送出去。

消费者花费一些时间来确定要连接到哪个 Zookeeper 实例,每个批次要接收多少条消息(10 条)等。一旦收到消息批次,它们就会传递给handle方法,在该方法中我传递了一个 lambda 表达式,该表达式将枚举有效负载的主体并将其打印出来。没有什么花哨的。

使用 Spring XD 与 Apache Kafka

Apache Kafka 是一种消息总线,用作集成总线时非常强大。但是,它真正脱颖而出是因为它足够快且可扩展,可以用于将大数据路由到处理管道中。如果您正在进行数据处理,您真的需要 Spring XD!Spring XD 使使用 Apache Kafka(因为支持构建在 Apache Kafka Spring Integration 适配器上!)在复杂的流处理管道中变得非常简单。Apache Kafka 作为 Spring XD 的(数据来自的地方)和接收器(数据去往的地方)公开。

Spring XD 公开了用于创建类似 bash 的管道和过滤器流的超级便捷 DSL。Spring XD 是一个集中式运行时,用于管理、扩展和监视数据处理作业。它构建在 Spring Integration、Spring Batch、Spring Data 和 Spring for Hadoop 之上,成为一站式数据处理中心。Spring XD 作业从读取数据,将其通过可能对数据进行计数、过滤、丰富或转换的处理组件运行,然后将其写入接收器。

Spring Integration 和 Spring XD 专家 Marius Bogoevici(他在 Spring Integration 和 Spring XD 中完成了 Apache Kafka 的大部分近期工作)构建了一个非常好的示例,演示了如何使完整的 Spring XD 和 Kafka 流正常工作README 将引导您完成 Apache Kafka、Spring XD 和必需主题的设置。但是,本质上,当您使用 Spring XD shell 和 shell DSL 来组合流时。Spring XD 组件是预配置的命名组件,但具有许多参数,您可以使用 XD shell 和 DSL 通过--..参数覆盖这些参数。(顺便说一句,该 DSL 由 Spring 表达式语言的名人 Andy Clement 编写!)这是一个配置流以从 Apache Kafka 源读取数据,然后将消息写入名为log的组件(这是一个接收器)的示例。在这种情况下,log 可以是 syslogd、Splunk、HDFS 等。

xd> stream create kafka-source-test --definition "kafka --zkconnect=localhost:2181 --topic=event-stream | log" --deploy

就是这样!当然,这只是 Spring XD 的一个尝试,但希望您会同意其可能性令人着迷。

使用 Lattice 和 Docker 部署 Kafka 服务器

使用 Lattice(一个支持包括非常流行的 Docker 镜像格式在内的其他容器格式的分布式运行时)轻松设置示例 Kafka 安装。 Spotify 提供了一个 Docker 镜像,用于设置并置的 Zookeeper 和 Kafka 镜像。您可以轻松地将其部署到 Lattice 集群,如下所示

ltc create --run-as-root m-kafka spotify/kafka

从那里,您可以轻松地扩展 Apache Kafka 实例,更轻松地从您的云服务中使用 Apache Kafka。

后续步骤

您可以在我的 GitHub 帐户上找到此博客的代码

我们只是触及了表面!

如果您想了解更多信息(为什么不呢?),请务必查看 Marius Bogoevici 和 Mark Pollack 博士即将举行的关于使用 Spring XD 和 Apache Kafka 的反应式数据管道的网络研讨会,他们将在其中演示使用 RxJava、Spring XD 和 Apache Kafka 变得多么容易!

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部