扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
storm-kafka-client使用的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
巴林左旗ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为成都创新互联的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:18982081108(备注:SSL证书合作)期待与您的合作!
package hgs.core.sk; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.kafka.spout.ByTopicRecordTranslator; import org.apache.storm.kafka.spout.KafkaSpout; import org.apache.storm.kafka.spout.KafkaSpoutConfig; import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; //参考如下 //https://community.hortonworks.com/articles/87597/how-to-write-topology-with-the-new-kafka-spout-cli.html //https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L52 public class StormKafkaMainTest { public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); //该类将传入的kafka记录转换为storm的tuple ByTopicRecordTranslatorbrt = new ByTopicRecordTranslator<>( (r) -> new Values(r.value(),r.topic()),new Fields("values","test7")); //设置要消费的topic即test7 brt.forTopic("test7", (r) -> new Values(r.value(),r.topic()), new Fields("values","test7")); //类似之前的SpoutConfig KafkaSpoutConfig ksc = KafkaSpoutConfig //bootstrapServers 以及topic(test7) .builder("bigdata01:9092,bigdata02:9092,bigdata03:9092", "test7") //设置group.id .setProp(ConsumerConfig.GROUP_ID_CONFIG, "skc-test") //设置开始消费的气势位置 .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST) //设置提交消费边界的时长间隔 .setOffsetCommitPeriodMs(10_000) //Translator .setRecordTranslator(brt) .build(); builder.setSpout("kafkaspout", new KafkaSpout<>(ksc), 2); builder.setBolt("mybolt1", new MyboltO(), 4).shuffleGrouping("kafkaspout"); Config config = new Config(); config.setNumWorkers(2); config.setNumAckers(0); try { StormSubmitter.submitTopology("storm-kafka-clients", config, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } /* LocalCluster cu = new LocalCluster(); cu.submitTopology("test", config, builder.createTopology());*/ } } class MyboltO extends BaseRichBolt{ private static final long serialVersionUID = 1L; OutputCollector collector = null; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple input) { //这里把消息大一出来,在对应的woker下面的日志可以找到打印的内容 String out = input.getString(0); System.out.println(out); //collector.ack(input); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
pom.xml文件
4.0.0 hgs core.sk 1.0.0-SNAPSHOT jar core.sk http://maven.apache.org UTF-8 junit junit 3.8.1 test org.apache.storm storm-kafka-client 1.1.3 org.apache.storm storm-core 1.1.3 provided org.apache.kafka kafka_2.11 1.0.0 org.slf4j slf4j-log4j12 org.apache.zookeeper zookeeper org.clojure clojure 1.7.0 org.apache.kafka kafka-clients 1.0.0 maven-assembly-plugin 2.2 hgs.core.sk.StormKafkaMainTest jar-with-dependencies make-assembly package single org.apache.maven.plugins maven-compiler-plugin 1.8
//以下为lambda表达式,因为在上面用大了,所以在这儿记录一下,以免以后看不懂 import java.util.UUID; import org.junit.jupiter.api.Test; public class TEst { @Test public void sysConfig() { String[] ags = {"his is my first storm program so i hope it will success", "i love bascketball", "the day of my birthday i was alone"}; String uuid = UUID.randomUUID().toString(); String nexttuple= ags[new Random().nextInt(ags.length)]; System.out.println(nexttuple); } @Test public void lambdaTest() { int b = 100; //该出返回10*a的值、 //"(a) -> 10*a" 相当于 new testinter(); printPerson((a) -> 10*a) ; } void printPerson( testinter t) { //穿过来的t需要一个参数a 即下面借口中定义的方法sysoutitems(int a ) System.out.println(t.sysoutitems(100)); }; } //定义接口,在lambda表达式运用中,必须为借口,并且借口只能有一个方法 interface testinter { T sysoutitems(int a ); //void aAndb(int a, int b ); }
看完上述内容,你们掌握storm-kafka-client使用的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注创新互联行业资讯频道,感谢各位的阅读!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流