使用 Apache Kafka 构建集成和数据处理管道,结合 Spring

工程 | Josh Long | 2015 年 4 月 15 日 | ...

应用程序生成的数据量比以往任何时候都多,而在分析之前,应对负载本身就是一个巨大的挑战。Apache Kafka 应对了这一挑战。它最初由 LinkedIn 设计,随后于 2011 年开源。该项目旨在为处理实时数据流提供一个统一、高吞吐量、低延迟的平台。其设计深受事务日志的影响。它是一个消息系统,类似于传统的 RabbitMQ、ActiveMQ、MQSeries 等消息系统,但它非常适合日志聚合、持久消息、快速(每秒**数百**兆字节!)读写,并且可以容纳众多客户端。当然,这使得它**非常适合**云规模的架构!

Kafka 为许多大型生产系统提供支持。LinkedIn 将其用于活动数据和操作指标,为 LinkedIn 新闻源和 LinkedIn Today 以及进入 Hadoop 的离线分析提供动力。Twitter 将其用作流处理基础设施的一部分。Kafka 为 Foursquare 的在线到在线和在线到离线消息提供支持。它用于将 Foursquare 监控和生产系统与基于 Hadoop 的离线基础设施集成。Square 使用 Kafka 作为总线,将所有系统事件通过 Square 的各种数据中心移动。这包括指标、日志、自定义事件等。在消费者端,它输出到 Splunk、Graphite 或类似 Esper 的实时警报系统。Netflix 每天使用它处理 300-600 亿条消息。Airbnb、Mozilla、Goldman Sachs、Tumblr、Yahoo、PayPal、Coursera、Urban Airship、Hotels.com 以及其他看似无穷无尽的网络巨头也在使用它。显然,它在一些强大的系统中发挥着重要作用!

安装 Apache Kafka

安装 Apache Kafka 有很多不同的方法。如果您使用 OSX 并安装了 Homebrew,只需运行 brew install kafka 即可。您也可以从 Apache 下载最新的分发版本。我下载了 kafka_2.10-0.8.2.1.tgz,解压后,您会发现其中包含 Apache Zookeeper 和 Kafka 的分发版本,因此不需要其他任何东西。我将 Apache Kafka 安装在我的 $HOME 目录下的 bin 子目录中,然后创建了一个指向 $HOME/bin/kafka 的环境变量 KAFKA_HOME

首先启动 Apache Zookeeper,指定其所需配置属性文件的位置:

$KAFKA_HOME/bin/zookeeper-server-start.sh  $KAFKA_HOME/config/zookeeper.properties

Apache Kafka 分发版本自带 Zookeeper 和 Kafka 的默认配置文件,这使得入门变得简单。在更高级的用例中,您需要自定义这些文件。

然后启动 Apache Kafka。它同样需要一个配置文件,如下所示:

$KAFKA_HOME/bin/kafka-server-start.sh  $KAFKA_HOME/config/server.properties

server.properties 文件包含(除其他内容外)连接到 Apache Zookeeper 的默认值 (zookeeper.connect)、应通过套接字发送多少数据、默认有多少分区以及代理 ID (broker.id - 在集群中必须唯一) 等信息。

同一目录中还有其他脚本可用于发送和接收虚拟数据,这对于确认一切正常运行非常方便!

现在 Apache Kafka 已经启动并运行,让我们看看如何在我们的应用程序中使用 Apache Kafka。

一些高级概念...

一个 Kafka **broker** 集群由一个或多个服务器组成,每个服务器可以运行一个或多个 broker 进程。Apache Kafka 设计为高可用;没有**主**节点。所有节点都可以互换。数据从一个节点复制到另一个节点,以确保在发生故障时数据仍然可用。

在 Kafka 中,一个**主题(topic)**是一个类别,类似于 JMS 目的地,或者同时类似于 AMQP 交换器和队列。主题是分区的,消息生产者决定消息应发送到主题的哪个分区。分区中的每条消息都被分配一个唯一的序列 ID,即其**偏移量(offset)**。更多的分区允许消费者更大的并行性,但这也会导致 broker 上产生更多文件。

**生产者(Producers)**将消息发送到 Apache Kafka broker 主题,并为他们生成的每条消息指定要使用的分区。消息生产可以是同步的或异步的。生产者还指定他们想要什么样的复制保证。

**消费者(Consumers)**监听主题上的消息并处理已发布消息流。正如您使用过其他消息系统所预期的那样,这通常(而且很有用!)是异步的。

Spring XD 以及许多其他分布式系统一样,Apache Kafka 使用 Apache Zookeeper 来协调集群信息。Apache Zookeeper 提供了一个共享的层次结构命名空间(称为**znodes**),节点可以共享该命名空间以了解集群拓扑和可用性(这也是 Spring Cloud 即将支持它的另一个原因...)。

Zookeeper 在您与 Apache Kafka 的交互中非常重要。例如,Apache Kafka 提供了两种不同的 API 来充当消费者。更高级别的 API 入门更简单,它可以处理分区等所有细节。它需要引用一个 Zookeeper 实例来维护协调状态。

现在让我们看看如何在 Spring 中使用 Apache Kafka。

结合 Spring Integration 使用 Apache Kafka

最近发布的 Spring Integration for Apache Kafka 1.1 非常强大,它提供了入站适配器,用于使用较低级别的 Apache Kafka API 以及较高级别的 API。

目前,该适配器主要采用 XML 配置,尽管 Spring Integration Java 配置 DSL 的工作正在进行中,并且里程碑版本已经可用。我们现在将在这里同时介绍这两种方式。

为了使所有这些示例都能正常工作,我添加了 libs-milestone-local Maven 仓库并使用了以下依赖项:

  • org.apache.kafka:kafka_2.10:0.8.1.1
  • org.springframework.boot:spring-boot-starter-integration:1.2.3.RELEASE
  • org.springframework.boot:spring-boot-starter:1.2.3.RELEASE
  • org.springframework.integration:spring-integration-kafka:1.1.1.RELEASE
  • org.springframework.integration:spring-integration-java-dsl:1.1.0.M1

结合 Spring Integration XML DSL 使用 Spring Integration Apache Kafka

首先,让我们看看如何使用 Spring Integration 出站适配器将 Spring Integration 流中的 Message<T> 实例发送到外部 Apache Kafka 实例。该示例非常简单:一个名为 inputToKafka 的 Spring Integration channel 充当管道,将 Message<T> 消息转发到出站适配器 kafkaOutboundChannelAdapter。适配器本身可以从 kafka:producer-context 元素中指定的默认配置获取其配置,也可以从适配器本地的配置覆盖中获取。给定的 kafka:producer-context 元素中可以有一个或多个配置。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:channel id="inputToKafka">
        <int:queue/>
    </int:channel>

    <int-kafka:outbound-channel-adapter
            id="kafkaOutboundChannelAdapter"
            kafka-producer-context-ref="kafkaProducerContext"
            channel="inputToKafka">
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>

    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>

    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration broker-list="localhost:9092"
                                              topic="event-stream"
                                              compression-codec="default"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

</beans>

以下是 Spring Boot 应用程序中的 Java 代码,通过将消息发送到传入的 inputToKafka MessageChannel 来使用出站适配器触发消息发送。

package xml;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.ImportResource;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
@EnableIntegration
@ImportResource("/xml/outbound-kafka-integration.xml")
public class DemoApplication {

    private Log log = LogFactory.getLog(getClass());

    @Bean
    @DependsOn("kafkaOutboundChannelAdapter")
    CommandLineRunner kickOff(@Qualifier("inputToKafka") MessageChannel in) {
        return args -> {
            for (int i = 0; i < 1000; i++) {
                in.send(new GenericMessage<>("#" + i));
                log.info("sending message #" + i);
            }
        };
    }

    public static void main(String args[]) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

使用新的 Apache Kafka Spring Integration Java 配置 DSL

Spring Integration 1.1 版本发布后不久,Spring Integration 的明星人物 Artem Bilan 就开始 着手添加 Spring Integration Java 配置 DSL 的类似功能,结果非常出色!它尚未正式发布(您目前需要添加 libs-milestone 仓库),但我鼓励您尝试一下。对我来说它工作得很好,而且 Spring Integration 团队总是非常乐于在可能的情况下获得早期反馈!这里有一个示例,演示了从两个不同的 IntegrationFlow 发送和消费消息。生产者与上面的 XML 示例类似。

此示例中的新功能是轮询消费者。它是面向批处理的,将在固定间隔拉取其看到的所有消息。在我们的代码中,收到的消息将是一个 Map,其键是主题,值是另一个 Map,其中包含分区 ID 和读取记录的批次(在此例中为 10 条记录)。还有一种基于 MessageListenerContainer 的替代方案,它可以处理传入的消息。

package jc;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec;
import org.springframework.integration.dsl.kafka.Kafka;
import org.springframework.integration.dsl.kafka.KafkaHighLevelConsumerMessageSourceSpec;
import org.springframework.integration.dsl.kafka.KafkaProducerMessageHandlerSpec;
import org.springframework.integration.dsl.support.Consumer;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * Demonstrates using the Spring Integration Apache Kafka Java Configuration DSL.
 * Thanks to Spring Integration ninja <a href="https://springjava.cn/team/artembilan">Artem Bilan</a>
 * for getting the Java Configuration DSL working so quickly!
 *
 * @author Josh Long
 */
@EnableIntegration
@SpringBootApplication
public class DemoApplication {

  public static final String TEST_TOPIC_ID = "event-stream";

  @Component
  public static class KafkaConfig {

    @Value("${kafka.topic:" + TEST_TOPIC_ID + "}")
    private String topic;

    @Value("${kafka.address:localhost:9092}")
    private String brokerAddress;

    @Value("${zookeeper.address:localhost:2181}")
    private String zookeeperAddress;

    KafkaConfig() {
    }

    public KafkaConfig(String t, String b, String zk) {
        this.topic = t;
        this.brokerAddress = b;
        this.zookeeperAddress = zk;
    }

    public String getTopic() {
        return topic;
    }

    public String getBrokerAddress() {
        return brokerAddress;
    }

    public String getZookeeperAddress() {
        return zookeeperAddress;
    }
  }

  @Configuration
  public static class ProducerConfiguration {

    @Autowired
    private KafkaConfig kafkaConfig;

    private static final String OUTBOUND_ID = "outbound";

    private Log log = LogFactory.getLog(getClass());

    @Bean
    @DependsOn(OUTBOUND_ID)
    CommandLineRunner kickOff( 
           @Qualifier(OUTBOUND_ID + ".input") MessageChannel in) {
        return args -> {
            for (int i = 0; i < 1000; i++) {
                in.send(new GenericMessage<>("#" + i));
                log.info("sending message #" + i);
            }
        };
    }

    @Bean(name = OUTBOUND_ID)
    IntegrationFlow producer() {

      log.info("starting producer flow..");
      return flowDefinition -> {

        Consumer<KafkaProducerMessageHandlerSpec.ProducerMetadataSpec> spec =
          (KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata)->
            metadata.async(true)
              .batchNumMessages(10)
              .valueClassType(String.class)
              .<String>valueEncoder(String::getBytes);

        KafkaProducerMessageHandlerSpec messageHandlerSpec =
          Kafka.outboundChannelAdapter(
               props -> props.put("queue.buffering.max.ms", "15000"))
            .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .addProducer(this.kafkaConfig.getTopic(), 
                this.kafkaConfig.getBrokerAddress(), spec);
        flowDefinition
            .handle(messageHandlerSpec);
      };
    }
  }

  @Configuration
  public static class ConsumerConfiguration {

    @Autowired
    private KafkaConfig kafkaConfig;

    private Log log = LogFactory.getLog(getClass());

    @Bean
    IntegrationFlow consumer() {

      log.info("starting consumer..");

      KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka.inboundChannelAdapter(
          new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
            .consumerProperties(props ->
                props.put("auto.offset.reset", "smallest")
                     .put("auto.commit.interval.ms", "100"))
            .addConsumer("myGroup", metadata -> metadata.consumerTimeout(100)
              .topicStreamMap(m -> m.put(this.kafkaConfig.getTopic(), 1))
              .maxMessages(10)
              .valueDecoder(String::new));

      Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));

      return IntegrationFlows
        .from(messageSourceSpec, endpointConfigurer)
        .<Map<String, List<String>>>handle((payload, headers) -> {
            payload.entrySet().forEach(e -> log.info(e.getKey() + '=' + e.getValue()));
            return null;
        })
        .get();
    }
  }

  public static void main(String[] args) {
      SpringApplication.run(DemoApplication.class, args);
  }
}

该示例大量使用了 Java 8 的 lambda 表达式。

生产者花了一些时间确定单次发送操作将发送多少消息,如何编码键和值(毕竟 Kafka 只知道 byte[] 数组),以及消息是同步发送还是异步发送。在下一行,我们配置出站适配器本身,然后定义一个 IntegrationFlow,以便所有消息都通过 Kafka 出站适配器发送出去。

消费者花了一些时间确定要连接哪个 Zookeeper 实例,在一个批次中接收多少条消息(10 条)等。一旦收到消息批次,它们就会被传递到 handle 方法,我在其中传递了一个 lambda 表达式,它将遍历 payload 的 body 并打印出来。没什么特别的。

结合 Spring XD 使用 Apache Kafka

Apache Kafka 是一个消息总线,当用作集成总线时,它非常强大。然而,它的真正价值在于它足够快且具有足够的伸缩性,可以用于在处理管道中路由大数据。如果您正在进行数据处理,那么您绝对需要 Spring XD!Spring XD 使在复杂的流处理管道中使用 Apache Kafka 变得非常简单(因为其支持是基于 Apache Kafka Spring Integration 适配器构建的!)。Apache Kafka 被暴露为 Spring XD 的一个**源(source)**——数据来源地,以及一个**汇(sink)**——数据目的地。

Spring XD 提供了一个非常方便的 DSL,用于创建类似于 bash 的管道-过滤器流。Spring XD 是一个集中的运行时,用于管理、伸缩和监控数据处理作业。它构建于 Spring Integration、Spring Batch、Spring Data 和 Spring for Hadoop 之上,是一个一站式数据处理平台。Spring XD 作业从**源(sources)**读取数据,通过可能计数、过滤、丰富或转换数据的处理组件运行它们,然后将它们写入汇(sinks)。

Spring Integration 和 Spring XD 的大师级人物 Marius Bogoevici,他在 Spring Integration 和 Spring XD 实现 Apache Kafka 的最近工作中贡献良多,整理了一个非常好的示例,演示了 如何建立一个完整可用的 Spring XD 和 Kafka 流README 文件将引导您完成设置 Apache Kafka、Spring XD 和所需主题的整个过程。然而,关键在于使用 Spring XD shell 和 shell DSL 来组合流。Spring XD 组件是预配置的命名组件,但有许多参数可以通过 XD shell 和 DSL 的 --.. 参数进行覆盖。(顺便说一句,这个 DSL 是由著名的 Spring Expression language 作者,杰出的 Andy Clement 编写的!)这里有一个示例,配置了一个流,用于从 Apache Kafka 源读取数据,然后将消息写入一个名为 log 的组件,这是一个汇(sink)。在这种情况下,log 可以是 syslogd、Splunk、HDFS 等。

xd> stream create kafka-source-test --definition "kafka --zkconnect=localhost:2181 --topic=event-stream | log" --deploy

就是这样!当然,这只是 Spring XD 的一个初步体验,但希望您会同意它的可能性是诱人的。

使用 Lattice 和 Docker 部署 Kafka 服务器

使用 Lattice(一个支持流行 Docker 镜像格式等多种容器格式的分布式运行时)可以轻松设置一个示例 Kafka 安装。Spotify 提供了一个 Docker 镜像,它设置了一个共存的 Zookeeper 和 Kafka 镜像。您可以轻松地将其部署到 Lattice 集群,如下所示:

ltc create --run-as-root m-kafka spotify/kafka

从那里,您可以轻松地伸缩 Apache Kafka 实例,并且更轻松地从基于云的服务中消费 Apache Kafka。

后续步骤

您可以在我的 GitHub 账户上找到此博客的代码

我们只触及了皮毛!

如果您想了解更多(为什么不呢?),那么务必查看 Marius Bogoevici 和 Mark Pollack 博士即将举行的关于使用 Spring XD 和 Apache Kafka 构建响应式数据管道的网络研讨会,他们将在其中演示使用 RxJava、Spring XD 和 Apache Kafka 是多么容易!

获取 Spring 时事通讯

通过 Spring 时事通讯保持联系

订阅

抢先一步

VMware 提供培训和认证,助您加速前进。

了解更多

获取支持

Tanzu Spring 通过一项简单订阅,为 OpenJDK™、Spring 和 Apache Tomcat® 提供支持和二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看全部