SpringCloud 09 —— 消息总线Spring Cloud Bus 之 Kafka

Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成为Apache的主要项目之一。Kafka使用Scala和Java进行编写。Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

Kafka 的特点

  • 高吞吐量(顺序读写)
  • 高可靠性(数据备份)
  • 可持久化(本次磁盘持久化)
  • 可扩展性(集群热扩(缩)容)
  • 分区内有序
  • 生产者与消费者多样性(支持多语言)

Kafka的基本概念

  • Producer
    消息生产者,就是向kafka broker发消息的客户端。

  • Consumer
    消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。

  • Topic
    主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。

  • Partition
    消息分区,一个topic可以分为多个 partition,每个
    partition是一个有序的队列。partition中的每条消息都会被分配一个有序的
    id(offset)。

  • Broker
    一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

  • Consumer Group
    消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。

  • Offset
    消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。

Kafka具有四个核心API

  • 使用 Producer API发布消息到kafka集群中一个或多个topic。

  • 使用 Consumer API来订阅一个或多个topic,并处理产生的消息。

  • 使用 Streams API充当一个流处理器,从1个或多个topic消费输入流,并生产输出流到1个或多个输出topic,有效地将输入流转换到输出流。

  • 使用Connector API可以构建和运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,针对关系型数据库的连接器可以捕获到表的每个变化。

基本安装与验证

Kafka官方下载地址

解压后进入windows目录,kafka基于zookeeper所以先启动里面的zookeeper。

./zookeeper-server-start.bat ./../../config/zookeeper.properties

image.png

然后启动kafka

./kafka-server-start.bat ./../../config/server.properties

创建一个主题

./kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cloud

查看创建的主题列表

./kafka-topics.bat --list --zookeeper localhost:2181

可以看到我们刚刚创建的主题cloud

启动生产者

./kafka-console-producer.bat --broker-list localhost:9092 --topic cloud

启动消费者

./kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic cloud --from-beginning

验证

我在生产者端输入数据可以看到在消费者获取到消费。

zookeeper可视化工具———zkui

github上下载源码包

下载完成后运行

mvn clean install

将config文件拷贝到打包后的target目录中,然后运行打完的jar包启动.

java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar

浏览器验证

默认管理地址http://localhost:9090

默认账号密码:admin/manager

同样在我们刚刚拷贝的config文件中可以修改自定义的配置。

Kafka 简单使用

这里只是一个简单的演示,如果希望对kafka有完整的了解建议阅读Kafka教程

前面我们讲解了RabbitMQ,也了解了RabbitMQ的工作模式,下面我们来看一下Kafka 是如何进行消息发送的,Kafka 号称性能怪兽,吞吐量非常高,当然kafka 设计之初就是为了解决高吞吐量的问题。Kafka 也一直被用与大数据、海量日志的处理等问题。

新建cloud-mq-kafka项目模块,引入依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

新建启动类

@SpringBootApplication
public class KafkaMqApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaMqApplication.class, args);
    }
}

应用配置

server:
  port: 8781
spring:
  application:
    name: cloud-mq-kafka
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: 1
      retries: 0
      batch-size: 16384
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:
      group-id: testGroup
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

kafka 每个配置对应的意思如下:

  • kafka.bootstrap-servers
    指定kafka server的地址,集群配多个,中间,逗号隔开

  • kafka.producer.asks
    可以设置的值为:all, -1, 0, 1
    procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
    acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
    acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
    acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。

  • kafka.producer.retries
    写入失败时,重试次数。当leader节点失效,一个副本节点会替代成为leader节点,此时可能出现写入失败。
    当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。

  • kafka.producer.batch-size
    每次批量发送消息的数量,produce积累到一定数据,一次发送

  • kafka.producer.key-serializer
    指定消息key编解码方式

  • kafka.producer.value-serializer
    指定消息体的编解码方式

  • kafka.consumer.group-id
    指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名

  • kafka.consumer.auto-offset-reset
    smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest

    </