在Apache Kafka简介的前半部分,您使用Kafka开发了几个小规模的生产者/消费者应用程序。从这些练习中,您应该熟悉Apache Kafka消息传递系统的基础知识。在下半部分,您将学习如何使用分区来分布负载并横向扩展应用程序,每天处理多达数百万条消息。您还将了解Kafka如何使用消息偏移来跟踪和管理复杂的消息处理,以及如何在消费者失败时保护您的Apache Kafka消息传递系统免于失败。我们将从第1部分开发用于发布 - 订阅和点对点用例的示例应用程序

Apache Kafka中的分区

Kafka中的topic可以细分为分区。例如,在创建名为Demo的topic时,您可以将其配置为具有三个分区。服务器将创建三个日志文件,每个文件分区一个。当生产者向topic发布消息时,它将为该消息分配分区ID。然后,服务器将消息仅附加到该分区的日志文件中。

如果您随后启动了两个消费者,则服务器可能会将分区1和2分配给第一个消费者,将分区3分配给第二个消费者。每个消费者只能从其分配的分区中读取。您可以在图1中看到为三个分区配置的Demo的topic。

为了扩展这个场景,想象一下有两个代理的Kafka集群,它位于两台机器中。分区演示tpoic时,您将其配置为具有两个分区和两个副本。对于此类配置,Kafka服务器会将两个分区分配给群集中的两个broker。每个broker都是其中一个分区的领导者。

当生产者发布消息时,它将转到分区领导者。领导者将获取消息并将其附加到本地计算机上的日志文件中。第二个broker会被动地将该提交日志复制到自己的机器上。如果分区负责人发生故障,第二个broker将成为新的领导者并开始提供客户端请求。以同样的方式,当消费者向分区发送请求时,该请求将首先发送给分区领导者,分区领导者将返回所请求的消息。

分区的好处

考虑分区基于Kafka的消息传递系统的好处:

  1. 可伸缩性:在只有一个分区的系统中,发布到topic的消息存储在一个日志文件中,该文件存在于一台计算机上。topic的消息数必须适合单个提交日志文件,并且存储的消息大小永远不会超过该计算机的磁盘空间。通过对topic进行分区,您可以通过将消息存储在群集中的不同计算机上来扩展系统。例如,如果您想为演示主题存储30千兆字节(GB)的消息,您可以构建一个由三台计算机组成的Kafka集群,每台计算机具有10 GB的磁盘空间。然后,您将topic配置为具有三个分区。
  2. 服务器负载平衡:拥有多个分区可让您在broker之间传播消息请求。例如,如果您的topic每秒处理100万条消息,则可以将其划分为100个分区,并将100个broker添加到群集中。每个broker都是单个分区的领导者,负责每秒响应10,000个客户端请求。
  3. 消费者负载平衡:与服务器负载平衡类似,在不同机器上托管多个消费者可以分散消费者负载。假设您希望从具有100个分区的topic每秒消耗100万条消息。您可以创建100个消费者并并行运行它们。Kafka服务器将为每个消费者分配一个分区,每个消费者将并行处理10,000个消息。由于Kafka仅将每个分区分配给一个消费者,因此在分区内将按顺序使用每个消息。

两种分区方式

生产者负责决定消息将进入的分区。生产者有两种控制这种分配的选择:

  • 自定义分区程序:您可以创建实现该org.apache.kafka.clients.producer.Partitioner接口的类。此自定义Partitioner将实现业务逻辑以确定发送消息的位置。
  • DefaultPartitioner:如果您不创建自定义分区程序类,则默认情况下将使用该类org.apache.kafka.clients.producer.internals.DefaultPartitioner。对于大多数情况,默认分区程序足够好,提供三个选项:
  • 手动:创建ProducerRecord时,使用重载的构造函数new ProducerRecord(topicName, partitionId,messageKey,message)指定分区ID。
  • 散列(局部敏感):创建ProducerRecord时,通过调用指定messageKey的构造方法 new ProducerRecord(topicName,messageKey,message)DefaultPartitioner将使用messageKey的散列来确保相同messageKey的所有消息都转到同一个生产者。这是最简单也是最常用的方法。
  • 喷涂(随机负载平衡):如果您不想控制哪些分区消息,只需调用new ProducerRecord(topicName, message)以创建您的ProducerRecord。在这种情况下,分区程序将以循环方式向所有分区发送消息,从而确保平衡的服务器负载。

对Apache Kafka应用程序进行分区

对于第1部分中的简单生产者/消费者示例,我们使用了 DefaultPartitioner。现在我们将尝试创建自定义分区程序。对于此示例,我们假设我们有一个零售网站,消费者可以使用该网站在世界任何地方订购产品。根据使用情况,我们知道大多数消费者都在美国或印度。我们希望对我们的应用程序进行分区,以便将来自美国或印度的订单发送给各自的消费者,而来自其他任何地方的订单将转发给第三个消费者。

首先,我们将创建一个实现org.apache.kafka.clients.producer.Partitioner接口的CountryPartitioner。我们必须实现以下方法:

  1. 当我们使用配置属性初始化类时,Kafka将调用configure()。此方法初始化特定于应用程序业务逻辑的函数,例如连接到数据库。在这种情况下,我们需要一个相当通用的分区器作为属性。然后,我们可以使用将消息流映射到分区。将来我们可以使用这种格式来改变哪些国家/地区获得自己的分区。PartitionerMapcountryNameconfigProperties.put("partitions.0","USA")
  2. Producer为每个消息调用partition()方法。在这种情况下,我们将使用它来读取消息并从消息中解析国家/地区的名称。如果国家的名称在countryToPartitionMap,它将返回存储在MappartitionId如果没有,它将散列国家的值并使用它来计算它应该去哪个分区。
  3. 我们调用close()来关闭分区程序。使用此方法可确保在关闭期间清除初始化期间获取的任何资源。

请注意,当Kafka调用configure()时,Kafka生成器会将我们为生成器配置的所有属性传递给Partitioner类。我们必须只读取那些以partitions.开头的属性,解析它们以获取partitionId并存储ID到 countryToPartitionMap

以下是我们的Partitioner界面自定义实现。

清单1. CountryPartitioner


    public class CountryPartitioner implements Partitioner {
        private static Map<String,Integer> countryToPartitionMap;

        public void configure(Map<String, ?> configs) {
            System.out.println("Inside CountryPartitioner.configure " + configs);
            countryToPartitionMap = new HashMap<String, Integer>();
            for(Map.Entry<String,?> entry: configs.entrySet()){
                if(entry.getKey().startsWith("partitions.")){
                    String keyName = entry.getKey();
                    String value = (String)entry.getValue();
                    System.out.println( keyName.substring(11));
                    int paritionId = Integer.parseInt(keyName.substring(11));
                    countryToPartitionMap.put(value,paritionId);
                }
            }
        }

        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                             Cluster cluster) {
            List partitions = cluster.availablePartitionsForTopic(topic);
            String valueStr = (String)value;
            String countryName = ((String) value).split(":")[0];
            if(countryToPartitionMap.containsKey(countryName)){
                //If the country is mapped to particular partition return it
                return countryToPartitionMap.get(countryName);
            }else {
                //If no country is mapped to particular partition distribute between remaining partitions
                int noOfPartitions = cluster.topics().size();
                return  value.hashCode()%noOfPartitions + countryToPartitionMap.size() ;
            }
        }

        public void close() {}
    }
    

Producer清单2(下面)中的类与第1部分中的简单生成器非常相似,其中两个更改以粗体标记:

  1. 我们使用等于值ProducerConfig.PARTITIONER_CLASS_CONFIG的键设置config属性,该值匹配我们CountryPartitioner类的完全限定名。我们还设置countryNamepartitionId,从而映射了我们想要传递给CountryPartitioner的属性。
  2. 我们将实现org.apache.kafka.clients.producer.Callback接口的类的实例作为producer.send()方法的第二个参数传递。一旦成功发布消息(附加了RecordMetadata对象),Kafka客户端将调用onCompletion()其方法。我们将能够使用此对象来找出发送消息的分区,以及分配给已发布消息的偏移量。

清单2.分区生产者


public class Producer {
    private static Scanner in;
    public static void main(String[] argv)throws Exception {
        if (argv.length != 1) {
            System.err.println("Please specify 1 parameters ");
            System.exit(-1);
        }
        String topicName = argv[0];
        in = new Scanner(System.in);
        System.out.println("Enter message(type exit to quit)");

        //Configure the Producer
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

            configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName());
        configProperties.put("partition.1","USA");
        configProperties.put("partition.2","India");
        
        org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
        String line = in.nextLine();
        while(!line.equals("exit")) {
            ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, null, line);
            producer.send(rec, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset());
;
                }
            });
            line = in.nextLine();
        }
        in.close();
        producer.close();
    }
}

为消费者分配分区

Kafka服务器保证仅将分区分配给一个消费者,从而保证消息的消耗顺序。您可以手动分配分区或自动分配分区。

如果您的业务逻辑需要更多控制,那么您将需要手动分配分区。在这种情况下,您将使用KafkaConsumer.assign(<listOfPartitions>)将每个消费者感兴趣的分区列表传递给Kakfa服务器。

自动分配分区是默认和最常见的选择。在这种情况下,Kafka服务器将为每个使用者分配一个分区,并将重新分配分区以扩展新的使用者。

假设您正在创建一个包含三个分区的新topic。当您为新topic启动第一个消费者时,Kafka会将所有三个分区分配给同一个消费者。如果您随后启动第二个消费者,Kafka将重新分配所有分区,将一个分区分配给第一个下发者,将剩余的两个分区分配给第二个消费者。如果添加第三个消费者,Kafka将再次重新分配分区,以便为每个消费者分配一个分区。最后,如果您启动第四个和第五个消费者,那么三个消费者将拥有一个分配的分区,但其他消费者将不会收到任何消息。如果最初的三个分区之一出现故障,Kafka将使用相同的分区逻辑将该消费者的分区重新分配给其他消费者。

我们将为示例应用程序使用自动分配。我们的大部分消费者代码都与第1部分中的简单消费者代码相同。唯一的区别是我们将一个实例ConsumerRebalanceListener作为第二个参数传递给我们的KafkaConsumer.subscribe()方法。Kafka每次为此消费者分配或撤销分区时都会调用此类的方法。我们将覆盖ConsumerRebalanceListeneronPartitionsRevoked()onPartitionsAssigned()方法,并打印从此订阅者分配或撤消的分区列表。

清单3.分区的使用者


   private static class ConsumerThread extends Thread {
     private String topicName;
     private String groupId;
     private KafkaConsumer<String, String> kafkaConsumer;

     public ConsumerThread(String topicName, String groupId) {
         this.topicName = topicName;
         this.groupId = groupId;
     }

     public void run() {
         Properties configProperties = new Properties();
         configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
         configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

         //Figure out where to start processing messages from
         kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
         kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                 System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
             }
             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                 System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
             }
         });
         //Start processing messages
         try {
             while (true) {
                 ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                 for (ConsumerRecord<String, String> record : records)
                     System.out.println(record.value());
             }
         } catch (WakeupException ex) {
             System.out.println("Exception caught " + ex.getMessage());
         } finally {
             kafkaConsumer.close();
             System.out.println("After closing KafkaConsumer");
         }
     }

     public KafkaConsumer<String, String> getKafkaConsumer() {
         return this.kafkaConsumer;
     }
}
   

测试您的Apache Kafka应用程序

我们已准备好运行并测试生产者/消费者应用程序的当前迭代。如前所述,您可以使用清单1到清单3中的代码,或者在GitHub上下载完整的源代码

  1. 通过调用:编译并创建一个JAR mvn compile assembly:single
  2. 创建一个以三个分区和一个复制因子命名的主题part-demo     <KAFKA_HOME>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic part-demo  
  3. 启动生产者:             java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer part-demo
  4. 启动三个消费者,然后观察控制台以查看每次启动使用者的新实例时如何分配和撤消分区:             java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Consumer part-demo group1
  5. 在生产者控制台中键入一些消息,并验证消息是否路由到正确的使用者:             USA: First order             India: First order             USA: Second order             France: First order

图2显示了分区主题中的生产者/消费者输出。

能够将单个主题划分为多个部分是Kafka可扩展性的关键。通过分区,您可以水平扩展消息传递基础结构,同时还可以维护每个分区内的顺序 接下来,我们将了解Kafka如何使用消息偏移来跟踪和管理复杂的消息传递方案。

管理message偏移

我在第1部分中提到,每当生产者发布消息时,Kafka服务器就会为该消息分配一个偏移量。消费者能够通过设置或重置消息偏移来控制它想要消费的消息。在开发消费者时,您有两种管理偏移的选项:自动和手动。

两种类型的偏移

当您在Kafka客户端中启动使用者时,它将读取您的ConsumerConfig.AUTO_OFFSET_RESET_CONFIG(auto.offset.reset)配置值。如果该配置设置为最早,则消费者将以该topic可用的最小偏移量开始。在向Kafka提出的第一个请求中,消费者会说:给我这个分区中的所有消息,其偏移量大于可用的最小值。它还将指定批量大小。Kafka服务器将以指定大小的批量返回所有匹配的消息。

消费者跟踪它处理的最后一条消息的偏移量,因此它将始终请求偏移量高于最后一个偏移量的消息。当消费者正常运行时,此设置有效,但如果消费者崩溃,或者您想停止维护,会发生什么?在这种情况下,您希望使用者记住上次处理的消息的偏移量,以便它可以从第一个未处理的消息开始。

为了确保消息持久性,Kafka使用两种类型的偏移:当前偏移量用于跟踪消费者正常工作时消耗的消息。该偏移还跟踪最后的消息抵消,但它发送信息到服务器kafka永久储存。

如果消费者由于某种原因而关闭或被关闭,它可以向Kafka服务器查询最后提交的偏移量并恢复消息消费,就好像没有丢失一样。就其本身而言,Kafka broker将此信息存储在一个名为__consumer_offsets的topic中。此数据将复制到多个broker,以便broker不会丢失偏移量。

提交偏移数据

您可以选择提交偏移数据的频率。如果您经常提交,则会受到性能损失。另一方面,如果消费者确实失败了,那么重新处理和消费的消息就会减少。您的另一个选择是减少提交(以获得更好的性能),但在发生故障时重新处理更多消息。在任何一种情况下,消费者都有两种提交偏移的选项:

  1. 自动提交:您可以设置auto.commit为true并使用以毫秒为单位的值设置auto.commit.interval.ms属性。启用此功能后,Kafka使用者将提交poll()调用而收到的最后一条消息的偏移量。该poll()调用在auto.commit.interval.ms后发出。
  2. 手动提交:您可以随时调用KafkaConsumercommitSync()commitAsync()方法。当您发出调用时,使用者将获取在poll()期间收到的最后一条消息的偏移量并将其提交给Kafka服务器。

手动偏移的三个用例

让我们考虑三种使用情况,您不希望使用Kafka的默认偏移管理基础架构。相反,您将手动确定要从哪个消息开始。

  1. 从头开始:在此用例中,您将捕获Kafka中的数据库更改。第一份数据是完整数据; 此后,您只会获得值已更改的列(更改的增量)。在这种情况下,您始终需要从头开始阅读topic中的所有消息,以构建记录的完整状态。要解决这种情况,您可以将消费者配置为通过调用kafkaConsumer.seekToBeginning(topicPartition)方法从头开始读取。请记住,默认情况下,Kafka将删除超过七天的消息,因此您需要为此用例配置更高的log.retention.hours值。
  2. 转到最后:现在让我们假设您通过实时分析交易来构建股票推荐应用程序。最糟糕的情况发生,您的消费者应用程序崩溃。在这种情况下,你已经使用过了kafkaConsumer.seekToEnd(topicPartition) 来配置偏移量以忽略停机期间的消息。相反,消费者将开始处理重启之时发生的消息
  3. 从给定的偏移开始:最后,假设您刚刚在生产环境中发布了新版本的生产者。在观看它产生一些消息后,您意识到它正在生成错误消息。你修复了生产者并重新开始。您不希望消费者使用这些错误消息,因此您可以通过调用kafkaConsumer.seek(topicPartition, startingOffset)手动将偏移量设置为生成的第一个良好消息。

消费者应用程序中的手动偏移

我们迄今为止开发的消费者代码每5秒自动提交一次记录。现在让我们更新消费者以获取手动设置偏移消耗的第三个参数。

如果使用等于0的最后一个参数的值,则使用者将假定您要从头开始,因此它将为每个分区调用一个kafkaConsumer.seekToBeginning()方法。如果传递值-1,则会假定您要忽略现有消息,并且仅消费在重新启动使用者后发布的消息。在这种情况下,它将为每个分区调用kafkaConsumer.seekToEnd()。最后,如果指定除0或-1以外的任何值,则会假定您已指定了消费者要从中开始的偏移量; 例如,如果您将第三个值传递为5,那么在重新启动时,使用者将使用偏移量大于5的消息。为此,它将调用kafkaConsumer.seek(<topicname>, <startingoffset>)

清单4.向使用者添加第三个参数


  private static class ConsumerThread extends Thread{
    private String topicName;
    private String groupId;
    private long startingOffset;
    private KafkaConsumer<String,String> kafkaConsumer;

    public ConsumerThread(String topicName, String groupId, long startingOffset){
        this.topicName = topicName;
        this.groupId = groupId;
        this.startingOffset=startingOffset;
    }
    public void run() {
        Properties configProperties = new Properties();
        configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset123");
        configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        //Figure out where to start processing messages from
        kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
        kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
            }
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
                Iterator<TopicPartition> topicPartitionIterator = partitions.iterator();
                while(topicPartitionIterator.hasNext()){
                    TopicPartition topicPartition = topicPartitionIterator.next();
                    System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is ->" + kafkaConsumer.committed(topicPartition) );
                    if(startingOffset ==0){
                        System.out.println("Setting offset to beginning");
                        kafkaConsumer.seekToBeginning(topicPartition);
                    }else if(startingOffset == -1){
                        System.out.println("Setting it to the end ");
                        kafkaConsumer.seekToEnd(topicPartition);
                    }else {
                        System.out.println("Resetting offset to " + startingOffset);
                        kafkaConsumer.seek(topicPartition, startingOffset);
                    }
                }
            }
        });
        //Start processing messages
        try {
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }

            }
        }catch(WakeupException ex){
            System.out.println("Exception caught " + ex.getMessage());
        }finally{
            kafkaConsumer.close();
            System.out.println("After closing KafkaConsumer");
        }
    }
    public KafkaConsumer<String,String> getKafkaConsumer(){
        return this.kafkaConsumer;
    }
}

代码准备就绪后,您可以通过执行以下命令对其进行测试:


        java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.offset.Consumer part-demo group1 0
    

Kafka客户端应该打印偏移量为0的所有消息,或者您可以更改最后一个参数的值以在消息队列中跳转。

Apache Kafka中的消费者群体

传统的消息传递用例可以分为两种主要类型:点对点和发布 - 订阅。在点对点场景中,一个消费者使用一条消息。当消息中继银行交易时,只有一个消费者应该通过更新银行账户进行响应。在发布 - 订阅方案中,多个消费者将使用单个消息但对其作出不同的响应。当Web服务器出现故障时,您希望将警报发送给编程为以不同方式响应的消费者。

队列是指点对点场景,其中消息仅由一个消费者使用。主题是指发布 - 订阅方案,其中每个消费者都使用消息。Kafka没有为队列和主题用例定义单独的API; 相反,当您启动消费者时,您需要指定ConsumerConfig.GROUP_ID_CONFIG属性。

如果您对多个消费者使用相同的GROUP_ID_CONFIG消息,Kafka将假设它们都是单个组的一部分,并且它将仅向一个消费者传递消息。如果你在不同的group.id中启动两个消费者,Kafka将假设它们不相关,因此每个消费者将获得它自己的消息副本。

回想一下清单3中的分区使用者将groupId其作为第二个参数。现在我们将使用该groupId参数为消费者实现队列和主题用例。

  1. 创建一个以group-test两个分区命名的主题:   bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic group-test
  2. 启动一个可用于将消息发布到group-test刚刚创建的主题的生产者:   java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer group-test
  3. 启动三个听取发布到group-test主题的消息的消费者。使用group1你的价值group id。这将为您提供三个消费者group1   java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group1
  4. 启动第四个消费者,但这次改变了group idto 的值group2。这将为您提供三个消费者group1和一个消费者group2   java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group2
  5. 返回生产者控制台并开始键入消息。您发布的每条新消息都应在group2消费者窗口中出现一次,并在三个group1消费者窗口中出现一次,如图3所示。

第2部分的结论

大数据消息系统的早期用例需要批处理,例如运行夜间ETL过程或定期将数据从RDBMS移动到NoSQL数据存储区。在过去几年中,对实时处理的需求增加,特别是对于欺诈检测和应急响应系统。Apache Kafka是为这些类型的实时场景而构建的。

Apache Kafka是一个很好的开源产品,但确实有一些限制; 例如,您无法在主题到达目标之前从主题内部查询数据,也不能跨多个地理位置分散的群集复制数据。您可以将MapR Streams(商业产品)与Kafka API结合使用,以实现这些和其他更复杂的发布 - 订阅方案。

原文链接:https://www.javaworld.com/article/3066873/big-data-messaging-with-kafka-part-2.html