Flink自定义HBaseSink类

2023-11-09

文章目录

HBaseCell类

package com.vic.flink.entity;

import lombok.Data;
import java.util.HashMap;

@Data
public class HBaseCell {
    private String tableName;
    private String rowKey; 
    private String cf; 
    private HashMap<String,String> kv; // column,value

    public HBaseCell(String tableName,String rowKey,String cf,HashMap<String,String> kv){
        this.tableName = tableName;
        this.rowKey = rowKey;
        this.cf = cf;
        this.kv = kv;
    }
}

HBaseSink类

package com.vic.flink.sink;

import com.vic.flink.entity.HBaseCell;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.HashMap;

/**
 * 自定义HbaseSink类
 */

public class HbaseSink extends RichSinkFunction<HBaseCell> {
    private static org.apache.hadoop.conf.Configuration configuration;
    private static Connection connection = null;
    private static BufferedMutator mutator;

    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum","127.0.0.1,127.0.0.2,127.0.0.3");
        configuration.set("hbase.zookeeper.property.clientPort","2181");
        configuration.setInt("hbase.client.operation.timeout",30000);
        configuration.setInt("hbase.client.scanner.timeout.period",200000);
        connection = ConnectionFactory.createConnection(configuration);
    }

    @Override
    public void close() throws Exception {
        if (connection != null) connection.close();
    }

    @Override
    public void invoke(HBaseCell value, Context context) throws Exception {
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(value.getTableName()));
        params.writeBufferSize(1024 * 1024);
        mutator = connection.getBufferedMutator(params);
        Put put = new Put(Bytes.toBytes(value.getRowKey())); 
        HashMap<String, String> kv = value.getKv();
        for(String key:kv.keySet()){
            put.addColumn(Bytes.toBytes(value.getCf()), Bytes.toBytes(key), Bytes.toBytes(kv.get(key)));
        }
        mutator.mutate(put);
        mutator.flush();
    }
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink自定义HBaseSink类 的相关文章

  • 如何使用 FileChannel 将一个文件的内容附加到另一个文件的末尾?

    File a txt好像 ABC File d txt好像 DEF 我正在尝试将 DEF 附加到 ABC 所以a txt好像 ABC DEF 我尝试过的方法总是完全覆盖第一个条目 所以我总是最终得到 DEF 这是我尝试过的两种方法 File
  • manifest.mf 文件的附加内容的约定?

    Java JAR 中的 MANIFEST MF 文件是否有任何超出 MANIFEST MF 约定的约定 JAR规范 http download oracle com javase 1 4 2 docs guide jar jar html
  • JNI 不满意链接错误

    我想创建一个简单的 JNI 层 我使用Visual studio 2008创建了一个dll Win 32控制台应用程序项目类型 带有DLL作为选项 当我调用本机方法时 出现此异常 Exception occurred during even
  • java.io.IOException: %1 不是有效的 Win32 应用程序

    我正在尝试对 XML 文档进行数字签名 为此我有两个选择 有一个由爱沙尼亚认证中心为程序员创建的库 还有一个由银行制作的运行 Java 代码的脚本 如果使用官方 认证中心 库 那么一切都会像魅力一样进行一些调整 但是当涉及到银行脚本时 它会
  • 如何在 Java 中禁用 System.out 以提高速度

    我正在用 Java 编写一个模拟重力的程序 其中有一堆日志语句 到 System out 我的程序运行速度非常慢 我认为日志记录可能是部分原因 有什么方法可以禁用 System out 以便我的程序在打印时不会变慢 或者我是否必须手动检查并
  • HDFS:使用 Java / Scala API 移动多个文件

    我需要使用 Java Scala 程序移动 HDFS 中对应于给定正则表达式的多个文件 例如 我必须移动所有名称为 xml从文件夹a到文件夹b 使用 shell 命令我可以使用以下命令 bin hdfs dfs mv a xml b 我可以
  • Java 页面爬行和解析之 Crawler4j 与 Jsoup

    我想获取页面的内容并提取其中的特定部分 据我所知 此类任务至少有两种解决方案 爬虫4j https github com yasserg crawler4j and Jsoup http jsoup org 它们都能够检索页面的内容并提取其
  • hibernate总是自己删除表中的所有数据

    您好 我正在开发一个 spring mvc 应用程序 它使用 hibernate 连接到存储文件的 mysql 数据库 我有两个方法 一个方法添加我选择的特定文件路径中的所有文件 另一种方法调用查询以返回从 mysql 存储的文件列表 问题
  • OnClick 事件中的 finish() 如何工作?

    我有一个Activity一键退出Activity 通过layout xml我必须设置OnClick事件至cmd exit调用 this finish 效果很好 public void cmd exit View editLayout thi
  • 无法理解 Java 地图条目集

    我正在看一个 java 刽子手游戏 https github com leleah EvilHangman blob master EvilHangman java https github com leleah EvilHangman b
  • Spring Data 与 Spring Data JPA 与 JdbcTemplate

    我有信心Spring Data and Spring Data JPA指的是相同的 但后来我在 youtube 上观看了一个关于他正在使用JdbcTemplate在那篇教程中 所以我在那里感到困惑 我想澄清一下两者之间有什么区别Spring
  • 制作java包

    我的 Java 类组织变得有点混乱 所以我要回顾一下我在 Java 学习中跳过的东西 类路径 我无法安静地将心爱的类编译到我为它们创建的包中 这是我的文件夹层次结构 com david Greet java greeter SayHello
  • 将多模块 Maven 项目导入 Eclipse 时出现问题 (STS 2.5.2)

    我刚刚花了最后一个小时查看 Stackoverflow com 上的线程 尝试将 Maven 项目导入到 Spring ToolSuite 2 5 2 中 Maven 项目有多个模块 当我使用 STS 中的 Import 向导导入项目时 所
  • Java中未绑定通配符泛型的用途和要点是什么?

    我不明白未绑定通配符泛型有什么用 具有上限的绑定通配符泛型 stuff for Object item stuff System out println item Since PrintStream println 可以处理所有引用类型 通
  • 当单元格内的 JComboBox 中有 ItemEvent 时,如何获取 CellRow

    我有一个 JTable 其中有一列包含 JComboBox 我有一个附加到 JComboBox 的 ItemListener 它会根据任何更改进行操作 但是 ItemListener 没有获取更改的 ComboBox 所在行的方法 当组合框
  • 运行 Jar 文件时出现问题

    我已将 java 项目编译成 Jar 文件 但运行它时遇到问题 当我跑步时 java jar myJar jar 我收到以下错误 Could not find the main class myClass 类文件不在 jar 的根目录中 因
  • Android JNI C 简单追加函数

    我想制作一个简单的函数 返回两个字符串的值 基本上 java public native String getAppendedString String name c jstring Java com example hellojni He
  • android Accessibility-service 突然停止触发事件

    我有一个 AccessibilityService 工作正常 但由于开发过程中的某些原因它停止工作 我似乎找不到这个原因 请看一下我的代码并告诉我为什么它不起作用 public class MyServicee extends Access
  • 休眠以持久保存日期

    有没有办法告诉 Hibernate java util Date 应该持久保存 我需要这个来解决 MySQL 中缺少的毫秒分辨率问题 您能想到这种方法有什么缺点吗 您可以自己创建字段long 或者使用自定义的UserType 实施后User
  • com.jcraft.jsch.JSchException:身份验证失败

    当我从本地磁盘上传文件到远程服务器时 出现这样的异常 com jcraft jsch JSchException Auth fail at org apache tools ant taskdefs optional ssh Scp exe

随机推荐

  • 搞一个release版本的aar包

    最近在做一个aar包给第三方使用 由于是第一次做这个aar包 在所有的功能代码完成需要打包的时候发现 坑开始出现了 于是各种百度 开始吧 首先你需要创建一个Android项目 然后创建一个android的library 下一步 剩下的就看你
  • JavaWeb.MVC购物车(第一部分)

    前言 这一篇我会使用servlet EL JSTL 三层架构写一个简单的购物车项目 内容比较多 这只是第一部分 只有登陆 首页数据显示和商品添加到购物车的功能 还有一部分功能的代码我会写在下一篇博客里 感兴趣的朋友们可以看一看 也希望大家可
  • Blender2.9基础七:外部插件篇

    一 插件安装 1 安装插件 2 插件显示位置 二 材质贴图相关插件 1 GrabDoc 贴图烘培插件 GrabDoc可以运行一键式场景设置 然后开始建模 对形状进行建模后 甚至在建模阶段 你可以实时预览材质在视口中的外观 主要特点 实时材质
  • unity3D 脚本中按键或鼠标输入响应函数

    简单地总结一下 unity中脚本实现输入 键盘和鼠标 的响应事件函数 不够完善 以后碰到再慢慢添加 键盘输入 1 Input GetKey up 按住键盘上键 2 Input GetKey KeyCode UpArrow 按住键盘上键 Ke
  • JS实现将数组中某个属性值相同的元素,放在一起

    function sortArr arrList str var arr 大数组 t 临时属性值相同数组 临时的变量 tmp if arrList length gt 0 tmp arrList 0 str 将相同类别的对象添加到同一个数组
  • [从零开始学习FPGA编程-44]:视野篇 - 集成电路助力数字化时代高质量发展-1-集成电路芯片主要形态

    目录 前言背景 什么是集成电路 什么是数字化时代
  • 为什么小程序预览时必须打开‘调试工具vconsole’才能正常运行?

    这是因为没有为小程序配置域名导致的 预览或者使用小程序体验版的时候 小程序会自动校验你是否配置了合法的域名 如果没有配置 还是使用的ip地址 这样就会造成一个现象 在开发工具上以及真机调试时 都能正常运行 但预览就不行 但只要在预览时 打开
  • c++如何使用yaml来进行配置

    c 如何使用yaml来进行配置 yaml的基本语法可以参考这个博客 https www cnblogs com sddai p 9626392 html yaml的使用也可以参考这个博客 https www it610 com articl
  • 基础算法题——迷宫(递推)

    迷宫 题目链接 解题思路 暴力法 利用 dfs 遍历每一条可能的路径 将遍历的权值和不断取余 不足 当 n m 取较大的情况下 所遍历的路径可能会暴增 出现超时的情况 递推法 从题目上我们可以发现 最终的权值和是要对 mod 取余的 利用这
  • 查询SQLSERVER执行过的SQL记录(历史查询记录)

    有的时候 需要知道近段时间SQLSERVER执行了什么语句 可以用下面的方法 SELECT TOP 1000 QS creation time SUBSTRING ST text QS statement start offset 2 1
  • Linux教程系列 pdf下载(鸟哥私房菜等)

    鸟哥的Linux私房菜 基础篇 第四版 pdf 下载 LINUX内核设计与实现 pdf 下载 Linux 操作系统 基础操作 教学 doc 下载 linux内核深入剖析基于0 11 pdf 下载 Linux系统命令及其使用详解 doc 下载
  • 静态变量与动态变量的区别

    目录 一 定义 1 变量与常量 2 局部变量 局部变量 定义在函数中的变量 3 全局变量 4 动态变量和静态变量 二 区别 1 局部变量与全局变量的对比 2 静态变量与动态变量 一 定义 1 变量与常量 变量 指的是在程序运行过程中 可以通
  • Linux 高级进程管理

    1 让出处理器 Linux提供一个系统调用运行进程主动让出执行权 sched yield 进程运行的好好的 为什么需要这个函数呢 有一种情况是用户空间线程的锁定 如果一个线程试图取得另一个线程所持有的锁 则新的线程应该让出处理器知道该锁变为
  • 动态sql MyBatis处理多对一,一对多映射关系

    MyBatis处理模糊查询 1 用 符代替 接参 避免 占位符被解析成 在字符串中无法接参 select from user where username like name 2 使用sql语句中字符串拼接的函数 select from u
  • 微信小程序背景图片设置问题

    我们都知道 用css给网页设置背景图片 可以导入网络图片和本地图片 1 网络图片 元素定位 background image url https timgsa baidu com timg image quality 80 size b99
  • CUDA error: CUBLAS_STATUS_EXECUTION_FAILED when calling ‘cublasSgemm’

    运行transformer模型是报错如题 1 减小batch size 原因是调用cublas函数时会生成句柄 占用一定的内存 确保剩余内存够使用 2 gpu驱动版本和cuda torch版本的匹配问题 低版本的gpu驱动 尝试换成11 0
  • 怎么上传本地项目或文件到SVN服务器

    实验需要将本地的文件上传到SVN的doc文件夹下 在桌面右击 TortoiseSVN gt Repo brower gt 输入你的仓库的url gt 输入用户姓名和密码 即可访问到svn 右键点击Add File即可添加要上传的文件 如下图
  • c++文件输入与输出

    基于流的文件IO 头文件 ofstream 写文件 ifstream 读文件 fstream 读写文件 using namespace std 打开文件 std ifstream fin xxx txt std ifstream fin f
  • 几个更优雅、更高效 Pythonic 代码写法!

    本文分享几个鲜为人知的 Pythonic 技巧 这些技巧非常有用 但并不广为人知 通过学习和使用这些技巧 可以帮你节省时间和精力 并使你的代码更加优雅和高效 1 三元运算符 三元运算符是 if else 语句的简写 语法是value if
  • Flink自定义HBaseSink类

    文章目录 HBaseCell类 HBaseSink类 HBaseCell类 package com vic flink entity import lombok Data import java util HashMap Data publ