当大数据运动开始时,它主要集中在批处理上。分布式数据存储和查询工具(如MapReduce,Hive和Pig)都旨在分批处理数据而不是连续处理数据。企业每晚都会运行多个作业,从数据库中提取数据,然后分析,转换并最终存储数据。最近,企业发现了分析和处理数据和事件的能力,而不是每隔几个小时就会发生一次。然而,大多数传统的消息传递系统不能扩展以实时处理大数据。所以LinkedIn的工程师构建并开源Apache Kafka:一种分布式消息传递框架,通过扩展商用硬件来满足大数据的需求。

在过去几年中,Apache Kafka已经出现,以解决各种情况。在最简单的情况下,它可以是用于存储应用程序日志的简单缓冲区。结合Spark Streaming等技术,它可用于跟踪数据更改并对数据执行操作,然后将其保存到最终目标。Kafka的预测模式使其成为检测欺诈的有力工具,例如在信用卡交易发生时检查信用卡交易的有效性,而不是等待数小时后的批处理。

这个由两部分组成的教程介绍了Kafka,从如何在开发环境中安装和运行它开始。您将了解Kafka的架构,然后介绍如何开发开箱即用的Apache Kafka消息传递系统。最后,您将构建一个自定义生产者/消费者应用程序,通过Kafka服务器发送和使用消息。在本教程的后半部分,您将学习如何对消息进行分区和分组,以及如何控制Kafka消费者将使用哪些消息。

什么是Apache Kafka?

Apache Kafka是为大数据扩展而构建的消息传递系统。与Apache ActiveMQRabbitMq类似,Kafka使构建在不同平台上的应用程序能够通过异步消息传递进行通信。但Kafka与这些更传统的消息传递系统的关键方式不同:

  • 它旨在通过添加更多服务器来横向扩展。
  • 它为生产者和消费者流程提供了更高的吞吐量。
  • 它可用于支持批处理和实时用例。
  • 它不支持Java的面向消息的中间件API JMS

Apache Kafka的架构

在我们探索Kafka的架构之前,您应该了解它的基本术语:

  • producer是将消息发布到主题的一个过程。
  • consumer是订阅一个或多个主题并且消费发布到主题的消息的过程。
  • topic是消息发布的主题的名称。
  • broker是在一台机器上运行的进程。
  • cluster是一起工作的一组broker

Apache Kafka的架构非常简单,可以在某些系统中实现更好的性能和吞吐量。Kafka中的每个topic都像一个简单的日志文件。当生产者发布消息时,Kafka服务器会将其附加到其给定topic的日志文件的末尾。服务器还分配一个偏移量,该偏移量是用于永久识别每条消息的数字。随着消息数量的增加,每个偏移量的值增加; 例如,如果生产者发布三条消息,第一条消息可能获得偏移量1,第二条消息偏移量为2,第三条偏移量为3。

当Kafka消费者首次启动时,它将向服务器发送拉取请求,要求检索偏移值大于0的特定topic的任何消息。服务器将检查该topic的日志文件并返回三个新消息。消费者将处理消息,然后发送偏移量大于3的消息请求,依此类推。

在Kafka中,客户端负责记住偏移计数和检索消息.Kafka服务器不跟踪或管理消息消耗。默认情况下,Kafka服务器将保留七天的消息。服务器中的后台线程检查并删除七天或更早的消息。只要消息在服务器上,消费者就可以访问消息。它可以多次读取消息,甚至可以按收到的相反顺序读取消息。但是,如果消费者在七天之前未能检索到消息,那么它将错过该消息。

LinkedIn和其他企业的生产使用表明,通过适当的配置,Apache Kafka每天能够处理数百GB的数据。2011年,三位LinkedIn工程师使用基准测试来证明Kafka可以实现比ActiveMQ和RabbitMQ更高的吞吐量。

Apache Kafka快速设置和演示

我们将在本教程中构建一个自定义应用程序,但让我们首先安装和测试一个开箱即用的生产者和消费者的Kafka实例。

  1. 访问Kafka下载页面以安装最新版本(撰写本文时为0.9)。
  2. 将二进制文件解压缩到一个software/kafka文件夹中。对于当前版本,它是software/kafka_2.11-0.9.0.0
  3. 将当前目录更改为指向新文件夹。
  4. 通过执行以下命令启动Zookeeper服务器:bin/zookeeper-server-start.sh config/zookeeper.properties
  5. 执行以下命令启动Kafka服务器:bin/kafka-server-start.sh config/server.properties
  6. 创建一个可用于测试的测试topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld
  7. 启动一个简单的控制台使用者,它可以使用发布到给定topic的消息,例如javaworldbin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning
  8. 启动一个简单的生产者控制台,可以将消息发布到测试topic:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld
  9. 尝试在生产者控制台中输入一条或两条消息。您的消息应显示在使用者控制台中。

Apache Kafka的示例应用程序

您已经了解了Apache Kafka如何开箱即用。接下来,让我们开发一个自定义生产者/消费者应用程序。生产者将从控制台检索用户输入,并将每个新行作为消息发送到Kafka服务器。消费者将检索给定topic的消息并将其打印到控制台。在这种情况下,生产者和消费者组件是您自己的kafka-console-producer.shkafka-console-consumer.sh

让我们从创建一个Producer.java类开始。此客户端类包含从控制台读取用户输入并将该输入作为消息发送到Kafka服务器的逻辑。

我们通过从java.util.Properties类创建对象并设置其属性来配置生产者。该ProducerConfig类定义了所有不同的属性可用,但Kafka的默认值足以满足大多数用途。对于默认配置,我们只需要设置三个必需属性:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)设置主机:端口对的列表,用于以host1:port1,host2:port2,...格式建立与Kakfa集群的初始连接。即使我们的Kafka集群中有多个代理,我们也只需要指定第一个代理的值host:port。Kafka客户端将使用此值在代理上进行发现调用,该代理将返回集群中所有代理的列表。最好在BOOTSTRAP_SERVERS_CONFIG中指定多个代理,这样如果第一个代理停止运行,客户端将能够尝试其他代理。

Kafka服务器需要byte[] key, byte[] value格式化的消息。Kafka的客户端库不是转换每个键和值,而是允许我们使用更友好的类型Stringint发送消息。库将这些转换为适当的类型。例如,示例应用程序没有特定于消息的key,因此我们将使用null作为key。对于值,我们将使用 String,即用户在控制台上输入的数据。

要配置消息key,我们用org.apache.kafka.common.serialization.ByteArraySerializer设定KEY_SERIALIZER_CLASS_CONFIG的值。这是有效的,因为null不需要转换为byte[]。对于消息值,我们为VALUE_SERIALIZER_CLASS_CONFIG设置了org.apache.kafka.common.serialization.StringSerializer,因为该类知道如何将String转换为 byte[]

Kafka 生产者

Properties使用必要的配置属性填充类之后,我们可以使用它来创建对象KafkaProducer。每当我们要发送的消息后,该Kafka服务器,我们将创建一个对象ProducerRecord,并调用KafkaProducersend()方法发送消息。ProducerRecord有两个参数:应该发布消息的topic的名称,以及实际的消息。使用生产者时,不要忘记调用该方法:Producer.close()

清单1. KafkaProducer


        public class Producer {
          private static Scanner in;
          public static void main(String[] argv)throws Exception {
              if (argv.length != 1) {
                  System.err.println("Please specify 1 parameters ");
                  System.exit(-1);
              }
              String topicName = argv[0];
              in = new Scanner(System.in);
              System.out.println("Enter message(type exit to quit)");

              //Configure the Producer
              Properties configProperties = new Properties();
              configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
              configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
              configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

              org.apache.kafka.clients.producer.Producer producer = new KafkaProducer<String, String>(configProperties);
              String line = in.nextLine();
              while(!line.equals("exit")) {
                  ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, line);
                  producer.send(rec);
                  line = in.nextLine();
              }
              in.close();
              producer.close();
          }
        }
      

配置消息使用者

接下来,我们将创建一个订阅topic的简单消费者。每当向topic发布新消息时,它将读取该消息并将其打印到控制台。消费者代码与生产者代码非常相似。我们首先创建一个对象java.util.Properties,设置其特定于消费者的属性,然后使用它来创建一个新对象KafkaConsumerConsumerConfig类定义了我们可以设置的所有属性。只有四个强制属性:

BOOTSTRAP_SERVERS_CONFIG(bootstrap.servers)KEY_DESERIALIZER_CLASS_CONFIG(key.deserializer)VALUE_DESERIALIZER_CLASS_CONFIG(value.deserializer)GROUP_ID_CONFIG(bootstrap.servers)

正如我们为生产者类所做的那样,我们将使用BOOTSTRAP_SERVERS_CONFIG为消费者类配置主机/端口对。此配置允许我们以host1:port1,host2:port2,...格式建立与Kakfa集群的初始连接。正如我之前提到的,Kafka服务器需要byte[]键和byte[]值格式的消息,并且有自己的实现来序列化不同的类型byte[]。正如我们对生产者所做的那样,在消费者方面,我们将不得不使用自定义反序列化器转换byte[]回适当的类型。在示例应用程序的情况下,我们知道生产者正在使用`ByteArraySerializer`

key和StringSerializer值。因此,在客户端,我们需要使用org.apache.kafka.common.serialization.ByteArrayDeserializer序列化key和org.apache.kafka.common.serialization.StringDeserializer序列化值。将这些类为赋值KEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG将使消费者反序列化由生产者发送的byte[]类型的数据。最后,我们需要设置值GROUP_ID_CONFIG。这应该是字符串格式的组名。我会在一分钟内详细解释这个配置。现在,只需查看具有四个强制属性集的Kafka消费者:

清单2. KafkaConsumer


  public class Consumer {
      private static Scanner in;
      private static boolean stop = false;

      public static void main(String[] argv)throws Exception{
          if (argv.length != 2) {
              System.err.printf("Usage: %s <topicName> <groupId>\n",
                      Consumer.class.getSimpleName());
              System.exit(-1);
          }
          in = new Scanner(System.in);
          String topicName = argv[0];
          String groupId = argv[1];

          ConsumerThread consumerRunnable = new ConsumerThread(topicName,groupId);
          consumerRunnable.start();
          String line = "";
          while (!line.equals("exit")) {
              line = in.next();
          }
          consumerRunnable.getKafkaConsumer().wakeup();
          System.out.println("Stopping consumer .....");
          consumerRunnable.join();
      }

      private static class ConsumerThread extends Thread{
          private String topicName;
          private String groupId;
          private KafkaConsumer<String,String> kafkaConsumer;

          public ConsumerThread(String topicName, String groupId){
              this.topicName = topicName;
              this.groupId = groupId;
          }
          public void run() {
              Properties configProperties = new Properties();
              configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
              configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");

              //Figure out where to start processing messages from
              kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
              kafkaConsumer.subscribe(Arrays.asList(topicName));
              //Start processing messages
              try {
                  while (true) {
                      ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                      for (ConsumerRecord<String, String> record : records)
                          System.out.println(record.value());
                  }
              }catch(WakeupException ex){
                  System.out.println("Exception caught " + ex.getMessage());
              }finally{
                  kafkaConsumer.close();
                  System.out.println("After closing KafkaConsumer");
              }
          }
          public KafkaConsumer<String,String> getKafkaConsumer(){
             return this.kafkaConsumer;
          }
      }
  }

消费者和消费者线程

将清单2中的消费者代码分为两部分来确保Consumer在退出之前关闭对象。我将依次描述每个类。首先,ConsumerThread是一个内部类,它将topic名称和组名称作为其参数。在该类的run()方法中,它创建一个具有适当属性的KafkaConsumer对象。它通过调用kafkaConsumer.subscribe()方法订阅topic,然后每100毫秒轮询Kafka服务器以检查topic中是否有任何新消息。它将遍历任何新消息的列表并将其打印到控制台。

Consumer类中,我们创建一个新对象,并在另一个ConsumerThread线程中启动它。在ConsumerThead开始一个无限循环,并保持轮询新消息的topic。同时在Consumer类中,主线程等待用户进入exit控制台。一旦用户进入退出,它就会调用该KafkaConsumer.wakeup()方法,导致KafkaConsumer停止轮询新消息并抛出一个WakeupException。然后,我们可以通过调用kafkaConsumerclose()方法关闭KafkaConsumer

运行该应用程序

要测试此应用程序,您可以从IDE运行清单1和清单2中的代码,也可以按照以下步骤操作:

  1. 通过执行以下命令下载示例代码KafkaAPIClient : git clone https://github.com/sdpatil/KafkaAPIClient.git.
  2. 编译代码并使用以下命令创建胖JAR : mvn clean compile assembly:single.
  3. 启动消费者:java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer test group1
  4. 启动生产者:java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Producer test
  5. 在生产者控制台中输入消息,然后检查该消息是否出现在使用者中。试试几条消息。
  6. 键入exit消费者和生产者控制台以关闭它们。

第1部分的结论

在本教程的前半部分,您已经了解了使用Apache Kafka进行大数据消息传递的基础知识,包括Kafka的概念性概述,设置说明以及如何使用Kafka配置生产者/消费者消息传递系统。

正如您所见,Kafka的架构既简单又高效,专为性能和吞吐量而设计。在第2部分中,我将介绍一些使用Kafka进行分布式消息传递的更高级技术,从使用分区细分主题开始。我还将演示如何管理消息偏移以支持不同的用例。

英文原文:https://www.javaworld.com/article/3060078/big-data-messaging-with-kafka-part-1.html