扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
Kafka是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统。具有:高吞吐量、低延迟、可扩展性、持久性、可靠性、容错性、高并发等特性。常见的应用场景有:日志收集、消息系统、流式处理等。
创新互联坚持“要么做到,要么别承诺”的工作理念,服务领域包括:成都网站建设、网站建设、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的灵川网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!二. Kafka的基本架构Producer
:生产者,也就是发送消息的一方。生产者负责创建消息,然后将其发送到 Kafka。Consumer
:消费者,也就是接受消息的一方。消费者连接到 Kafka 上并接收消息,进而进行相应的业务逻辑处理。Consumer Group
:一个消费者组可以包含一个或多个消费者。使用多分区 + 多消费者方式可以极大提高数据下游的处理速度,同一消费组中的消费者不会重复消费消息,同样的,不同消费组中的消费者消息消息时互不影响。Kafka 就是通过消费组的方式来实现消息 P2P 模式和广播模式。Broker
:服务代理节点。Broker 是 Kafka 的服务节点,即 Kafka 的服务器。Topic
:Kafka 中的消息以 Topic 为单位进行划分,生产者将消息发送到特定的 Topic,而消费者负责订阅 Topic 的消息并进行消费。Partition
:Topic 是一个逻辑的概念,它可以细分为多个分区,每个分区只属于单个主题。同一个主题下不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。Offset
:offset 是消息在分区中的唯一标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序性而不是主题有序性。Replication
:副本,是 Kafka 保证数据高可用的方式,Kafka 同一 Partition 的数据可以在多 Broker 上存在多个副本,通常只有主副本对外提供读写服务,当主副本所在 broker 崩溃或发生网络一场,Kafka 会在 Controller 的管理下会重新选择新的 Leader 副本对外提供读写服务。Record
:实际写入 Kafka 中并可以被读取的消息记录。每个 record 包含了 key、value 和 timestamp。Kafka 在Topic
级别本身是无序的,只有partition
上才有序,所以为了保证处理顺序,可以自定义分区器,将需顺序处理的数据发送到同一个partition
。自定义分区器需要实现接口Partitioner
接口并实现 3 个方法:partition
,close
,configure
,在partition
方法中返回分区号即可。
Kafka 中发送 1 条消息的时候,可以指定(topic, partition, key)
3 个参数,partiton
和key
是可选的。
Kafka 分布式的单位是partition
,同一个partition
用一个write ahead log
组织,所以可以保证FIFO
的顺序。不同partition
之间不能保证顺序。因此你可以指定partition
,将相应的消息发往同 1个partition
,并且在消费端,Kafka 保证1 个partition
只能被1 个consumer
消费,就可以实现这些消息的顺序消费。
另外,也可以指定 key(比如 order id),具有同 1 个 key 的所有消息,会发往同 1 个partition
,那这样也实现了消息的顺序消息。
Kafka在数据生产的时候,有一个数据分发策略。默认的情况使用org.apache.kafka.clients.producer.internals.DefaultPartitioner
类,这个类中就是定义数据分发的策略。默认策略为:
Kafka的消息避免丢失可以从三个方面考虑处理:Producer发送消息避免失败、Broker能成功保存接收到的消息、Consumer确认消费消息。
Producer发送消息避免失败
想要Produce发送消息不失败,那就得知道发送结果,网络抖动这些情况是无法避免的,只能是发送后获取发送结果,那么最直接的方式就是把Kafka默认的异步发送改为同步发送(Broker收到消息后ack
回复确认),这样就能实时知道消息发送的结果,但是这样会让Kafka的发送效率大大降低,因为Kafka在默认的异步发送消息的时候可以批量发送,以此大幅度提高发送效率,因此一般很少使用同步发送的方式,除非消息很重要绝不允许丢失。
但是我们可以采用添加异步或调函数,监听消息发送的结果,如果失败可以在回调中重试,以此来达到尽可能的发送成功。同时Producer
本身提供了一个retries
的机制,如果因为网络问题,或者Broker故障 导致发送失败,就是重试。一般这个retries
设置3-5次或者更高,同时重试间隔时间也随着次数增长。
Broker能成功保存接收到的消息
Broker要成功的保存接收到的消息并且不丢失,就需要把接收到的消息保存到磁盘。Kafka为了提高性能采用的是异步批量,存储到磁盘的机制,就是有一定的消息量和时间间隔要求的,刷磁盘的这个动作是操作系统来调度的,如果在刷盘之前系统就崩溃了,就会数据丢失。
针对这个情况,Kafka采用Partition
分区ack
机制,Partition
分区是指一个Topic下的多个分区,有一个Leader
分区,其他的都是Follower
分区,Leader
分区负责接收和被读取消息,Follower
分区会通过Replication
机制同步Leader
的数据,负责高可用(Kafka在2.4之后,Kafka提供了读写分离,Follower
也可以提供读取),当Leader
出现故障时会从Follower
中选取一个成为新的Leader
。那么当一个消息发送到Leader
分区之后,Kafka提供了一个acks
的参数,Producer
可以设置这个参数,去结合broker
的Partition
机制来共同保障数据的可靠性,这个参数的值有三个
0
,表示Producer
不需要等待broker
的响应,就认为消息发送成功了(可能存在数据丢失)1
,表示Leader
收到消息之后,不等待其他的Follower
的同步就给Producer
发一个确认,如果Leader
和Partition
挂了就可能存在数据丢失-1
,表示Leader
收到消息之后还会等待ISR
列表(与Leader
保持正常连接的Follwer
节点列表)中的Follower
同步完成,再给Producer
返回一个确认,也就是所有分区节点都确认收到消息,保证数据不丢失Consumer确认消费消息
当Producer
确定发送消息成功并且Broker
成功保存消息之后,基本上Consumer
就肯定能消费到消息。Kafka在消费者消费时有一个offset
机制,代表了当前消费者消费到了Partition
的哪一条消息。kafka的Consumer
的配置中,默认的enable.auto.commit = true
,表示在Consumer
通过poll
方法 获取到消息以后,每过5秒(通过配置项可修改)会自动获取poll
中得到的大的offset
, 提交给Partition
中的offset_consumer
(存储 offset 的特定topic)。如果enable.auto.commit = false
时,则关闭了自动提交,需要手动的通过应用程序代码进行提交。
所以在Consumer
消费消息时,丢失消息的可能会有两种,比如开启了offset
自动提交,但是消息消费失败;或者没有开启自动提交offset
,但是在消费消息之前提交了offset
。针对这两种情况,可以设置在消息消费完成后手动提交offset
。总之Consumer
端确认消息消费成功后再提交offset
即可保证消息正常消费。
Kafka中的每个Partition
都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition
中。Partition
中的每个消息都有一个连续的序号,用于Partition
唯一标识一条消息,这个唯一标识就是offset
。Offset
从语义上来看拥有两种:Current Offset
和Committed Offset
。
Current Offset
保存在Consumer
客户端中,它表示Consumer
希望收到的下一条消息的序号。它仅仅在poll()
方法中使用。例如,Consumer
第一次调用poll()
方法后收到了20条消息,那么Current Offset
就被设置为20
。这样Consumer
下一次调用poll()
方法时,Kafka就知道应该从序号为21的消息开始读取。这样就能够保证每次Consumer poll
消息时,都能够收到不重复的消息。
Committed Offset
保存在Broker
上,它表示Consumer
已经确认消费过的消息的序号。主要通过commitSync
和commitAsync
API来操作。举个例子,Consumer
通过poll()
方法收到20条消息后,此时Current Offset
就是20,经过一系列的逻辑处理后,并没有调用consumer.commitAsync()
或consumer.commitSync()
来提交Committed Offset
,那么此时Committed Offset
依旧是0。Committed Offset
主要用于Consumer Rebalance
。在Consumer Rebalance
的过程中,一个partition
被分配给了一个Consumer
,那么这个Consumer
该从什么位置开始消费消息呢?答案就是Committed Offset
。另外,如果一个Consumer
消费了5条消息(poll并且成功commitSync
)之后宕机了,重新启动之后它仍然能够从第6条消息开始消费,因为Committed Offset
已经被Kafka记录为5。
在Kafka 0.9前,Committed Offset
信息保存在zookeeper
的consumers/{group}/offsets/{topic}/{partition}
目录中(zookeeper
并不适合进行大批量的读写操作,尤其是写操作)。而在0.9之后,所有的offset
信息都保存在了Broker
上的一个名为_consumer_offsets
的topic
中。
顺序读写
Kafka的Partition
中写入数据,是通过分段、追加日志的方式,这在很大程度上将读写限制为顺序 I/O(sequential I/O),这在大多数的存储介质上都很快。实际上不管是内存还是磁盘,快或慢关键在于寻址的方式,磁盘分为顺序读写与随机读写,内存也一样分为顺序读写与随机读写。基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写。
Page Cache
为了优化读写性能,Kafka利用了操作系统本身的Page Cache
,就是利用操作系统自身的内存而不是JVM空间内存。这样做可以避免Object消耗,如果是使用 Java 堆,Java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多;还能避免GC问题,随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题。
相比于使用JVM
或in-memory cache
等数据结构,利用操作系统的Page Cache
更加简单可靠。首先,操作系统层面的缓存利用率会更高,因为存储的都是紧凑的字节结构而不是独立的对象。其次,操作系统本身也对于Page Cache
做了大量优化,提供了write-behind
、read-ahead
以及flush
等多种机制。再者,即使服务进程重启,系统缓存依然不会消失,避免了in-process cache
重建缓存的过程。
通过操作系统的Page Cache
,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。
零拷贝
Linux操作系统 零拷贝 机制使用了sendfile
方法, 允许操作系统将数据从Page Cache
直接发送到网络,只需要最后一步的copy操作将数据复制到 NIC 缓冲区, 这样避免重新复制数据。零拷贝的技术基础是DMA,又称之为直接内存访问。DMA 传输将数据从一个地址空间复制到另外一个地址空间。当CPU 初始化这个传输动作,传输动作本身是由 DMA 控制器来实行和完成。因此通过DMA,硬件则可以绕过CPU,自己去直接访问系统主内存。很多硬件都支持DMA,其中就包括网卡、声卡、磁盘驱动控制器等。通过这种 “零拷贝” 的机制,Page Cache
结合sendfile
方法,Kafka消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据。
当Kafka客户端从服务器读取数据时,如果不使用零拷贝技术,那么大致需要经历这样的一个过程:
如果使用零拷贝技术,那么只需要:
批量读写
Kafka数据读写也是批量的而不是单条的。除了利用底层的技术外,Kafka还在应用程序层面提供了一些手段来提升性能,最明显的就是使用批次,在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。
批量压缩
Kafka可以把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优,读取数据的时候配合sendfile直接输出。并且Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议。
Kafka的topic
数量多性能会下降的主要原因是topic
在物理层面以partition
为分组,一个topic
可以分成若干个partition
,partition
还可以细分为logSegment
,一个partition
物理上由多个logSegment
组成,logSegment
文件由两部分组成,分别为.index
文件和.log
文件,分别表示为Segment
索引文件和数据文件。Kafka在Broker
接受并存储消息的时候,是将消息数据使用分段、追加日志的方式写入log文件,在很大程度上将读写限制为顺序 I/O(sequential I/O),那么如果topic
数量很多,即使每个topic
只有1个partition
,也会导致总分区数很多,磁盘读写退化为随机,影响性能。同时Kafka中topic
的元数据是在zookeeper
中的,大量topic
确实会造成性能瓶颈(zk不适合做高并发的读写操作),不仅在磁盘读写上。而且topic
太多造成partition
过多。partition
是kafka的最小并行单元,每个partition
都会在对应的broker
上有日志文件。当topic
过多,partition
增加,日志文件数也随之增加,就需要允许打开更多的文件数。partition
过多在controller
选举和controller
重新选举partition leader
的耗时会大大增加,造成kafka不可用的时间延长。
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流