博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用Storm实现累加求和操作
阅读量:5993 次
发布时间:2019-06-20

本文共 3263 字,大约阅读时间需要 10 分钟。

package com.csylh;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;import org.apache.storm.utils.Utils;import java.util.Map;/** * Description:使用Storm实现累加求和操作 * * @author: 留歌36 * Date:2018/9/3 16:50 */public class LocalSumStormTopology {    /**     * Spout需要继承BaseRichSpout     * 数据源需要产生数据并发射到Bolt     */    public static class DataSourceSpout extends BaseRichSpout{        //定义一个发射器        private SpoutOutputCollector collector;        /**         * 初始化方法 只是会被调用一次         * @param conf     配置参数         * @param context 上下文:相当于一个框 可以从里面获取许多东西         * @param collector 数据发射器         */        @Override        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {            //将传入的collector发射器 对私有变量 进行赋初值            this.collector = collector;        }        int number = 0;        /**         *  用于产生数据         *  生产中肯定是从消息队列中获取数据         *  这个方法是一个死循环         */        @Override        public void nextTuple() {            //发送方式,调用上面定义的数据发射器            this.collector.emit(new Values(number++));            System.out.println("Spout==》发送的数据:" + number);            //每隔1s中发射一次,防止数据产生太快            Utils.sleep(1000);        }        /**         * 声明输出字段         * @param declarer         */        @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {            declarer.declare(new Fields("num"));        }    }    /**     * Bolt需要继承BaseRichBolt     * 用于接收数据并对数据进行处理     */    public static class SumBolt extends BaseRichBolt{        /**         *  初始化方法 ,会被执行一次         * @param stormConf         * @param context         * @param collector  这里的数据发射器,由于业务逻辑中没有没有必要进行放下发的操作,所以就是没有必要进行new一个         */        @Override        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {        }        int sum = 0;        /**         * 也是一个死循环 ,职责: 获取Spout发射过来的数据         * @param input         */        @Override        public void execute(Tuple input) {           //Bolt中获取值可以通过index获取            // 也可以根据上一个环节中定义的filed的名称获取(***推荐)          Integer value = input.getIntegerByField("num");          sum += value;          System.out.println("Bolt :Sum = ["+ sum + "]");        }        @Override        public void declareOutputFields(OutputFieldsDeclarer declarer) {        }    }    public static void main(String[] args) {        TopologyBuilder builder = new TopologyBuilder();        builder.setSpout("DataSourceSpout",new DataSourceSpout());        builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");        //创建一个本地的Storm集群 ,本地模式运行,不需要搭建Storm集群        LocalCluster localCluster = new LocalCluster();        localCluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());    }}

转载于:https://www.cnblogs.com/liuge36/p/9882766.html

你可能感兴趣的文章
pku_1001_Exponentiation(求高精度的幂)
查看>>
EasyTimer
查看>>
SQL Server Dump介绍
查看>>
(转)AlteraFPGA命名的意义
查看>>
Powercenter体系结构和主要组件介绍
查看>>
Android开发历程_10(LayoutAnimationController的初步使用)
查看>>
NYOJ-488 素数环
查看>>
Git 一些常用命令
查看>>
mac:添加环境变量
查看>>
uva 705 Slash Maze
查看>>
Tomcat部署发布JSP应用程序的三种方法
查看>>
Oracle中dual表的用途介绍
查看>>
mfc ui库
查看>>
hduTHE MATRIX PROBLEM(差分约束)
查看>>
mongodb工具
查看>>
数据库别名AS区别
查看>>
codejumper的跳转代码
查看>>
【百度地图API】情人节求爱大作战——添加标注功能
查看>>
Oracle误删除表数据后的恢复具体解释
查看>>
CII-2.4指针常量和常量指针
查看>>