扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
成都创新互联公司主营泉州网站建设的网络公司,主营网站建设方案,重庆APP开发,泉州h5成都小程序开发搭建,泉州网站营销推广欢迎泉州等地区企业咨询
tar xvf jdk1.8.0_231.tar.gz -C /usr/local && cd /usr/local
ln -sv jdk1.8.0_231 jdk
vim /etc/profile.d/java.sh
JAVA_HOME=/usr/local/jdk
PATH=$JAVA_HOME/bin:$PATH
vim /usr/local/kafka/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
maxClientCnxns=0
# 集群版的zookeeper添加如下配置
# server.1=ip1:2888:3888
# server.2=ip2:2888:3888
# server.3=ip3:28888:3888
wget https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
tar xvf kafka_2.11-0.10.2.1.tgz -C /usr/local && cd /usr/local
ln -sv kafka_2.11-0.10.2.1.tgz kafka
vim /usr/local/kafka/bin/kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
/usr/local/kafka/bin/zookeeper-server-start.sh -deamon /usr/local/kafka/conf/zookeeper.properties
/usr/local/kafka/bin/kafka-server-start.sh -deamon /usr/local/kafka/conf/server.properties
/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/conf/server.properties
/usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/conf/zookeeper.properties
/usr/local/zookeeper/bin/zkServer.sh stop|stop
需要一个解析到内网ip地址的域名,内网环境也可以设置/etc/hosts
host.name=kafka.test.com(对应的域名解析需要解到内网ip)
高版本已弃用。低版本0.10.2.1可以用, 仅当listeners属性未配置时被使用,已用listeners属性代替。表示broker的hostname
advertised.listeners=PLAINTEXT://kafka.test.com:9092(高版本用,替代host.name,设置了advertised.listeners不用设置host.name)
注册到zookeeper上并提供给客户端的监听器,如果没有配置则使用listeners。
advertised.host.name(不需要设置,仅作参考)
已弃用。仅当advertised.listeners或者listeners属性未配置时被使用。官网建议使用advertised.listeners
listeners(不需要设置,仅作参考)
需要监听的URL和协议,如:PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093。如果未指定该配置,则使用java.net.InetAddress.getCanonicalHostName()函数的的返回值
[内网ip] kafka.test.com
[外网ip] kafka.test.com
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list IP:9092 --topic TOPIC
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server IP:9092 --topic TOPIC--from-beginning --max-messages 1
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 外网IP:9092 --topic TOPIC --from-beginning --max-messages 1
output {
stdout { codec => rubydebug { metadata => true } }
}
a、topics_pattern 通配问题".* ","."一定不能少
topics_pattern=>"prefix-.*"
b、filter中匹配规则,注意要能匹配到kafka中topic,不同的filebeat和不同的logstash版本对应的topic元数据可能不太一样,这点需要注意
if [type] =~ "prefix-*" {
grok { match =>["[type]","^prefix-(?)"] }
}
if [kafka][topic] =~ "prefix-*" {
grok { match => [ "[kafka][topic]", "^prefix-(?.*$)" ]}
}
if [@metadata][topic] =~ "prefix-*" {
grok { match =>["[@metadata][topic]","^prefix-(?)"] }
}
if [@metadata][kafka][topic] =~ "prefix-*" {
grok { match => [ "[@metadata][kafka][topic]", "^prefix-(?.*$)" ]}
}
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流