kafka从入门到实践

7/10/2021 kafka

题图:pixabay

上周在公上周在公司做了一次内部分享,关于kafka科普相关的。总结输出一下:

# kafka 是什么?

开源的消息引擎系统。流处理平台。我们说的更多的是"消息队列"。

# 流处理是什么?

流是数据。处理是动作。流处理就是不断对数据进行结果计算的动作。它的适用场景更多的是:

  • 监控告警
  • 日志流处理
  • BI模型训练
  • ...

流

# 我们常说的mq是什么?

message queue。消息队列

消息即数据。队列即存放消息的容器。那种先进先出的数据结构,大家应该再熟悉不过了。

# 那么它的本质是什么?

发-存-收

mq

# kafka 在mq中的优劣?

引用自 mq对比选型 (opens new window)

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,吞吐量比RocketMQ和Kafka要低一个数量级 万级,吞吐量比RocketMQ和Kafka要低一个数量级 十万级,RocketMQ也是可以支撑高吞吐的一种MQ 十万级别,Kafka最大优点就是吞吐量大,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
Topic数量对吞吐量的影响 - - Topic可以达到几百、几千个的级别,吞吐量会有小幅度的下降。这是RocketMQ的一大优势,可在同等数量机器下支撑大量的Topic Topic从几十个到几百个的时候,吞吐量会大幅下降。所以在同等机器数量下,Kafka尽量保证Topic数量不要过多。如果支撑大规模Topic需要增加更多的机器
时效性 ms级 微秒级,这是rabbitmq的一大特点,延迟是最低的 ms级 延迟在ms级以内
可用性 高,基于主从架构实现可用性 高,基于主从架构实现可用性 非常高,分布式架构 非常高,Kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 - 经过参数优化配置,可以做到零丢失 经过参数配置,消息可以做到零丢失
功能支持 MQ领域的功能及其完备 基于erlang开发,所以并发性能极强,性能极好,延时低 MQ功能较为完备,分布式扩展性好 功能较为简单,主要支持加单MQ功能
优势 非常成熟,功能强大,在业内大量公司和项目中都有应用 erlang语言开发,性能极好、延时很低,吞吐量万级、MQ功能完备,管理界面非常好,社区活跃;互联网公司使用较多 接口简单易用,阿里出品有保障,吞吐量大,分布式扩展方便、社区比较活跃,支持大规模的Topic、支持复杂的业务场景,可以基于源码进行定制开发 超高吞吐量,ms级的时延,极高的可用性和可靠性,分布式扩展方便
劣势 偶尔有较低概率丢失消息,社区活跃度不高 吞吐量较低,erlang语音开发不容易进行定制开发,集群动态扩展麻烦 接口不是按照标准JMS规范走的,有的系统迁移要修改大量的代码,技术有被抛弃的风险 有可能进行消息的重复消费
应用 主要用于解耦和异步,较少用在大规模吞吐的场景中 都有使用 用于大规模吞吐、复杂业务中 在大数据的实时计算和日志采集中被大规模使用,是业界的标准

# xxx 为什么选择kafka作为统一的队列?(省略)

  • 维护成本
  • 高可用
  • 技术栈

# kafka的性能优势体现在什么地方?

# 零拷贝-针对读

引用自一文读懂Kafka零拷贝 (opens new window)

img

那零拷贝是什么?

零拷贝(*Zero-copy*)技术,因为我们没有在内存层面去拷贝数据,也就是说全程没有通过 CPU 来搬运数据,所有的数据都是通过 DMA 来进行传输的。

# 批量压缩

对于日志类的场景,可以考虑压缩。其余场景不建议使用压缩。压缩会消耗额外的CPU。

同步发的话就不存在批量发送了。批量发送的话,这一批消息会被压缩在一起,而单条发时,就是每一条分别压缩。我们知道,在文件非常小的时候,使用gzip压缩的效果是很差的,甚至可能压完比源文件还大。

# 顺序写磁盘

在顺序读写的情况下,磁盘的顺序读写速度和内存持平

因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机 I/O,最喜欢顺序 I/O。为了提高读写硬盘的速度,Kafka 就是使用顺序 I/O。

# 批量读写

kafka消费端可以一次拉起多条数据,最后统一提交offset。

kafka发送端也可以囤积多条消息一次发送,通过batch.size来设置批量的大小。但是这个只能针对单partition,即多条消息发往同一个分区。

消费端跟发送端都有两个参数来控制批量的这个策略。一个是跟大小有关,一个是跟时间有关,两者只要符合一种即可符合条件。可以自己了解下。

目前我们用的比较多的是消费端的批量处理。

# 分区分段+索引

这个就涉及到kafka的存储模型.

我们先简单启个kafka看看:

第一步:下载+启动zk+启动server+创建topic

image-20210704102042064

第二步:发几条消息

image-20210704102647956

第三步:消费消息

image-20210704111352456

第四步:查看日志文件

image-20210704123249123

第五步:查看索引文件

image-20210704123318489

第六步:查看时间索引文件

image-20210704123354229

通过上面的一些实践观察,我们发现:

  • 每个partition都会生成一个日志文件夹
  • 每个文件夹里至少包含3个文件,统称为segment(这是一个逻辑分组)
    • .index文件。偏移量索引文件,消息偏移量到物理地址之间的映射,方便快速定位消息在log文件中的位置
    • .log文件。详细日志文件
    • .timeindex文件。时间索引文件,根据时间戳来找到对应的偏移量信息,也就是对应.index文件。
  • .index文件中的偏移量和.timeindex文件中的时间戳都是单调递增的。为啥?因为kafka的索引机制是采用的稀疏索引,简单的说就是分段。kafka并不是把所有 的offset偏移量与物理地址的映射关系都存下来,它是隔一段存一个,隔一段存一个。如果要查找某条消息,已知offset,先根据offset找到当前offset在哪一段。找到段头段尾的offset,根据段头段尾offset去log文件找到对应的区间二分查到对应的msg。

# 那为什么kafka索引要采用稀疏索引?

防止索引文件过大,查找费劲

# 那为什么kafka要分区分段?

防止单个日志文件过大,方便查找。

上面说的是在同一个segment里根据索引找到msg,那我怎么知道offset在哪个segment里?

还是通过二分法来查询判断

下面我们通过一张图来加深上述文字的理解

kafka-storage

下面是具体执行的命令:

# 下载kafka
wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.11-1.0.0.tgz

cd kafka_2.11-1.0.0

mkdir logs

#修改日志目录
vim config/server.properties && log.Dirs = logs

#启动zk
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

#创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic liuli-test

#查看topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

#启动kafka server
./bin/kafka-server-start.sh config/server.properties

#启动kafka producer终端
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic liuli-test

#查看kafka索引文件(根据offset查找)
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./logs/liuli-test-4/00000000000000000000.index

#查看kafka日志文件
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./logs/liuli-test-4/00000000000000000000.log --print-data-log

#查看时间索引文件(根据时间戳查找)
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./logs/liuli-test-4/00000000000000000000.timeindex

#消费消息
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic liuli-test --from-beginning
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# kafka的几个八股文问题?

# kafka怎么保证顺序性?

这个问题怎么产生的?

kafka默认的分区策略是轮训。如果既不指定分区,也不指定hash code。则会根据默认的策略讲消息发往server端的partition。那么就有可能同一个主键的数据进入不同的partition队列。消费端对于单个partition的数据是可以保证串行处理,对于不同partition队列是无法保证的。所以会存在后发送的消息先消费的情况。

怎么解决?

单partition、

# kafka怎么保证不丢数据?

副本机制、同步发送、手动ack

# kafka怎么保证数据幂等性?

所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:

  • 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  • 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)。

指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即enable.idempotence。enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer。底层原理,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。但是它只能保证单分区、单会话上的幂等性。

# kafka核心原理

# kafka架构

# 了解kafka需要理解的几个概念

  • Topic 用来对消息进行分类,每个进入到Kafka的信息都会被放到一个Topic下
  • Broker 用来实现数据存储的主机服务器。每个 Broker 即一个 Kafka 服务实例,多个 Broker 构成一个 Kafka 集群,生产者发布的消息将保存在 Broker 中,消费者将从 Broker 中拉取消息进行消费
  • Partition 每个Topic中的消息会被分为若干个Partition,以提高消息的处理效率。一个 Topic 可以分为多个 Partition,每个 Partition 是一个有序的队列,Partition 中的每条消息都存在一个有序的偏移量(Offest)
  • Offset。消息位移,表示分区中每条消息的位置信息,是一个单调递增且不变的值。
  • Replica。副本,Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。副本还分为领导者副本和追随者副本,各自有不同的角色划分。副本是在分区层级下的,即每个分区可配置多个副本实现高可用。
  • Consumer Offset。消费者位移,表征消费者消费进度,每个消费者都有自己的消费者位移。
  • Producer 消息的生产者
  • Consumer 消息的消费者
  • Consumer Group 消息的消费群组,多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
  • 协调者。这个过程就是 Kafka 中大名鼎鼎的“重平衡”(Rebalance)。
  • Consumer Offset。消费者位移,表征消费者消费进度,每个消费者都有自己的消费者位移。
  • ISR。班干部列表。设置ISR主要是为了broker宕掉之后,重新选举partition的leader从ISR列表中选择。
  • AR。Assigned Replicas,所有副本。
  • OSR。AR-ISR。
  • Rebalance。重平衡,消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

# 控制器

  • 主题管理
  • 分区管理
  • 选举
  • 集群成员管理
  • 数据服务

数据服务

# kafka选举

  1. 控制器选举
  2. 分区副本选举
  3. 消费者选举

控制器选举。控制器就是broker,一个kafka集群有多个broker节点。由broker leader来监听其他broker的信息,包括partition状态、isr列表、副本等信息。如果某个broker leader挂了,broker follower会来抢leader位置,谁先来谁坐。如果某个broker follower挂了,broker leader 会读取这个挂了的broker在zk上的信息状态等通知给其他broker,如果这个broker上还存在leader 副本,broker leader还会触发副本选举。

总结就是:它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。即先做控制器选主。选完后接下来控制分区选举。

controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为此作为响应的Broker

优势是啥?

  • 防止脑裂
  • 防止羊群效应

分区副本选举。一个分区会生成多个副本分布在多个broker上。会选举出一个leader副本来负责对外服务,其他副本接收到的请求都会转给leader副本来处理。

消费者选举。对一个消费组中的多个消费者选出一个leader用来统筹消费 conn partition的作用,当某个消费者退出时,此消费者对应的partition会分配给别的消费组来消费。

ISR集合。In Sync Replicas,同步副本。

AR。Assigned Replicas,所有副本。

OSR。AR-ISR。

# kafka consumer group

消费组、消费者跟topic、partition的关系

1、单个topic下单个partition只能被同一个消费组下的同一个消费者订阅。

2、一个消费组可以订阅多个topic。但是不建议。

理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。也可以一个消费者消费好几个分区

消费者位移保存在哪里?

  • zk
  • kafka内部

# Kafka producer

d

# kafka rebalance

本质上是一种协议,规定了消费者组下的每个消费者如何达成一致,来分配订阅topic下的每个分区。将消费者资源与partition队列重新匹配来达到平衡负载的效果

Consumer Group 何时进行 Rebalance 呢?Rebalance 的触发条件有 3 个。

  1. 组成员数发生变更
  2. 订阅主题数发生变更
  3. 订阅主题的分区数发生变更

在 Rebalance 过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。

所有 Broker 在启动时,都会创建和开启相应的 Coordinator (协调员)组件。也就是说,所有 Broker 都有各自的 Coordinator 组件。

Consumer 端应用程序在提交位移时,其实是向 Coordinator 所在的 Broker 提交位移。同样地,当 Consumer 应用启动时,也是向 Coordinator 所在的 Broker 发送各种请求,然后由 Coordinator 负责执行消费者组的注册、成员管理记录等元数据管理操作。

Rebalance 的弊端是什么呢?总结起来有以下 3 点:

  1. stw影响消费速度
  2. 效率不高,需要所有 成员参与。不能参考一致性hash吗?

避免因为各种参数或逻辑不合理而导致的组成员意外离组或退出的情形,与之相关的主要参数有:

  • session.timeout.ms(会话)
  • heartbeat.interval.ms(心跳)
  • max.poll.interval.ms(两次拉取数据最大间隔)
  • GC 参数

kafka为啥不让Leader副本和Follower都对外提供服务?像 Redis 和 MySQL 那样支持读写分离?

  • 读跟写是一致的
  • 主从延迟问题

I/O 模型与 Kafka 的关系又是什么呢?

实际上 Kafka 客户端底层使用了 Java 的 selector,selector 在 Linux 上的实现机制是 epoll,而在 Windows 平台上的实现机制是 select。因此在这一点上将 Kafka 部署在 Linux 上是有优势的,因为能够获得更高效的 I/O 性能。

# 为什么 Kafka 要做分区这样的设计?直接用多个主题不好吗?

  • 提供负载均衡的能力
  • 为了实现系统的高伸缩性
  • 实现业务级别的消息顺序

# Unclean 领导者选举

可以理解为不干净的副本选举。正常的选举是从ISR副本基本中选举,因为ISR集合中的副本都是跟领导者保持同步的,那总有一些副本性能不是那么好,落后领导者副本太多。这些副本肯定不能进入ISR副本集合的。但是如果ISR副本全部都挂了呢,一个不留。为了保证集群的可用性,就得从这个落后的副本里选出领导者副本。这就叫Unclean 领导者选举。开启Unclean 领导者选举的弊端是可能会造成数据丢失。

# 什么是高水位(待研究)

是用来表示消息位移的一个状态。

  1. 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的
  2. 帮助 Kafka 完成副本同步

# kafka在 xxx 的应用(省略)

# 引用

https://acecodeinterview.com/kafka/

极客时间《Kafka核心技术与实战》专栏

#

最后更新时间: 7/18/2021, 6:07:27 PM