storm集成kafka简单使用示例2

2023-11-20

StormKafkaTopo.java

package stormUse.stormUse;

import java.util.Properties;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class StormKafkaTopo 
{

    public static void main(String[] args) throws Exception 
    { 
        // 配置Zookeeper地址
        BrokerHosts brokerHosts = new ZkHosts("192.168.153.233:2181");
        // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "test", "/test" , "kafkaspout");

        // 配置KafkaBolt中的kafka.broker.properties
        Config conf = new Config();  

        //set producer properties.
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.153.233:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaBolt<String, String> bolt = new KafkaBolt<String, String>()
                .withProducerProperties(props)
                .withTopicSelector(new DefaultTopicSelector("topic2"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>());


        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());  
        TopologyBuilder builder = new TopologyBuilder();   
        builder.setSpout("spout", new KafkaSpout(spoutConfig));  
        builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout"); 
        //builder.setBolt("kafkabolt", new KafkaBolt<String, Integer>()).shuffleGrouping("bolt");
        builder.setBolt("kafkabolt", bolt).shuffleGrouping("bolt");

        if (args != null && args.length > 0) 
        {  
            conf.setNumWorkers(3);  
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  
        } else 
        {  
            LocalCluster cluster = new LocalCluster();  
            cluster.submitTopology("Topo", conf, builder.createTopology());  
            Utils.sleep(100000);  
            cluster.killTopology("Topo");  
            cluster.shutdown();  
        }  
    }
}

SenqueceBolt.java

package stormUse.stormUse;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SenqueceBolt extends BaseBasicBolt
{

    public void execute(Tuple input, BasicOutputCollector collector) 
    {
        // TODO Auto-generated method stub
         String word = (String) input.getValue(0);  
         String out = "I'm " + word +  "!";  
         System.out.println("out=" + out);
         collector.emit(new Values(out));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) 
    {
        declarer.declare(new Fields("message"));
    }
}

MessageScheme.java

package stormUse.stormUse;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;

import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class MessageScheme implements Scheme {

     public List<Object> deserialize(ByteBuffer ser) 
     {

         Charset charset = null;  
         CharsetDecoder decoder = null;  
         CharBuffer charBuffer = null;  

         try 
         {
             charset = Charset.forName("UTF-8");  
             decoder = charset.newDecoder();  
             charBuffer = decoder.decode(ser.asReadOnlyBuffer());  
             String msg = charBuffer.toString(); 
             return new Values(msg);

         } catch (CharacterCodingException e) 
         {  

          }
            return null;
     }

     public Fields getOutputFields() {
            return new Fields("msg");  
        }

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

storm集成kafka简单使用示例2 的相关文章

  • scikit-learn 分类器的小批量训练,我提供小批量

    我有一个非常大的数据集 无法加载到内存中 我想使用这个数据集作为 scikit learn 分类器的训练集 例如LogisticRegression 是否有可能在我提供小批量的情况下对 scikit learn 分类器进行小批量训练 我相信
  • SparkR 作业 100 分钟超时

    我编写了一个有点复杂的sparkR脚本并使用spark submit运行它 脚本基本上做的是逐行读取基于 hive impala parquet 的大表 并生成具有相同行数的新 parquet 文件 但似乎工作在大约 100 分钟后停止 这
  • 实施 MySQL NDB Cluster 有哪些限制?

    我想为 MySQL Cluster 6 实现 NDB Cluster 我想为至少有 200 万条记录的非常庞大的数据结构执行此操作 我想知道实施 NDB cluster 是否有任何限制 例如 RAM 大小 数据库数量或 NDB 集群的数据库
  • 外部混洗:从内存中混洗大量数据

    我正在寻找一种方法来整理内存不适合的大量数据 大约 40GB 我有大约 3000 万个可变长度的条目 存储在一个大文件中 我知道该文件中每个条目的开始和结束位置 我需要对内存中不适合的数据进行洗牌 我想到的唯一解决方案是对包含以下数字的数组
  • 如何将数据从一个HDFS复制到另一个HDFS?

    我有两个 HDFS 设置 想要将一些表从 HDFS1 复制 而不是迁移或移动 到 HDFS2 如何将数据从一个HDFS复制到另一个HDFS 是否可以通过 Sqoop 或其他命令行 DistCp 分布式复制 是用于在集群之间复制数据的工具 它
  • Matlab 中大型非稀疏矩阵的高效运算

    我需要在 Matlab 中操作大的 3 维非稀疏矩阵 使用纯矢量化会带来较长的计算时间 所以 我尝试将操作分成10个块 然后解析结果 当我看到纯矢量化不能很好地缩放数据大小时 我感到很惊讶 如下图所示 我提供了这两种方法的示例 Parame
  • 如何使用R将年度数据转换为月度数据?

    我有2000年至2015年15年的逐年GDP数据 我想将这些数据转换为月度数据 其中只有月份和年份 我只想将当年的值复制到所有月份 我怎样才能在 R 中做到这一点2010 年的值是 1708 我想为 2010 年的所有月份复制相同的值 我的
  • Flume的Spool Dir可以在远程机器上吗?

    每当新文件到达特定文件夹时 我就尝试将文件从远程计算机获取到我的 hdfs 我在flume中遇到了spool dir的概念 如果spool dir位于运行flume代理的同一台机器上 那么它工作得很好 有什么方法可以在远程计算机中配置假脱机
  • 如何从 HIVE 中的日期减去月份

    我正在寻找一种方法来帮助我从 HIVE 中的日期中减去月份 我有个约会2015 02 01 现在我需要从这个日期减去 2 个月 这样结果应该是2014 12 01 你们能帮我一下吗 select add months 2015 02 01
  • Hive alter table 更改列名称为重命名的列提供“NULL”

    我曾尝试将表中的现有列重命名为新列 但名称更改后 新列只给我 NULL 值 Parquet 中表的存储格式 例如 user 是 Test 表中字符串数据类型的列 插入了值为 John 的示例记录 Select user from Test
  • 使用本地密钥 MONGODB 启用数据加密时出错

    我已成功加密 mongoDB 中的通信 但是当我尝试启用数据加密时出现错误 我使用的是 mongoDB 企业版 版本为 3 2 4 我在控制台中收到以下消息 ERROR child process failed exited with er
  • 如何将非分区表转换为分区表

    如何使用 StandardSQL 或 LegacySQL 重命名 BigQuery 中的表 以便对之前未分区的表进行分区 我正在尝试使用 StandardSQL 但出现以下错误 重命名表dataset old table name TO d
  • Pig - 如何迭代一袋地图

    让我解释一下这个问题 我有这行代码 u FOREACH persons GENERATE FLATTEN 0 experiences as j dump u 产生以下输出 id 1 date begin 12 2012 descriptio
  • 使用 Kinesis Analytics 构建实时会话

    是否有某个地方的示例 或者有人可以解释如何使用 Kinesis Analytics 构建实时会话 即会话化 这里提到这可能 https aws amazon com blogs aws amazon kinesis analytics pr
  • Postgresql - 在大数据库中使用数组的性能

    假设我们有一个包含 600 万条记录的表 有 16 个整数列和少量文本列 它是只读表 因此每个整数列都有一个索引 每条记录大约 50 60 字节 表名称为 项目 服务器为 12 GB RAM 1 5 TB SATA 4 核 所有 postg
  • PySpark NoSuchMethodError:将数据插入数据库时​​sun.nio.ch.DirectBuffer.cleaner

    我在尝试将大型数据帧插入 Postgres 时收到此错误 NoSuchMethodError sun nio ch DirectBuffer cleaner 这是一个完整的错误 之前有很多操作 所以没有理由将它们附加到问题中 您能否给一些建
  • 了解 Azure 事件中心分区使用者模式

    Azure 事件中心使用分区使用者模式中描述的docs https learn microsoft com en us azure event hubs event hubs features 当涉及到现实世界场景时 我在理解该模型的消费者
  • 在 Flink 流中使用静态 DataSet 丰富 DataStream

    我正在编写一个 Flink 流程序 其中我需要使用一些静态数据集 信息库 IB 来丰富用户事件的数据流 对于例如假设我们有一个买家的静态数据集 并且有一个传入的事件点击流 对于每个事件 我们希望添加一个布尔标志来指示事件的执行者是否是买家
  • HRegionServer 显示“错误告诉主机我们已经启动”。显示套接字异常:参数无效

    我正在尝试在 3 台 centos 机器上创建一个 hbase 集群 Hadoop v 2 8 0 已启动并在我配置的 HBase v 1 2 5 上运行 Hbase 启动正常 它启动了 HMaster 和区域服务器 但仍然在区域服务器和
  • 使用 mapWithState Spark Streaming 过滤部分重复项

    我们有一个DStream 比如 val ssc new StreamingContext sc Seconds 1 val kS KafkaUtils createDirectStream String TMapRecord ssc Pre

随机推荐

  • 为什么越来越多的 IT 人考软考?

    近几年随着国家计算机与软件技术的发展 每年报名参加软考考试的人也越来越多 据工信部新闻发布会消息 计算机软件与通信专业技术人员职业资格考试累计报考人数超过485万 2022年报考人数129万人 01 为什么越来越多的IT人考软考证书 1 软
  • 【精品示例】超实用Python爬虫入门实例——做一个优质舔狗

    引言 最近发现了一个有意思的网站 里面充斥了大量的舔狗箴言 作为一个爬虫发烧友怎么能错过此等机会 咱们直接就是上才艺 类的编写 本次爬虫使用了多协程的方案进行 保证了爬虫的速度 在这里我们新建一个爬虫类 并在里边添加上我们需要的方法 网页的
  • IDEA打包上传到阿里云私服

    上传阿里云私服报错 ERROR Failed to execute goal org apache maven plugins maven deploy plugin 2 8 2 deploy default deploy on proje
  • 通讯录系统图形化界面(C++,Qt5.12)(Visual Studio2019,QtCreator)(初学)

    目录 无用的前言 无用的话 无需用看 前言 一 开发工具 二 功能演示以及 源码和安装包 下载 三 功能介绍以及设计思路 四 代码具体实现 项目文件结构 main cpp mainwindow ui mainwindow h mainwin
  • 2.前端笔记-CSS-字体属性

    1 字体系列 CSS使用font family属性定义文本的字体系列 body font family 思源黑体 Microsoft YaHei 建议 使用英文写字体的属性值 尽量使用系统默认自带字体 保证在任何用户的浏览器都可以显示 微软
  • react 入坑学习(十四)混合菜单新模式(ANT ProLayout)

    混合菜单新模式 样例 Ant Design Pro Blog 文档 这个明显就比非混合的好看很多 今天就来试试改一改吧 现在官网中找到ProLayout 就可以找到这个混合模式的源码样例 import React from react im
  • css实现文本超出显示省略号

    一 普通情况下 1 固定width 2 overflow hidden 3 text overflow ellipsis 显示为省略号 4 white space nowrap 不换行 二 table表格里 td 设置上面的4步 table
  • Selenium 之订制启动Chrome的选项(Options)

    使用 selenium 时 我们可能需要对 chrome 做一些特殊的设置 以完成我们期望的浏览器行为 比如阻止图片加载 阻止JavaScript执行 等动作 这些需要 selenium的 ChromeOptions 来帮助我们完成 1 什
  • 3.Open3D教程——点云数据操作

    点云数据 本教程阐述了基本的点云用法 随需要的文件链接 1 显示点云 import open3d as o3d import numpy as np print Load a ply point cloud print it and ren
  • ESDA in PySal (3):Geosilhouettes:集群拟合的地理测量

    ESDA in PySal 3 Geosilhouettes 集群拟合的地理测量 Silhouette statistics Rousseeuw 1987 是观测值与给定聚类的拟合优度的非参数度量 在聚类具有 地理 解释的情况下 例如当它们
  • 【Linux】进程优先级,环境变量,进程地址空间

    文章目录 1 进程优先级 基本概念 查看系统进程 PRI and NI PRI vs NI 修改进程优先级的命令 其他概念 2 环境变量 基本概念 查看环境变量方法 常见环境变量 测试PATH 环境变量相关的命令 环境变量的组织方式 通过代
  • 心理学的166个现象---之六

    101 拍球效应 拍篮球时 用的力越大 篮球就跳得越高 对学生的期望值越高 学生潜能的发挥就越充分 优秀的老师总是尽可能地信任学生 不断鼓励学生 而批评则尽可能委婉 不使矛盾激化 102 旁观者效应 1993年 四川达竹矿务局一名高考超过录
  • pytorch模型训练的若干问题

    1 Net input 调用的是什么函数 为什么直接写对象名就直接调用函数了 net是创建的vgg类的对象 vgg类继承于pytorch库中类nn Module 创建类时的括号里写上父类的名字 就是继承的意思 在pytorch库中nn Mo
  • QTableWidget 设置表头颜色

    QTableWidget 设置表头颜色 方法1 setStyleSheet QHeaderView section background color qlineargradient x1 0 y1 0 x2 0 y2 1 stop 0 00
  • android sdk自带的fragment标签使用

    项目开发中要用到 下面四个大分类 上面三个小分类的情况 大分类采用viewPage 小分类 使用了sdk自带的
  • 制造业软件体系结构与互联网的差异

    本人自毕业已经13年 虽然热爱计算机 但是由于种种原因 一直在东莞的工厂混迹 感受着互联网的大潮 也不免有几分失落 伴随这去年 今年大厂裁人 许多被逼无路的程序员开始跳槽制造业 浓浓的Java气息来了 在此不免吐槽一句 请不要把写互联网程序
  • ESP32-PICO-D4下载程序出现 rst:0x10 (RTCWDT_RTC_RESET),boot:0x13 (SPI_FAST_FLASH_BOOT) flash read err, 1000

    备注 是我自己记录用的 有问题可以交流 用的Visual Studio Code Arduino platformio开发 最近现在在搞物联网 发现ESP32这款芯片容易上手 而且功能强大 买的开发板用起来很顺手 于是我就自己从立创开源上找
  • 解决cannot be cast to class jakarta.servlet.Servlet问题

    我的Tomcat版本是10 0 5 这个问题的主要原因是因为 10版本的Tomcat的servlet包变化了 解决问题方法 IDEA选择这个直接完美解决 IDEA选择这个直接完美解决 IDEA选择这个直接完美解决 1下载对应的包并且导入 下
  • Prim算法解决修路问题

    普里姆算法 Prim算法 图论中的一种算法 可在加权连通图里搜索最小生成树 意即由此算法搜索到的边子集所构成的树中 不但包括了连通图里的所有顶点 英语 Vertex graph theory 且其所有边的权值之和亦为最小 普里姆算法和Kru
  • storm集成kafka简单使用示例2

    StormKafkaTopo java package stormUse stormUse import java util Properties import org apache storm Config import org apac