超人学院Hadoop,Hadoop,吴超,Hadoop网络培训,hadoop网络学院,最好hadoop培训机构,Hadoop远程培训,Hadoop免费视频下载,Hadoop学院,吴超Hadoop视频,大数据,云计算

查看: 3048|回复: 2

storm的定时任务怎么使用

[复制链接]

31

主题

48

帖子

299

积分

机器学习第1期

积分
299
发表于 2015-8-19 10:54:52 | 显示全部楼层 |阅读模式
请问storm的定时任务怎么使用,直接这么使用可以么?
String simpleName = LogProcess.class.getSimpleName();
                Timer timer = new Timer();
                timer.schedule(new TimerTask() {


                        @Override
                        public void run() {
                        //执行程序 定时刷新数据存放在redis中,给别的程序使用 保证两个程序取的数据是同一份
                        }
                }, 3000, 1000);
                if (args.length > 0) {
                        try {
                                StormSubmitter.submitTopology(simpleName, new Config(),
                                                builder.createTopology());
                        } catch (AlreadyAliveException e) {
                                e.printStackTrace();
                        } catch (InvalidTopologyException e) {
                                e.printStackTrace();
                        }
                } else {
                        LocalCluster localCluster = new LocalCluster();
                        localCluster.submitTopology(simpleName, new Config(),
                                        builder.createTopology());
                }

回复

使用道具 举报

1

主题

842

帖子

2157

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
2157
发表于 2015-8-19 12:18:09 | 显示全部楼层
storm中的定时任务可以这样使用,后期会把这个内容增加到课程中
1:在main中设置
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);// 设置本Bolt定时发射数据
2:在bolt中使用下面代码判断是否是触发用的bolt
tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
如果为true,则执行定时任务需要执行的代码,最后return,如果为false,则执行正常的tuple处理的业务逻辑

具体参考下面的代码
  1. package cn.crxy.storm;

  2. import java.util.Map;

  3. import backtype.storm.Config;
  4. import backtype.storm.Constants;
  5. import backtype.storm.LocalCluster;
  6. import backtype.storm.messaging.local;
  7. import backtype.storm.spout.SpoutOutputCollector;
  8. import backtype.storm.task.OutputCollector;
  9. import backtype.storm.task.TopologyContext;
  10. import backtype.storm.topology.OutputFieldsDeclarer;
  11. import backtype.storm.topology.TopologyBuilder;
  12. import backtype.storm.topology.base.BaseRichBolt;
  13. import backtype.storm.topology.base.BaseRichSpout;
  14. import backtype.storm.tuple.Fields;
  15. import backtype.storm.tuple.Tuple;
  16. import backtype.storm.tuple.Values;

  17. public class LocalStormTopology {
  18.        
  19.         /**
  20.          * 数据源spout
  21.          * @author Administrator
  22.          *
  23.          */
  24.         public static class DataSourceSpout extends BaseRichSpout{
  25.                 private Map conf;
  26.                 private TopologyContext context;
  27.                 private SpoutOutputCollector collector;
  28.                 /**
  29.                  * 本实例运行时,首先被调用,并且只会被调用一次
  30.                  */
  31.                 public void open(Map conf, TopologyContext context,
  32.                                 SpoutOutputCollector collector) {
  33.                         this.conf = conf;
  34.                         this.context = context;
  35.                         this.collector = collector;
  36.                 }
  37.                 /**
  38.                  * 死循环,会一直执行
  39.                  */
  40.                 int i = 0;
  41.                 public void nextTuple() {
  42.                         System.out.println("spout:"+i);
  43.                         //把数据发送给下一个bolt
  44.                         //values是一个list列表
  45.                         collector.emit(new Values(i++));
  46.                         try {
  47.                                 Thread.sleep(1000);
  48.                         } catch (InterruptedException e) {
  49.                                 e.printStackTrace();
  50.                         }
  51.                 }
  52.                 /**
  53.                  * 声明输出的字段
  54.                  */
  55.                 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  56.                         //这里的fields和nexttuple中的values是一一对应的。
  57.                         declarer.declare(new Fields("num"));
  58.                 }
  59.         }
  60.        
  61.         /**
  62.          * 对spout发射过来的额数据进行求和
  63.          * @author Administrator
  64.          *
  65.          */
  66.         public static class sumbolt extends BaseRichBolt{
  67.                 private Map stormConf;
  68.                 private TopologyContext context;
  69.                 private OutputCollector collector;
  70.                
  71.                
  72.                 public void prepare(Map stormConf, TopologyContext context,
  73.                                 OutputCollector collector) {
  74.                         this.stormConf = stormConf;
  75.                         this.context = context;
  76.                         this.collector = collector;
  77.                 }
  78.                
  79.                 /**
  80.                  * 相当于是死循环--接收spout发射过来的数据
  81.                  */
  82.                 int sum = 0;
  83.                 public void execute(Tuple input) {
  84.                         if(input.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)){
  85.                                 //定时执行的任务
  86.                                 System.out.println("定时输出:"+sum);
  87.                                 return;
  88.                         }else{
  89.                                 //执行正常的tuple处理逻辑       
  90.                                 Integer value = input.getIntegerByField("num");
  91.                                 sum+=value;
  92.                         }
  93.                        
  94.                 }

  95.                 public void declareOutputFields(OutputFieldsDeclarer declarer) {
  96.                        
  97.                 }
  98.         }
  99.        
  100.         public static void main(String[] args) {
  101.                 TopologyBuilder topologyBuilder = new TopologyBuilder();
  102.                 topologyBuilder.setSpout("spout_id", new DataSourceSpout());
  103.                 topologyBuilder.setBolt("bolt_id", new sumbolt()).shuffleGrouping("spout_id");
  104.                
  105.                 LocalCluster localCluster = new LocalCluster();
  106.                 Config config = new Config();
  107.                 config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
  108.                 localCluster.submitTopology(LocalStormTopology.class.getSimpleName(), config, topologyBuilder.createTopology());
  109.                
  110.         }
  111.        
  112.        
  113. }
复制代码
回复 支持 反对

使用道具 举报

3

主题

12

帖子

79

积分

就业班第10期

积分
79
发表于 2016-10-13 15:22:46 | 显示全部楼层
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐 上一条 /4 下一条

QQ|Archiver|手机版|小黑屋|超人学院  

Copyright 2013 最新最精彩-社区论坛 版权所有 All Rights Reserved.

Powered by Discuz! X3.1 Copyright © 2001-2013 Comsenz Inc.    All Rights Reserved.

快速回复 返回顶部 返回列表