SpringCloud 09 —— 消息总线Spring Cloud Bus 之 Kafka

Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成为Apache的主要项目之一。Kafka使用Scala和Java进行编写。Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

Kafka 的特点

  • 高吞吐量(顺序读写)
  • 高可靠性(数据备份)
  • 可持久化(本次磁盘持久化)
  • 可扩展性(集群热扩(缩)容)
  • 分区内有序
  • 生产者与消费者多样性(支持多语言)

Kafka的基本概念

  • Producer
    消息生产者,就是向kafka broker发消息的客户端。

  • Consumer
    消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。

  • Topic
    主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。

  • Partition
    消息分区,一个topic可以分为多个 partition,每个
    partition是一个有序的队列。partition中的每条消息都会被分配一个有序的
    id(offset)。

  • Broker
    一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

  • Consumer Group
    消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。

  • Offset
    消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。

Kafka具有四个核心API

  • 使用 Producer API发布消息到kafka集群中一个或多个topic。

  • 使用 Consumer API来订阅一个或多个topic,并处理产生的消息。

  • 使用 Streams API充当一个流处理器,从1个或多个topic消费输入流,并生产输出流到1个或多个输出topic,有效地将输入流转换到输出流。

  • 使用Connector API可以构建和运行可重复使用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,针对关系型数据库的连接器可以捕获到表的每个变化。

基本安装与验证

Kafka官方下载地址

解压后进入windows目录,kafka基于zookeeper所以先启动里面的zookeeper。

./zookeeper-server-start.bat ./../../config/zookeeper.properties

image.png

然后启动kafka

./kafka-server-start.bat ./../../config/server.properties

创建一个主题

./kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cloud

查看创建的主题列表

./kafka-topics.bat --list --zookeeper localhost:2181

可以看到我们刚刚创建的主题cloud

启动生产者

./kafka-console-producer.bat --broker-list localhost:9092 --topic cloud

启动消费者

./kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic cloud --from-beginning

验证

我在生产者端输入数据可以看到在消费者获取到消费。

zookeeper可视化工具———zkui

github上下载源码包

下载完成后运行

mvn clean install

将config文件拷贝到打包后的target目录中,然后运行打完的jar包启动.

java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar

浏览器验证

默认管理地址http://localhost:9090

默认账号密码:admin/manager

同样在我们刚刚拷贝的config文件中可以修改自定义的配置。

Kafka 简单使用

这里只是一个简单的演示,如果希望对kafka有完整的了解建议阅读Kafka教程

前面我们讲解了RabbitMQ,也了解了RabbitMQ的工作模式,下面我们来看一下Kafka 是如何进行消息发送的,Kafka 号称性能怪兽,吞吐量非常高,当然kafka 设计之初就是为了解决高吞吐量的问题。Kafka 也一直被用与大数据、海量日志的处理等问题。

新建cloud-mq-kafka项目模块,引入依赖:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

新建启动类

@SpringBootApplication
public class KafkaMqApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaMqApplication.class, args);
    }
}

应用配置

server:
  port: 8781
spring:
  application:
    name: cloud-mq-kafka
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: 1
      retries: 0
      batch-size: 16384
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:
      group-id: testGroup
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

kafka 每个配置对应的意思如下:

  • kafka.bootstrap-servers
    指定kafka server的地址,集群配多个,中间,逗号隔开

  • kafka.producer.asks
    可以设置的值为:all, -1, 0, 1
    procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
    acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
    acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
    acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。

  • kafka.producer.retries
    写入失败时,重试次数。当leader节点失效,一个副本节点会替代成为leader节点,此时可能出现写入失败。
    当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。

  • kafka.producer.batch-size
    每次批量发送消息的数量,produce积累到一定数据,一次发送

  • kafka.producer.key-serializer
    指定消息key编解码方式

  • kafka.producer.value-serializer
    指定消息体的编解码方式

  • kafka.consumer.group-id
    指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名

  • kafka.consumer.auto-offset-reset
    smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest

  • kafka.consumer.enable-auto-commit
    设置自动提交offset

  • kafka.consumer.auto-commit-interval
    如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。

  • kafka.consumer.key-deserializer
    指定消息key编解码方式

  • kafka.consumer.value-deserializer
    指定消息体编解码方式

当然kafka 还有很多配置,我们简单使用这些已经够了

创建发送方

发送方需要引入KafkaTemplate,用来发送消息,发送的时候我们指定消息的topic 和内容。并把消息内容打印出来。并把发送类注册为组件。

@Component
@Slf4j
public class Sender {
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    public boolean send(){
        String message="Hello World:"+ DateUtil.now();
        log.info("Sender:"+message);
        //第一个参数是topic,第二个参数是内容
        kafkaTemplate.send("cloud",message);
        return true;
    }
}

创建消费方

@Component
@Slf4j
public class Consumer {
    @KafkaListener(topics = "cloud")
    public void onMessage(String message){
        log.info(message);
    }
}

启动kafka后启动Sender,然后单元测试启动Consumer

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class SenderTest {

    @Autowired
    private Sender sender;

    @Test
    void send() {
        for (int i = 0; i <10 ; i++) {
            sender.send();
        }
    }
}

Spring Cloud Config 改造(Kafka)

前面我们使用了RabbitMQ 来改造Spring Cloud Config,这里我们使用Kafka 来改造Spring Cloud Config。在RabbitMQ 那一章使用的是spring-cloud-starter-bus-amqp包,和RabbitMQ 那一节集成 Spring Cloud Bus 集成的区别是包和连接配置有区别,下面我们看下Kafka的依赖包。

新建项目模块cloud-config-kafka-client,引入依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-bus</artifactId>
        </dependency>
    </dependencies>

启动类

这里和RabbitMQ 集成Sring Cloud Bus 是一样的

@EnableDiscoveryClient
@SpringBootApplication
public class ConfigKafkaClientApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConfigKafkaClientApplication .class, args);
    }
}

应用配置

配置中和RabbitMQ 集成的区别主要是把RabbitMQ 的连接信息换成Kafka的连接信息,确保Kafka 是启动的。同时我们将应用的健康信息和bus-refresh接口暴露出去

server:
  port: 8779
spring:
  application:
    name: register-eureka-client
  cloud:
    config: #自己指定的和服务发现的2选1
#      uri: http://localhost:8778/  自己指定的configServer
      profile: dev
      label: master
      discovery: #基于服务发现的
        enabled: true
        service-id: config-server
    # kafka
    stream:
      kafka:
        binder:
          brokers: localhost:9092
    bus:
      trace:
        enabled: true
management:
  endpoints:
    web:
      exposure:
        include: refresh,health,info,bus-refresh

启动项目

先将cloud-config-native-server启动起来,然后启动本项目,否则会报错,获取不到配置信息浏览器或者Postman 测试localhost:8779/api/version

如果修改了服务的配置,例如git 仓库修改的配置,通过localhost:8779/actuator/bus-refresh即可给全部客户端刷新配置,这里和RabbitMQ 集成Spring Cloud Bus中一样。

更新时间:2020-04-29 12:03:20

本文由 寻非 创作,如果您觉得本文不错,请随意赞赏
采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
原文链接:https://www.zhouning.group/archives/springcloud09消息总线springcloudbus之kafka
最后更新:2020-04-29 12:03:20

评论

Your browser is out of date!

Update your browser to view this website correctly. Update my browser now

×