Storm
A topology is a directed acyclic graph (DAG): datasource --> bolt --> bolt --> ...

Differences between Storm and traditional databases:
- A traditional database stores data first and computes later; Storm computes first and stores afterwards, or not at all.
- Real-time computation is hard to deploy on a traditional relational database; at best you schedule periodic jobs that analyze windows of data.
- Relational databases emphasize transactions and concurrency control; Storm's support for these is comparatively primitive.

Storm (speed), Hadoop (massive data), and Spark (in-memory computing framework) are the popular big-data solutions.

Storm's core code is Clojure, its utility programs are Python, and topologies are developed in Java.

Wordcount logic: sentence spout --> split sentence bolt --> word count bolt --> report bolt (a sketch of the two middle bolts appears after the sample topology code below).

[root@localhost target]# cd ~/soft
[root@localhost soft]# ls
maven  Python-2.7.2  storm-0.9.1  storm-starter  zookeeper-3.3.6
[root@localhost soft]# cd zookeeper-3.3.6
[root@localhost zookeeper-3.3.6]# ls
bin          contrib     ivysettings.xml  NOTICE.txt  zookeeper-3.3.6.jar
build.xml    data        ivy.xml          README.txt  zookeeper-3.3.6.jar.asc
CHANGES.txt  dist-maven  lib              recipes     zookeeper-3.3.6.jar.md5
conf         docs        LICENSE.txt      src         zookeeper-3.3.6.jar.sha1
[root@localhost zookeeper-3.3.6]# cd bin
[root@localhost bin]# ls
README.txt    zkCli.cmd  zkEnv.cmd  zkServer.cmd  zookeeper.out
zkCleanup.sh  zkCli.sh   zkEnv.sh   zkServer.sh
[root@localhost bin]# ./zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[storm, zookeeper]

Storm management commands (each takes a topology name):
storm rebalance   - redistribute work, e.g. after adding nodes
storm activate
storm deactivate
storm kill

Topology run process:
1. When a topology is submitted, its code is first stored in the inbox directory of the nimbus node; the current Storm run configuration is then serialized into a stormconf.ser file under the nimbus node's stormdist directory, which also holds the serialized topology code.
2. When wiring the spouts and bolts of a topology you can set each component's executor count and task count; by default the total number of tasks in a topology equals the total number of executors. The system then spreads the tasks as evenly as possible across the workers; which supervisor node a worker runs on is decided by the supervisor itself. (A sketch with explicit counts appears after the storm jar notes below.)
3. Once the tasks are assigned, the nimbus node writes the assignment information to the ZooKeeper cluster. ZooKeeper also has a workerbeats node, which stores the heartbeats of all worker processes.
4. Supervisor nodes keep polling the ZooKeeper cluster. ZooKeeper's assignment node stores the task assignments of every topology together with the mapping to the code storage directories; by polling this node a supervisor claims its own tasks and starts worker processes.
5. Once a topology is running, its spouts keep emitting streams and its bolts keep processing the streams they receive; a stream is unbounded, so processing runs continuously.

Local-mode submission:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();

Distributed submission:
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());

Running a topology: after the Storm code is written, it must be packaged as a jar and run through nimbus. Do not bundle the storm dependency into the jar, otherwise configuration files will be duplicated, because Storm loads the local storm.yaml before running:
storm jar StormTopology.jar mainclass

Storm daemons: nimbus, supervisor, ui, drpc.

storm jar topology_jar topology_class [args...]
The jar command submits a topology to the cluster. It runs the main method of topology_class, uploads the jar to nimbus, and nimbus distributes it across the cluster. Once submitted, Storm activates the topology and starts processing. The main method calls StormSubmitter.submitTopology and must supply a unique topology name; if the name already exists, submission fails. A common practice is therefore to take the topology name from the command line.
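A minimal sketch tying these two points together, assuming the LearningStormSpout and LearningStormBolt classes defined in the next section: the bolt is given more tasks than executors, and the unique topology name comes from the command line. The class name SubmitWithNameFromArgs is hypothetical.

package cn.dataguru.storm;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

// Hypothetical example class, not part of the course code.
public class SubmitWithNameFromArgs {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // 2 executors for the spout.
        builder.setSpout("LearningStormSpout", new LearningStormSpout(), 2);
        // 2 executors but 4 tasks for the bolt, so each executor runs 2 tasks.
        builder.setBolt("LearningStormBolt", new LearningStormBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("LearningStormSpout");
        Config conf = new Config();
        // args[0] is the topology name; submission fails if it already exists.
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}

Invoked, for example, as storm jar <jar> cn.dataguru.storm.SubmitWithNameFromArgs my-unique-name; resubmitting under the same name fails with AlreadyAliveException.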
Creating a new project with Maven:
mvn archetype:generate -DgroupId=storm.test -DartifactId=teststorm -DpackageName=cn.dataguru.storm
(older Maven versions use archetype:create)

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>storm.test</groupId>
  <artifactId>teststorm</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>teststorm</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>storm</groupId>
      <artifactId>storm</artifactId>
      <version>0.9.0.1</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>clojars.org</id>
      <url>http://clojars.org/repo</url>
    </repository>
  </repositories>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.2.1</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass />
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

LearningStormBolt.java:

package cn.dataguru.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class LearningStormBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    public void execute(Tuple input, BasicOutputCollector collector) {
        // Fetch the field "site" from the input tuple.
        String test = input.getStringByField("site");
        // Print the value of the field "site" on the console.
        System.out.println("Name of input site is : " + test);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink; it emits nothing downstream.
    }
}

LearningStormSpout.java:

package cn.dataguru.storm;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class LearningStormSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector spoutOutputCollector;
    private static final Map<Integer, String> map = new HashMap<Integer, String>();
    static {
        map.put(0, "google");
        map.put(1, "facebook");
        map.put(2, "twitter");
        map.put(3, "youtube");
        map.put(4, "linkedin");
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector spoutOutputCollector) {
        // Open the spout.
        this.spoutOutputCollector = spoutOutputCollector;
    }

    public void nextTuple() {
        // The Storm cluster repeatedly calls this method to emit a continuous
        // stream of tuples.
        final Random rand = new Random();
        // Generate a random number from 0 to 4.
        int randomNumber = rand.nextInt(5);
        spoutOutputCollector.emit(new Values(map.get(randomNumber)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("site"));
    }
}

LearningStormTopology.java:

package cn.dataguru.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class LearningStormTopology {
    public static void main(String[] args) throws AlreadyAliveException,
            InvalidTopologyException {
        // Create an instance of the TopologyBuilder class.
        TopologyBuilder builder = new TopologyBuilder();
        // Set the spout class.
        builder.setSpout("LearningStormSpout", new LearningStormSpout(), 2);
        // Set the bolt class.
        builder.setBolt("LearningStormBolt", new LearningStormBolt(), 4)
                .shuffleGrouping("LearningStormSpout");
        Config conf = new Config();
        conf.setDebug(true);
        // Create an instance of the LocalCluster class for
        // executing the topology in local mode.
        LocalCluster cluster = new LocalCluster();
        // LearningStormTopology is the name of the submitted topology.
        cluster.submitTopology("LearningStormTopology", conf,
                builder.createTopology());
        try {
            Thread.sleep(10000);
        } catch (Exception exception) {
            System.out.println("Thread interrupted exception : " + exception);
        }
        // Kill the LearningStormTopology.
        cluster.killTopology("LearningStormTopology");
        // Shut down the Storm test cluster.
        cluster.shutdown();
    }
}
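The wordcount pipeline from the top of these notes (sentence spout --> split sentence bolt --> word count bolt --> report bolt) uses the same base classes. A minimal sketch of the two middle bolts, assuming the spout emits tuples with a single "sentence" field (the class names SplitSentenceBolt and WordCountBolt are placeholders, not course code):

package cn.dataguru.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Splits each incoming sentence into words and emits one tuple per word.
public class SplitSentenceBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    public void execute(Tuple input, BasicOutputCollector collector) {
        for (String word : input.getStringByField("sentence").split("\\s+")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

// Keeps a running in-memory count per word and emits (word, count) pairs
// downstream, e.g. to a report bolt.
class WordCountBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    private final Map<String, Integer> counts = new HashMap<String, Integer>();

    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        count = (count == null) ? 1 : count + 1;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

For the counts to be correct, the topology must route all occurrences of a word to the same task, e.g. builder.setBolt("count", new WordCountBolt(), 4).fieldsGrouping("split", new Fields("word")); shuffleGrouping would scatter the same word across counters.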
cluster.submitTopology("LearningStormToplogy", conf, builder.createTopology()); try { Thread.sleep(10000); } catch (Exception exception) { System.out.println("Thread interrupted exception : " + exception); } // kill the LearningStormTopology cluster.killTopology("LearningStormToplogy"); // shutdown the storm test cluster cluster.shutdown(); }}mvn install[root@localhost teststorm]# cd target[root@localhost target]# lsarchive-tmp test-classesclasses teststorm-0.0.1-SNAPSHOT.jarmaven-archiver teststorm-0.0.1-SNAPSHOT-jar-with-dependencies.jarmaven-status teststorm-1.0-SNAPSHOT.jarsurefire-reports teststorm-1.0-SNAPSHOT-jar-with-dependencies.jar[root@localhost teststorm]# mvn compile exec:java -Dexec:java -Dexec.classpathScope=compile -Dexec.mainClass=cn.dataguru.storm.LearningStormTopology cn.dataguru.storm.LearningStormTopology[root@localhost teststorm]# storm jar teststorm-0.0.1-SNAPSHOT-jar-with-dependencies.jar cn.dataguru.storm.LearningStormTopology 集群方式conf.setNumWorkers(3);StormSubmitter.submitTopology("name", conf, builder.createTopology());以下的注释掉 cluster.submitTopology("LearningStormToplogy", conf, builder.createTopology()); try { Thread.sleep(10000); StormSubmitter.submitTopology("name", conf, builder.createTopology()); } catch (Exception exception) { System.out.println("Thread interrupted exception : " + exception); } // kill the LearningStormTopology cluster.killTopology("LearningStormToplogy"); // shutdown the storm test cluster cluster.shutdown();