storm教程(三):用Java开发storm

2023-11-18

1、操作模式

开始之前,有必要了解一下Storm的操作模式。有下面两种方式。
本地模式
在本地模式下,Storm拓扑结构运行在本地计算机的单一JVM进程上。这个模式用于开发、测试以及调试,因为这是观察所有组件如何协同工作的最简单方法。在这种模式下,我们可以调整参数,观察我们的拓扑结构如何在不同的Storm配置环境下运行。要在本地模式下运行,我们要下载Storm开发依赖,以便用来开发并测试我们的拓扑结构。我们创建了第一个Storm工程以后,很快就会明白如何使用本地模式了。
NOTE: 在本地模式下,跟在集群环境运行很像。不过很有必要确认一下所有组件都是线程安全的,因为当把它们部署到远程模式时它们可能会运行在不同的JVM进程甚至不同的物理机上,这个时候它们之间没有直接的通讯或共享内存。
我们要在本地模式运行本章的所有例子。

远程模式
在远程模式下,我们向Storm集群提交拓扑,它通常由许多运行在不同机器上的流程组成。远程模式不会出现调试信息, 因此它也称作生产模式。不过在单一开发机上建立一个Storm集群是一个好主意,可以在部署到生产环境之前,用来确认拓扑在集群环境下没有任何问题。

2、创建maven的pom.xml文件

要运行我们的拓扑,我们可以编写一个包含基本组件的pom.xml文件。

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             http://maven.apache.org/xsd/maven-4.0.0.xsd">
             <modelVersion>4.0.0</modelVersion>
             <groupId>storm.book</groupId>
             <artifactId>Getting-Started</artifactId>
             <version>0.0.1-SNAPSHOT</version>
             <build>
                 <plugins>
                     <plugin>
                         <groupId>org.apache.maven.plugins</groupId>
                         <artifactId>maven-compiler-plugin</artifactId>
                         <version>2.3.2</version>
                         <configuration>
                             <source>1.6</source>
                             <target>1.6</target>
                             <compilerVersion>1.6</compilerVersion>
                         </configuration>
                     </plugin>
                 </plugins>
             </build>
             <repositories>
                 <!-- Repository where we can found the storm dependencies -->
                 <repository>
                     <id>clojars.org</id>
                     <url>http://clojars.org/repo</url>
                 </repository>
             </repositories>
             <dependencies>
                 <!-- Storm Dependency -->
                 <dependency>
                     <groupId>storm</groupId>
                     <artifactId>storm</artifactId>
                     <version>0.6.0</version>
                 </dependency>
             </dependencies>
    </project>

开头几行指定了工程名称和版本号。然后我们添加了一个编译器插件,告知Maven我们的代码要用Java1.6编译。接下来我们定义了Maven仓库(Maven支持为同一个工程指定多个仓库)。clojars是存放Storm依赖的仓库。Maven会为运行本地模式自动下载必要的所有子包依赖。
一个典型的Maven Java工程会拥有如下结构:
我们的应用目录/
├── pom.xml
└── src
└── main
└── java
| ├── spouts
| └── bolts
└── resources
java目录下的子目录包含我们的代码,我们把要统计单词数的文件保存在resource目录下。
NOTE:命令mkdir -p 会创建所有需要的父目录。

3、Spout

pout WordReader类实现了IRichSpout接口。我们将在第四章看到更多细节。WordReader负责从文件按行读取文本,并把文本行提供给第一个bolt。
NOTE: 一个spout发布一个定义域列表。这个架构允许你使用不同的bolts从同一个spout流读取数据,它们的输出也可作为其它bolts的定义域,以此类推。
例2-1包含WordRead类的完整代码(我们将会分析下述代码的每一部分)。
/**
* 例2-1.src/main/java/spouts/WordReader.java
*/
package spouts;

    import java.io.BufferedReader;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.util.Map;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class WordReader implements IRichSpout {
        private SpoutOutputCollector collector;
        private FileReader fileReader;
        private boolean completed = false;
        private TopologyContext context;
        public boolean isDistributed() {return false;}
        public void ack(Object msgId) {
                System.out.println("OK:"+msgId);
        }
        public void close() {}
        public void fail(Object msgId) {
             System.out.println("FAIL:"+msgId);
        }
        /**
         * 这个方法做的惟一一件事情就是分发文件中的文本行
         */
        public void nextTuple() {
        /**
         * 这个方法会不断的被调用,直到整个文件都读完了,我们将等待并返回。
         */
             if(completed){
                 try {
                     Thread.sleep(1000);
                 } catch (InterruptedException e) {
                     //什么也不做
                 }
                return;
             }
             String str;
             //创建reader
             BufferedReader reader = new BufferedReader(fileReader);
             try{
                 //读所有文本行
                while((str = reader.readLine()) != null){
                 /**
                  * 按行发布一个新值
                  */
                     this.collector.emit(new Values(str),str);
                 }
             }catch(Exception e){
                 throw new RuntimeException("Error reading tuple",e);
             }finally{
                 completed = true;
             }
         }
         /**
          * 我们将创建一个文件并维持一个collector对象
          */
         public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                 try {
                     this.context = context;
                     this.fileReader = new FileReader(conf.get("wordsFile").toString());
                 } catch (FileNotFoundException e) {
                     throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
                 }
                 this.collector = collector;
         }
         /**
          * 声明输入域"word"
          */
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
             declarer.declare(new Fields("line"));
         }
    }

第一个被调用的spout方法都是public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)。它接收如下参数:配置对象,在定义topology对象是创建;TopologyContext对象,包含所有拓扑数据;还有SpoutOutputCollector对象,它能让我们发布交给bolts处理的数据。下面的代码主是这个方法的实现。
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
try {
this.context = context;
this.fileReader = new FileReader(conf.get(“wordsFile”).toString());
} catch (FileNotFoundException e) {
throw new RuntimeException(“Error reading file [“+conf.get(“wordFile”)+”]”);
}
this.collector = collector;
}
我们在这个方法里创建了一个FileReader对象,用来读取文件。接下来我们要实现public void nextTuple(),我们要通过它向bolts发布待处理的数据。在这个例子里,这个方法要读取文件并逐行发布数据。

    public void nextTuple() {
        if(completed){
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                //什么也不做
            }
            return;
        }
        String str;
        BufferedReader reader = new BufferedReader(fileReader);
        try{
            while((str = reader.readLine()) != null){
                this.collector.emit(new Values(str));
            }
        }catch(Exception e){
            throw new RuntimeException("Error reading tuple",e);
        }finally{
            completed = true;
        }
    }

NOTE: Values是一个ArrarList实现,它的元素就是传入构造器的参数。
nextTuple()会在同一个循环内被ack()和fail()周期性的调用。没有任务时它必须释放对线程的控制,其它方法才有机会得以执行。因此nextTuple的第一行就要检查是否已处理完成。如果完成了,为了降低处理器负载,会在返回前休眠一毫秒。如果任务完成了,文件中的每一行都已被读出并分发了。
NOTE:元组(tuple)是一个具名值列表,它可以是任意java对象(只要它是可序列化的)。默认情况,Storm会序列化字符串、字节数组、ArrayList、HashMap和HashSet等类型。

4、Bolts

现在我们有了一个spout,用来按行读取文件并每行发布一个元组,还要创建两个bolts,用来处理它们(看图2-1)。bolts实现了接口backtype.storm.topology.IRichBolt。
bolt最重要的方法是void execute(Tuple input),每次接收到元组时都会被调用一次,还会再发布若干个元组。
NOTE: 只要必要,bolt或spout会发布若干元组。当调用nextTuple或execute方法时,它们可能会发布0个、1个或许多个元组。你将在第五章学习更多这方面的内容。
第一个bolt,WordNormalizer,负责得到并标准化每行文本。它把文本行切分成单词,大写转化成小写,去掉头尾空白符。
首先我们要声明bolt的出参:
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields(“word”));
}
这里我们声明bolt将发布一个名为“word”的域。
下一步我们实现public void execute(Tuple input),处理传入的元组:
public void execute(Tuple input){
String sentence=input.getString(0);
String[] words=sentence.split(” “);
for(String word : words){
word=word.trim();
if(!word.isEmpty()){
word=word.toLowerCase();
//发布这个单词
collector.emit(new Values(word));
}
}
//对元组做出应答
collector.ack(input);
}
第一行从元组读取值。值可以按位置或名称读取。接下来值被处理并用collector对象发布。最后,每次都调用collector对象的ack()方法确认已成功处理了一个元组。
例2-2是这个类的完整代码。
//例2-2 src/main/java/bolts/WordNormalizer.java
package bolts;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class WordNormalizer implements IRichBolt{
private OutputCollector collector;
public void cleanup(){}
/**
* bolt从单词文件接收到文本行,并标准化它。
* 文本行会全部转化成小写,并切分它,从中得到所有单词。
*/
public void execute(Tuple input){
String sentence = input.getString(0);
String[] words = sentence.split(” “);
for(String word : words){
word = word.trim();
if(!word.isEmpty()){
word=word.toLowerCase();
//发布这个单词
List a = new ArrayList();
a.add(input);
collector.emit(a,new Values(word));
}
}
//对元组做出应答
collector.ack(input);
}
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}

    /**
      * 这个*bolt*只会发布“word”域
      */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

NOTE:通过这个例子,我们了解了在一次execute调用中发布多个元组。如果这个方法在一次调用中接收到句子“This is the Storm book”,它将会发布五个元组。
下一个bolt,WordCounter,负责为单词计数。这个拓扑结束时(cleanup()方法被调用时),我们将显示每个单词的数量。
NOTE: 这个例子的bolt什么也没发布,它把数据保存在map里,但是在真实的场景中可以把数据保存到数据库。

package bolts;

import java.util.HashMap;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class WordCounter implements IRichBolt{
    Integer id;
    String name;
    Map<String,Integer> counters;
    private OutputCollector collector;

    /**
      * 这个spout结束时(集群关闭的时候),我们会显示单词数量
      */
    @Override
    public void cleanup(){
        System.out.println("-- 单词数 【"+name+"-"+id+"】 --");
        for(Map.Entry<String,Integer> entry : counters.entrySet()){
            System.out.println(entry.getKey()+": "+entry.getValue());
        }
    }

    /**
     *  为每个单词计数
     */
    @Override
    public void execute(Tuple input) {
        String str=input.getString(0);
        /**
         * 如果单词尚不存在于map,我们就创建一个,如果已在,我们就为它加1
         */
        if(!counters.containsKey(str)){
            conters.put(str,1);
        }else{
            Integer c = counters.get(str) + 1;
            counters.put(str,c);
        }
        //对元组作为应答
        collector.ack(input);
    }

    /**
     * 初始化
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector){
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}

execute方法使用一个map收集单词并计数。拓扑结束时,将调用clearup()方法打印计数器map。(虽然这只是一个例子,但是通常情况下,当拓扑关闭时,你应当使用cleanup()方法关闭活动的连接和其它资源。)

5、主类

你可以在主类中创建拓扑和一个本地集群对象,以便于在本地测试和调试。LocalCluster可以通过Config对象,让你尝试不同的集群配置。比如,当使用不同数量的工作进程测试你的拓扑时,如果不小心使用了某个全局变量或类变量,你就能够发现错误。(更多内容请见第三章)
NOTE:所有拓扑节点的各个进程必须能够独立运行,而不依赖共享数据(也就是没有全局变量或类变量),因为当拓扑运行在真实的集群环境时,这些进程可能会运行在不同的机器上。
接下来,TopologyBuilder将用来创建拓扑,它决定Storm如何安排各节点,以及它们交换数据的方式。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(“word-reader”, new WordReader());
builder.setBolt(“word-normalizer”, new WordNormalizer()).shuffleGrouping(“word-reader”);
builder.setBolt(“word-counter”, new WordCounter()).shuffleGrouping(“word-normalizer”);
在spout和bolts之间通过shuffleGrouping方法连接。这种分组方式决定了Storm会以随机分配方式从源节点向目标节点发送消息。
下一步,创建一个包含拓扑配置的Config对象,它会在运行时与集群配置合并,并通过prepare方法发送给所有节点。
Config conf = new Config();
conf.put(“wordsFile”, args[0]);
conf.setDebug(true);
由spout读取的文件的文件名,赋值给wordFile属性。由于是在开发阶段,设置debug属性为true,Strom会打印节点间交换的所有消息,以及其它有助于理解拓扑运行方式的调试数据。
正如之前讲过的,你要用一个LocalCluster对象运行这个拓扑。在生产环境中,拓扑会持续运行,不过对于这个例子而言,你只要运行它几秒钟就能看到结果。
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(“Getting-Started-Topologie”, conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
调用createTopology和submitTopology,运行拓扑,休眠两秒钟(拓扑在另外的线程运行),然后关闭集群。
例2-3是完整的代码
//例2-3 src/main/java/TopologyMain.java
import spouts.WordReader;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolts.WordCounter;
import bolts.WordNormalizer;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
    //定义拓扑
        TopologyBuilder builder = new TopologyBuilder());
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));

    //配置
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(false);

    //运行拓扑
         conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Topologie", conf, builder.createTopology();
        Thread.sleep(1000);
        cluster.shutdown();
    }
}

6、观察运行情况
你已经为运行你的第一个拓扑准备好了。在这个目录下面创建一个文件,/src/main/resources/words.txt,一个单词一行,然后用下面的命令运行这个拓扑:mvn exec:java -Dexec.mainClass=”TopologyMain” -Dexec.args=”src/main/resources/words.txt。
举个例子,如果你的words.txt文件有如下内容: Storm test are great is an Storm simple application but very powerful really Storm is great 你应该会在日志中看到类似下面的内容: is: 2 application: 1 but: 1 great: 1 test: 1 simple: 1 Storm: 3 really: 1 are: 1 great: 1 an: 1 powerful: 1 very: 1 在这个例子中,每类节点只有一个实例。但是如果你有一个非常大的日志文件呢?你能够很轻松的改变系统中的节点数量实现并行工作。这个时候,你就要创建两个WordCounter实例。
builder.setBolt(“word-counter”, new WordCounter(),2).shuffleGrouping(“word-normalizer”);
程序返回时,你将看到: — 单词数 【word-counter-2】 — application: 1 is: 1 great: 1 are: 1 powerful: 1 Storm: 3 — 单词数 [word-counter-3] — really: 1 is: 1 but: 1 great: 1 test: 1 simple: 1 an: 1 very: 1 棒极了!修改并行度实在是太容易了(当然对于实际情况来说,每个实例都会运行在单独的机器上)。不过似乎有一个问题:单词is和great分别在每个WordCounter各计数一次。怎么会这样?当你调用shuffleGrouping时,就决定了Storm会以随机分配的方式向你的bolt实例发送消息。在这个例子中,理想的做法是相同的单词问题发送给同一个WordCounter实例。你把shuffleGrouping(“word-normalizer”)换成fieldsGrouping(“word-normalizer”, new Fields(“word”))就能达到目的。试一试,重新运行程序,确认结果。 你将在后续章节学习更多分组方式和消息流类型。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

storm教程(三):用Java开发storm 的相关文章

随机推荐

  • 根据图片地址得到文件流

    得到文件流 param url 图片地址 return public static byte getFileStream String url try URL httpUrl new URL url HttpURLConnection co
  • Spring Boot入门必会(基本介绍+依赖管理+自动装配)

    目录 一 基础入门 1 Spring Boot 是什么 2 SpringBoot 快速入门 2 1完成步骤 2 2快速入门小结 3 Spring SpringMVC SpringBoot 的关系 3 1梳理关系 3 2如何理解 约定优于配置
  • 青少年ptyhon可以参加的主流比赛大全

    青少年python教学视频ppt源码 青少年python系列目录 老程序员115的博客 CSDN博客 一 全国青少年软件编程等级考试 主办单位 中国电子学会 全国青少年电子信息科普创新联盟 网址 http www qceit org cn
  • React WebApp键盘遮挡输入框?

    写在前面 由于近期工作实在太忙 正在赶项目 抽不出时间整理平时遇到的坑 隔了很长时候没有给大家更新文章了 这次正好利用这个单休的时间写一篇开发时遇到的坑 大家如果有什么好的建议和意见欢迎投递哦 邮箱 lcczmy 163 com 本人使用的
  • 多数据源配置(Oracle+MySql),拿来即用

    前言 此次配置使用Druid数据源 1 pom文件
  • Python爬虫:抓取智联招聘岗位信息和要求(进阶版)

    本文的文字及图片来源于网络 仅供学习 交流使用 不具有任何商业用途 版权归原作者所有 如有问题请及时联系我们以作处理 以下文章来源于腾讯云 作者 王强 想要学习Python Python学习交流群 1039649593 满足你的需求 资料都
  • Linux添加永久静态路由信息

    首先让我们查看一下当前机器的路由表 执行如下命令 route n 当前本机只有一条默认路由 网关是192 168 142 1 Linux添加永久静态路由信息 然后我们确认一下当前工作的网卡 这里我们使用的是eth0 补充 如果机器中存在多块
  • 在Linux虚拟机中普通用户和超级用户直接的切换——指令

    Linux虚拟机中普通用户和超级用户直接的切换 指令 1 普通用户切换到root用户 普通用户到高级用户 su root 切换到root用户 输入的命令就是设置的root的密码 或者输入 redhat fengshuai su 也可以切换到
  • 特征筛选2——皮尔逊相关系数筛选(单变量筛选)

    值域为正负1之间 用来筛查单变量与预测结果之间的相关关系 一般来讲 绝对值在0 0 1之间 无关 绝对值在0 1 0 3之间 弱相关关系 绝对值在0 3 0 6之间 存在相关关系 绝对值在0 6 0 9之间 强相关关系 绝对值大于0 9 几
  • BGP边界网关协议相关内容

    BGP 边界网关协议 AS 自治系统 由单一机构组织管理的一系列IP网络及其设备的集合 原因 1网络范围太大 协议跑不过来 需要划分 2自治管理 为了区分不同的AS 我们给每个自治系统设计一个编号 AS号 16位二进制构成 0 65535
  • Textiew常用设置

    Textiew常用设置 Android autoLink设置是否当文本为URL链接 email 电话号码 map时 文本显示为可点击的链接 可选值 none web email phone map all android autoText如
  • C#基础知识整理四

    上一个知识点整理 已经整理到了结构体方面了 顺便把访问修饰符也一起整理了 今天继续向下整理知识点 今天来了解一下类和面向对象的知识 1 类 什么是类 简单来说就是分类 他是描述具有相同特征 属性 和行为 方法 的抽象就是类 他是用关键字cl
  • Redis中的Hash

    1 前言 本篇博客将介绍Redis中五大类型之一的Hash类型及一些其常用命令 Reids中的Hash是一个键值对类型的集合 类似于Java里面的Map
  • 参数估计

    百度百科解释 参数估计 parameter estimation 是根据从总体中抽取的样本估计总体分布中包含的未知参数的方法 人们常常需要根据手中的数据 分析或推断数据反映的本质规律 即根据样本数据如何选择统计量去推断总体的分布或数字特征等
  • VirtualBox没有64位选项,无法安装64位的解决方法 感谢源作者

    昨天碰到了一个让我惊奇的问题 以前从来没有碰到过 早些年一直用 VM 都是草民肯定买不起VM授权 都是各种的破解版 对吧 这几年vm的破解几乎没有了 反正也不怎么好用就一直用VirtualBox 但是昨天竟然没有办法现在64系统 如果不能安
  • JS使用showModalDialog弹出窗口获得弹出窗口设定的值

    父窗口 hello html
  • ES 搜索3 (查找多个精确值)

    查找多个精确值 term 查询对于查找单个值非常有用 但通常我们可能想搜索多个值 如果我们想要查找价格字段值为 20 或 30 的文档该如何处理呢 不需要使用多个 term 查询 我们只要用单个 terms 查询 注意末尾的 s terms
  • ORACLE数据库怎样查看当前的SID

    方法一 echo ORACLE SID 方法二 select from V database
  • 【自用】深度学习工作站安装ubuntu 18.04 LTS系统

    工作站配置 自己组装的 主板 华硕Z790P PCIE插槽间距大 可以装双显卡 CPU i5 13600KF 内存 32 G 显卡 GTX 2080 Ti 魔改版 22G 存储 1T SSD 8T机械硬盘 系统 ubuntu 18 04 L
  • storm教程(三):用Java开发storm

    1 操作模式 开始之前 有必要了解一下Storm的操作模式 有下面两种方式 本地模式 在本地模式下 Storm拓扑结构运行在本地计算机的单一JVM进程上 这个模式用于开发 测试以及调试 因为这是观察所有组件如何协同工作的最简单方法 在这种模