package com.csylh;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;import org.apache.storm.utils.Utils;import java.util.Map;/** * Description:使用Storm实现累加求和操作 * * @author: 留歌36 * Date:2018/9/3 16:50 */public class LocalSumStormTopology { /** * Spout需要继承BaseRichSpout * 数据源需要产生数据并发射到Bolt */ public static class DataSourceSpout extends BaseRichSpout{ //定义一个发射器 private SpoutOutputCollector collector; /** * 初始化方法 只是会被调用一次 * @param conf 配置参数 * @param context 上下文:相当于一个框 可以从里面获取许多东西 * @param collector 数据发射器 */ @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { //将传入的collector发射器 对私有变量 进行赋初值 this.collector = collector; } int number = 0; /** * 用于产生数据 * 生产中肯定是从消息队列中获取数据 * 这个方法是一个死循环 */ @Override public void nextTuple() { //发送方式,调用上面定义的数据发射器 this.collector.emit(new Values(number++)); System.out.println("Spout==》发送的数据:" + number); //每隔1s中发射一次,防止数据产生太快 Utils.sleep(1000); } /** * 声明输出字段 * @param declarer */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("num")); } } /** * Bolt需要继承BaseRichBolt * 用于接收数据并对数据进行处理 */ public static class SumBolt extends BaseRichBolt{ /** * 初始化方法 ,会被执行一次 * @param stormConf * @param context * @param collector 这里的数据发射器,由于业务逻辑中没有没有必要进行放下发的操作,所以就是没有必要进行new一个 */ @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { } int sum = 0; /** * 也是一个死循环 ,职责: 获取Spout发射过来的数据 * @param input */ @Override public void execute(Tuple input) { //Bolt中获取值可以通过index获取 // 也可以根据上一个环节中定义的filed的名称获取(***推荐) Integer value = input.getIntegerByField("num"); sum += value; System.out.println("Bolt :Sum = ["+ sum + "]"); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("DataSourceSpout",new DataSourceSpout()); builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout"); //创建一个本地的Storm集群 ,本地模式运行,不需要搭建Storm集群 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology()); }}