扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这篇文章主要讲解了“Storm如何接收数据”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Storm如何接收数据”吧!
站在用户的角度思考问题,与客户深入沟通,找到遂平网站设计与遂平网站推广的解决方案,凭借多年的经验,让设计与互联网技术结合,创造个性化、用户体验好的作品,建站类型包括:成都网站设计、网站制作、企业官网、英文网站、手机端网站、网站推广、申请域名、网页空间、企业邮箱。业务覆盖遂平地区。
简要的模拟如何接收数据:
package com.cc.storm.spout; import java.io.IOException; import java.util.Map; import java.util.Random; import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; import redis.clients.jedis.JedisPubSub; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class RandomEmitSpout extends BaseRichSpout { private Random _random; private static final long serialVersionUID = 4092527421163270357L; static Logger LOG = Logger.getLogger(RandomEmitSpout.class); private SpoutOutputCollector _collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _random = new Random(); } @Override public void nextTuple() { try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } String[] userIds = { "1", "2", "3", "4" }; String[] merchandiseIDS = { "1" }; _collector.emit(new Values(userIds[_random.nextInt(userIds.length)], merchandiseIDS[_random.nextInt(merchandiseIDS.length)])); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("userIdS", "merchandiseIDS")); } @Override public void close() { } }
plus: 如果您采用的是Redis
那么:
package com.cc.storm.spout; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import org.apache.log4j.Logger; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.JedisPubSub; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class RedisPubSubSpout extends BaseRichSpout { /** * @Fields serialVersionUID : TODO */ private static final long serialVersionUID = 4092527421163270357L; static Logger LOG = Logger.getLogger(RedisPubSubSpout.class); private SpoutOutputCollector _collector; private final String host; private final int port; private final String pattern; LinkedBlockingQueuequeue; JedisPool pool; public RedisPubSubSpout(String host, int port, String pattern) { // TODO Auto-generated constructor stub this.host = host; this.port = port; this.pattern = pattern; } // 监听线程,从redis订阅的兴趣事件中获取数据 class ListenerThread extends Thread { private LinkedBlockingQueue queue; JedisPool pool; String pattern; public ListenerThread(LinkedBlockingQueue queue, JedisPool pool, String pattern) { // TODO Auto-generated constructor stub this.queue = queue; this.pool = pool; this.pattern = pattern; } @Override public void run() { JedisPubSub listener = new JedisPubSub() { @Override public void onUnsubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onSubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onPUnsubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onPSubscribe(String arg0, int arg1) { // TODO Auto-generated method stub } @Override public void onPMessage(String pattern, String channel, String message) { // TODO Auto-generated method stub queue.offer(message); } @Override public void onMessage(String channel, String message) { // TODO Auto-generated method stub queue.offer(message); } }; Jedis jedis = pool.getResource(); try { jedis.psubscribe(listener, pattern); } finally { pool.returnResource(jedis); } } } @SuppressWarnings("rawtypes") @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub _collector = collector; // 队列最大支持1000个 queue = new LinkedBlockingQueue (1000); JedisPoolConfig config = new JedisPoolConfig(); // error pool = null; ListenerThread listener = new ListenerThread(queue, pool, pattern); // 启动线程 listener.start(); } @Override public void nextTuple() { // TODO Auto-generated method stub String ret = queue.poll(); if (null == ret) { // 如果队列中暂无数据可取,休息500ms Utils.sleep(500); } else { // 数据格式为 “userID:merchandiseID”,可以依据需求更改此处 String[] s = ret.split(":"); _collector.emit(new Values(s[0], s[1])); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("userIdS", "merchandiseIDS")); } @Override public void close() { // TODO Auto-generated method stub pool.destroy(); } }
感谢各位的阅读,以上就是“Storm如何接收数据”的内容了,经过本文的学习后,相信大家对Storm如何接收数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流