消息队列之kafka(集群搭建及shell操作)-创新互联-成都快上网建站

消息队列之kafka(集群搭建及shell操作)-创新互联

1.kafka集群搭建

kafka安装包下载地址:
官网网址:http://kafka.apache.org/quickstart
中文官网:http://kafka.apachecn.org/quickstart.html
在 windows 平台,从官网下载:http://mirrors.hust.edu.cn/apache/kafka/1.1.0/
在 centos 平台:wgethttp://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz

成都创新互联主营容城网站建设的网络公司,主营网站建设方案,app开发定制,容城h5小程序定制开发搭建,容城网站营销推广欢迎容城等地区企业咨询
(1)集群部署的基础环境准备:

A: 安装 JDK 1.8
A: 安装 zookeeper 集群(也可以使用自带 ZooKeeper,但是不推荐)

(2)集体搭建:

版本:kafka_2.11-1.1.0
集群规划:hadoop01、hadoop02、hadoop03 (三个节点)
① 解压安装包到对应的目录
tar zxvfkafka_2.11-1.1.0.tgz -C /application/
② 修改配置文件
[hadoop@hadoop01 ~]$ cd /application/kafka_2.11-1.1.0/config/
[hadoop@hadoop01 ~]$ vim server.properties
broker.id=5 ## 当前集群中的每个 broker 节点的一个唯一编号,每个节点都不一样
消息队列之kafka(集群搭建及shell操作)

listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://hadoop01:9092
host.name=hadoop01## 每个节点指定为当前主机名,上面也是
消息队列之kafka(集群搭建及shell操作)

log.dirs=/home/hadoop/data/kafka-logs ## kafkabroke工作节点数据存储目录
num.partitions=1 ## kafka 的 topic 的默认分区数
消息队列之kafka(集群搭建及shell操作)

log.retention.hours=168 ## 日志的最长保存时间
消息队列之kafka(集群搭建及shell操作)

zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181 ## zookeeper 地址
消息队列之kafka(集群搭建及shell操作)
③ 批量发送
[hadoop@hadoop01 application]$scp -r /application/kafka_2.11-1.1.0/ hadoop02:$PWD
[hadoop@hadoop01 application]$scp -r /application/kafka_2.11-1.1.0/ hadoop03:$PWD
千万注意:要修改$KAFKA_HOME/config/server.properties 文件中的对应 broker 节点的信息

broker.id=your broker id 
host.name=your broker hostname 
advertised.listeners=PLAINTEXT:// your broker hostname:9092

④ 配置环境变量
[hadoop@hadoop01 application]$ sudo etc/profile
export KAFKA_HOME=/application/kafka_2.11-1.1.0
[hadoop@hadoop01 application]$source/etc/profile

⑤ 启动集群,进行验证(每一个节点都要启动)

nohup kafka-server-start.sh \
/application/kafka_2.11-1.1.0/config/server.properties \
1>~/logs/kafka_std.log \
2>~/logs/kafka_err.log &

2.kafka相关shell

(1)启动集群每个节点的进程

nohup kafka-server-start.sh \
 /home/hadoop/apps/kafka_2.11-1.1.0/config/server.properties \
1>~/logs/kafka_std.log \ 2>~/logs/kafka_err.log &

(2) 创建 topic

kafka-topics.sh \
--create \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--replication-factor 1 \
--partitions 1 \
--topic kafka_test
**参数介绍**
--create 创建 kafka topic
--zookeeper 指定 kafka 的 zookeeper 地址
--partitions 指定分区的个数
--replication-factor 指定每个分区的副本个数

(3) 查看已经创建的所有 kafka topic

kafka-topics.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--list \

(4) 查看某个指定的 kafka topic 的详细信息

kafka-topics.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--describe \   #查看详信息
--topic kafka_test  #指定需要查看的topic

消息队列之kafka(集群搭建及shell操作)
Topic:topic的名称
Partition:topic的分区编号
Leader:负责处理消息和读写,leader是从所有节点中随机选出
Replicas:列出了所有的副本节点,不管节点是否在服务中。
isr:正在服务中的节点。
(5) 开启生产者模拟生成数据:

kafka-console-producer.sh \
--broker-list hadoop01:9092 \  # broker的节点列表
--topic kafka_test

(6) 开启消费者模拟消费数据:

kafka-console-consumer.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--from-beginning \  #从哪里开始消费
--topic kafka_test

(7) 查看某 topic 某个分区的偏移量大值和最小值

kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--topic kafka_test \
--time -1 \
--broker-list hadoop01:9092 \
--partitions 1

(8) 增加 topic 分区数(这个操作是不被允许的)

kafka-topics.sh \
--alter \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--topic kafka_test \
--partitions 5 /
--replication-factor 2

(9) 删除 Topic

kafka-topics.sh \
--delete \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--topic kafka_test \

3.kafka的consumer group 位移offset重设

   这一节,说来也巧,最近接收的项目是准实时(OOG+kafka+stream),并同事也跟我聊了聊小编博客的建议,大体上说,就是博客写的太过简单,虽然通俗易懂,但是真正实际应用的地方并不多,建议我能加上一些复杂并且经常用到的东西;说白了就是懒的找资料,想看小编现成的;没办法,那就加上一些比较常用的。
  首先小编就介绍下,针对kafka通过group 消费了topic的数据后,如何自定义kafka数据消费的位置,之前的操作都是“--from-beginning”,每次都是从头开始消费,如果消费语句为精准一次,那么该如何操作呢?
  这里通过如何使用Kafka自带的kafka-consumer-groups.sh脚本随意设置消费者组(consumer group)的位移。需要特别强调的是,这是2.11-0.11.0.0版本提供的新功能且只适用于新版本consumer。在新版本之前,如果要为已有的consumer group调整位移必须要手动编写Java程序调用KafkaConsumer#seek方法,费时费力不说还容易出错。0.11.0.0版本丰富了kafka-consumer-groups脚本的功能,用户可以直接使用该脚本很方便地为已有的consumer group重新设置位移,但前提必须是consumer group必须是inactive的,即不能是处于正在工作中的状态
  这里首先介绍一下重设位移的三个步骤:
    - 确定consumer group在topic下的作用域
      → --all-topics :为consumer group下所有topic的所有分区调整位移
      → --topic t1 --topic t2:为指定的若干个topic的所有分区调整位移
      → --topic t1:0,1,2:为指定的topic分区调整位移

    - 确定位移重设策略
      → --to-earliest:把位移调整到分区当前最小位移
      → --to-latest: 把位移调整到分区当前最新位移
      → --to-current:把位移调整到分区当前位移
      → --to-offset :把位移调整到指定位移处
      → --shift-by N:把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
      → --to-datetime :把位移调整到大于给定时间的最早位移处,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
      → --by-duration :把位移调整到距离当前时间指定间隔的位移处,duration格式是PnDTnHnMnS,比如PT0H5M0S
    - 确定执行方案
      → 什么参数都不加:只是打印出位移调整方案,不具体执行
      → --execute:执行真正的位移调整
      → -export:把位移调整方案按照CSV格式打印,方便用户成csv文件,供后续直接使用

   具体案例演示:
#创建topic,并向其中生产数据

①   创建topic
kafka-topics.sh \
--create \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--replication-factor 1 \
--partitions 3 \
--topic 'test-group'

②   创建生产者,生产数据
kafka-producer-perf-test.sh \
--topic 'test-group' \
--num-records 500 \
--throughput -1 \
--record-size 100 \
--producer-props bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092 acks=-1
③   启动一个console consumer程序,组名设置为test-group:
kafka-console-consumer.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--topic 'test-group' \
--from-beginning \
--consumer-property group.id=test-group

④   查看当前消费者组消费topic 的细节
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--describe

由图可知,当前消费者中将topic中的数据完全消费。 LAG表示剩余未消费的message。

#案例演示

## --to-earliest:将偏移量设置为partition开头 
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-earliest \
--execute
## --to-latest:把位移调整到分区当前最新位移
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-latest \
--execute
## --to-offset:把位移调整到指定位移处
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-offset 100 \
--execute
## --to-current:把位移调整到分区当前位移
    kafka-consumer-groups.sh \
    --bootstrap-server  hadoop01:9092,hadoop02:9092,hadoop03:9092 \
    --group test-group \
    --reset-offsets \
    --all-topics \
    --to-current \
    --execute
## --shift-by N  把位移调整到当前位移 + N处,注意N可以是负数,表示向前移动
        kafka-consumer-groups.sh \
        --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
        --group test-group \
        --reset-offsets \
        --all-topics \
        --shift-by -100 \
        --execute
##  --to-datetime :将offset调整到大于XX日期最早的位移出
    kafka-consumer-groups.sh \
    --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
    --group test-group \
    --reset-offsets \
    --all-topics \
    --to-datetime 2019-07-31T03:40:33.000
## --by-duration  把位移调整到距离当前时间指定间隔的位移处
    kafka-consumer-groups.sh \
    --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
    --group test-group \
    --reset-offsets \
    --all-topics \
    --by-duration PT0H20M0S

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


网站名称:消息队列之kafka(集群搭建及shell操作)-创新互联
标题路径:http://kswjz.com/article/cddddj.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流