springboot-starter如何整合阿里云datahub呢?

2023-05-16

转自:

springboot-starter如何整合阿里云datahub呢?

下文笔者讲述springboot整合datahub的方法分享,如下所示

Datahub简介说明


DataHub的功能:
    1.与大数据解决方案中Kafka具有相同的角色
      同时还提供数据队列功能
    2.DataHub还可与阿里云其它上下游产品对接
	  其一个交换的功能,称之为数据交换
  

DataHub 简介


datahub对外提供开发者生产和消费的sdk
 在springboot中,我们也可用使用自定义starter的方式加载sdk
  

实现思路:
    1.引入相应的starter器
    2.application.yml中加入相应的配置信息
    3.编写相应的代码
  

引入相应的starter器


<dependency>
      <artifactId>cry-starters-projects</artifactId>
      <groupId>cn.com.cry.starters</groupId>
      <version>2022-1.0.0</version>
</dependency>
  

启动客户端

配置阿里云DataHub的endpoint以及AK信息


aliyun:
  datahub:
  	# 开启功能
  	havingValue: true
    #是否为私有云
    isPrivate: false
    accessId: xxx
    accessKey: xxx
    endpoint: xxx
    #连接DataHub客户端超时时间
    conn-timeout: 10000
  

获取DataHub客户端


DatahubClient datahubClient=DataHubTemplate.getDataHubClient();
  

写数据


public int write(@RequestParam("id") Integer shardId) {
    List<Student> datas = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Student s = new Student();
        s.setAge(i);
        s.setName("name-" + i);
        s.setAddress("address-" + i);
        datas.add(s);
    }
    int successNumbers = DataHubTemplate.write("my_test", "student", datas, shardId);
    return successNumbers;
}
  

 上述代码说明:
      projectName为my_test
      topicName为student
      shardId 为N的hub里写数据
      且返回插入成功的条数
  

读数据

读数据开发的逻辑类似RabbitMq的starter
使用@DataHubListener和@DataHubHandler处理器注解进行使用


@Component
@DataHubListener(projectName = "my_test")
public class ReadServiceImpl {

    @DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST)
    public void handler(Message message) {
        System.out.println("读取到shardId=0的消息");
        System.out.println(message.getData());
        System.out.println(message.getCreateTsime());
        System.out.println(message.getSize());
        System.out.println(message.getConfig());
        System.out.println(message.getMessageId());
    }
}
   

以上代码
    通过LATEST游标的方式
    监听 project=my_test
         topicName=student
         shardId=0
     最终使用Message的包装类
	 获取dataHub实时写入的数据 
此处可设置多种游标类型
  例:根据最新的系统时间、最早录入的序号等
  

核心代码

需一个DataHubClient增强类
在SpringBoot启动时开启一个线程来监听对应的project-topic-shardingId
根据游标规则来读取当前的cursor进行数据的读取


public class DataHubClientWrapper implements InitializingBean, DisposableBean {

    @Autowired
    private AliyunAccountProperties properties;

    @Autowired
    private ApplicationContext context;

    private DatahubClient datahubClient;


    public DataHubClientWrapper() {

    }

    /**
     * 执行销毁方法
     *
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        WorkerResourceExecutor.shutdown();
    }

    @Override
    public void afterPropertiesSet() throws Exception {

        /**
         * 创建DataHubClient
         */
        this.datahubClient = DataHubClientFactory.create(properties);

        /**
         * 打印Banner
         */
        BannerUtil.printBanner();

        /**
         * 赋值Template的静态对象dataHubClient
         */
        DataHubTemplate.setDataHubClient(datahubClient);

        /**
         * 初始化Worker线程
         */
        WorkerResourceExecutor.initWorkerResource(context);
        /**
         * 启动Worker线程
         */
        WorkerResourceExecutor.start();
    }
}
 
//写数据
//构建了一个类似RedisDataTemplate的模板类
//封装了write的逻辑
//调用时只需要用DataHubTemplate.write调用

public class DataHubTemplate {
    private static DatahubClient dataHubClient;
    private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class);

    /**
     * 默认不开启重试机制
     *
     * @param projectName
     * @param topicName
     * @param datas
     * @param shardId
     * @return
     */
    public static int write(String projectName, String topicName, List<?> datas, Integer shardId) {
        return write(projectName, topicName, datas, shardId, false);
    }

    /**
     * 往指定的projectName以及topic和shard下面写数据
     *
     * @param projectName
     * @param topicName
     * @param datas
     * @param shardId
     * @param retry
     * @return
     */
    private static int write(String projectName, String topicName, List<?> datas, Integer shardId, boolean retry) {
        RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema();
        List<RecordEntry> recordEntries = new ArrayList<>();
        for (Object o : datas) {
            RecordEntry entry = new RecordEntry();
            Map<String, Object> data = BeanUtil.beanToMap(o);
            TupleRecordData tupleRecordData = new TupleRecordData(recordSchema);
            for (String key : data.keySet()) {
                tupleRecordData.setField(key, data.get(key));
            }
            entry.setRecordData(tupleRecordData);
            entry.setShardId(String.valueOf(shardId));
            recordEntries.add(entry);
        }
        PutRecordsResult result = dataHubClient.putRecords(projectName, topicName, recordEntries);
        int failedRecordCount = result.getFailedRecordCount();
        if (failedRecordCount > 0 && retry) {
            retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName);
        }
        return datas.size() - failedRecordCount;
    }

    /**
     * @param client
     * @param records
     * @param retryTimes
     * @param project
     * @param topic
     */
    private static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
        boolean suc = false;
        List<RecordEntry> failedRecords = records;
        while (retryTimes != 0) {
            logger.info("the time to send message has [{}] records failed, is starting retry", records.size());
            retryTimes = retryTimes - 1;
            PutRecordsResult result = client.putRecords(project, topic, failedRecords);
            int failedNum = result.getFailedRecordCount();
            if (failedNum > 0) {
                failedRecords = result.getFailedRecords();
                continue;
            }
            suc = true;
            break;
        }
        if (!suc) {
            logger.error("DataHub send message retry failure");
        }
    }

    public static DatahubClient getDataHubClient() {
        return dataHubClient;
    }

    public static void setDataHubClient(DatahubClient dataHubClient) {
        DataHubTemplate.dataHubClient = dataHubClient;
    }
}

//读数据
//需要在Spring启动时开启一个监听线程DataListenerWorkerThread
//执行一个死循环不停轮询DataHub下的对应通道

public class DataListenerWorkerThread extends Thread {
    private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class);
    private volatile boolean init = false;
    private DatahubConfig config;
    private String workerKey;
    private int recordLimits;
    private int sleep;
    private RecordSchema recordSchema;
    private RecordHandler recordHandler;
    private CursorHandler cursorHandler;

    public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringRedisTemplate redisTemplate) {
        this.config = new DatahubConfig(projectName, topicName, shardId);
        this.workerKey = projectName + "-" + topicName + "-" + shardId;
        this.cursorHandler = new CursorHandler(cursorType, sequenceOffset, startTime, redisTemplate, workerKey);
        this.recordLimits = recordLimits;
        this.sleep = sleep;
        this.setName("DataHub-Worker");
        this.setDaemon(true);
    }

    @Override
    public void run() {
        initRecordSchema();
        String cursor = cursorHandler.positioningCursor(config);
        for (; ; ) {
            try {
                GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits);
                if (result.getRecordCount() <= 0) {
                    // 无数据,sleep后读取
                    Thread.sleep(sleep);
                    continue;
                }
                List<Map<String, Object>> dataMap = recordHandler.convert2List(result.getRecords());
                logger.info("receive [{}] records from project:[{}] topic:[{}] shard:[{}]", dataMap.size(), config.getProjectName(), config.getTopicName(), config.getShardId());
                // 拿到下一个游标
                cursor = cursorHandler.nextCursor(result);
                //执行方法
                WorkerResourceExecutor.invokeMethod(workerKey, JsonUtils.toJson(dataMap), dataMap.size(), config, cursor);
            } catch (InvalidParameterException ex) {
                //非法游标或游标已过期,建议重新定位后开始消费
                cursor = cursorHandler.resetCursor(config);
                logger.error("get Cursor error and reset cursor localtion ,errorMessage:{}", ex.getErrorMessage());
            } catch (DatahubClientException e) {
                logger.error("DataHubException:{}", e.getErrorMessage());
                this.interrupt();
            } catch (InterruptedException e) {
                logger.info("daemon thread {}-{} interrupted", this.getName(), this.getId());
            } catch (Exception e) {
                this.interrupt();
                logger.error("receive DataHub records cry.exception:{}", e, e);
            }
        }
    }

    /**
     * 终止
     */
    public void shutdown() {
        if (!interrupted()) {
            interrupt();
        }
    }

    /**
     * 初始化topic字段以及recordSchema
     */
    private void initRecordSchema() {
        try {
            if (!init) {
                recordSchema = DataHubTemplate.getDataHubClient().getTopic(config.getProjectName(), config.getTopicName()).getRecordSchema();
                List<Field> fields = recordSchema.getFields();
                this.recordHandler = new RecordHandler(fields);
                init = true;
            }
        } catch (Exception e) {
            logger.error("initRecordSchema error:{}", e, e);
        }
    }
}

//read的时候结合了注解开发
//通过定义类注解DataHubListener和方法注解DataHubHandler内置属性
//来动态的控制需要在哪些方法中处理监听到的数据的逻辑:

DataHubHandler

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubHandler {
    /**
     * 话题名称
     *
     * @return
     */
    String topicName();

    /**
     * shardId
     *
     * @return
     */
    int shardId();

    /**
     * 最大数据量限制
     *
     * @return
     */
    int recordLimit() default 1000;

    /**
     * 游标类型
     *
     * @return
     */
    CursorTypeWrapper cursorType() default CursorTypeWrapper.LATEST;

    /**
     * 若未监听到数据添加,休眠时间 ms
     *
     * @return
     */
    int sleep() default 10000;

    /**
     * 使用CursorType.SYSTEM_TIME的时候配置 时间偏移量
     *
     * @return
     */
    String startTime() default "";

    /**
     * 使用使用CursorType.SEQUENCE的时候配置,偏移量,必须是正整数
     *
     * @return
     */
    int sequenceOffset() default 0;
}

 
DataHubListener

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubListener {
    String projectName();
}
 
//启动SpringBootStarter的EnableConfigurationProperties注解
//使用配置文件来控制default-bean的开启或关闭

启动类
@Configuration
@EnableConfigurationProperties(value = {AliyunAccountProperties.class})
public class DataHubClientAutoConfiguration {
    /**
     * 初始化dataHub装饰bean
     *
     * @return
     */
    @Bean
    public DataHubClientWrapper dataHubWrapper() {
        return new DataHubClientWrapper();
    }

}
 
//属性配置类
@ConditionalOnProperty(prefix = "aliyun.datahub",havingValue = "true")
@Data
public class AliyunAccountProperties implements Properties{

    /**
     * http://xxx.aliyuncs.com
     */
    private String endpoint;

    /**
     * account
     */
    private String accessId;

    /**
     * password
     */
    private String accessKey;

    /**
     * private cloud || public cloud
     */
    private boolean isPrivate;

    /**
     * unit: ms
     */
    private Integer connTimeout = 10000;
} 
  

最后记得要做成一个starter
在resources下新建一个META-INF文件夹
新建一个spring.factories文件


org.springframework.boot.autoconfigure.EnableAutoConfiguration= \
  cry.starter.datahub.DataHubClientAutoConfiguration
   
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

springboot-starter如何整合阿里云datahub呢? 的相关文章

  • Git使用

    安装完成后 xff0c 打开Git Bash 配置用户信息 xff0c 因为git为分布式 xff0c 需要知道你是谁 git config global user name 34 nnnn 34 gir config global use
  • easyui 报错

    Uncaught SyntaxError Unexpected token lt 可能的原因是js路径引入错误 xff0c 之前遇到过这种情况 xff0c 这次文件换了路径后 xff0c 忘记更改 xff0c 导致再次 出现这个问题 xff
  • list清空

    list clear 其它引用该list的值也会被清空list 61 null 我是在for里用的 xff0c 第二次循环时 xff0c 会引起空指针异常 xff0c 具体原因不清楚每次重新 new ArrayList 来清空
  • easyui sidemenu侧边栏点击事件

    lt DOCTYPE html gt lt html lang 61 34 en 34 gt lt head gt lt meta charset 61 34 UTF 8 34 gt lt title gt My Layout lt tit
  • VR发展前景展望

    VR发展前景展望 引子 随着计算机技术的飞速发展 xff0c 虚拟现实也在短时间内经历了萌芽探索到飞速发展完善的转变 由于其独特的沉浸式体验 xff0c VR的前景被大多数人看好 xff0c 更多的VR相关技术也在为让人能更完美的融合到这个
  • map与jsonObject

    想要的数据格式为 xff1a 34 第一章 34 34 1 第一节 34 34 sectionId 34 1 34 1 第二节 34 sectionId 34 2 34 第二章 34 34 2 第一节 34 34 sectionId 34
  • net.sf.json.JSONException: Object is null

    出现这个错误的原因是net sf json JSONArray或JSONObject转换时 xff0c 对象内包含另一个对象 xff0c 而该被包含的对象为NULL xff0c 所以抛出异常 一种解决办法是将json字符串里的null去掉
  • fastjson根据key去除某一项元素

    数据如下所示 xff1a 34 chapterId 34 1 34 sectionsName 34 34 数据结构简介 34 34 id 34 1 34 contents 34 34 ssssss 34 这里为一个sections对象 34
  • HTML引用外部json文件

    我是在hubuilder里试的 xff0c 先是新建data json文件 里面写上内容 其中var dataFromJSON是为了方便引用 var dataFromJSON 61 34 text 34 34 外部json 34 34 ic
  • idea使用json数据格式化

    Reformat Code 可以将json数据格式化 xff0c 看起来更整洁 xff0c 快捷键是CTRL 43 ALT 43 L但是用起来没反应 xff0c 应该是快捷键冲突 xff0c 然后修改快捷键 关于修改快捷键教程网上有很多
  • easyui layout布局内容显示不全

    设置fit属性为false即可
  • 使用ObjectMapper将map写出到json文件

    64 Test public void ttt String filePath 61 34 src main webapp courseData factorNameAndId json 34 Map lt String Object gt
  • 两次点击会出现undefine

    原因是将拿到的html元素直接插入到了一个新地方 xff0c 导致原有的元素消失 xff0c 所以第二次时获取不到 可以用这种方法 xff0c 获得复制的节点 var clonedNode 61 sourceNode cloneNode t
  • layui里 同级兄弟layer里的iframe方法调用

    比如我们有一个页面A xff0c 然后页面B和C是其子页面 xff0c 在页面A中打开B页面 xff0c 然后咋B中用parent layer open xff0c 打开C页面 xff0c 这样B和C就是A页面下的同级 xff08 兄弟 x
  • xshell远程连接虚拟机缓慢最后失败,(ssh,回环适配)

    https www cnblogs com areyouready p 10134771 html
  • 手机启动镜像boot.img的解包(压)(用split_bootimg.pl)和问题gzip: boot.img-ramdisk.gz: not in gzip format解决

    下载工具split bootimg pl 此脚本在github很多 如gist github com amartinz 84c7ebc64f126bd6b3a8 用split bootimg pl解包boot img 命令 xff1a sp
  • pandas使用json.loads()将列的值转为字典时遇到的问题(lambda)

    会报错 xff1a AttributeError 39 list 39 object has no attribute 39 values 39 formatter 61 lambda x list json loads x values
  • 分布式 ID生成方案

    https www cnblogs com haoxinyue p 5208136 html
  • 过拟合概念

    Interpretation You can see the cost decreasing It shows that the parameters are being learned However you see that you c
  • 【整理】离散数学在计算机学科中的应用

    离散数学在计算机学科中的应用 离散数学是计算机学科中许多专业课程的先行课程 xff0c 离散数学和后续课程的关系密切 xff0c 它是计算机科学与技术应用与研究的有力工具 xff0c 在计算机科学中应用非常广泛 离散数学是计算机科学与技术专

随机推荐

  • 嵌入式Linux开发板移植SSH

    SSH服务可以很方便的通过网络登录到Linux开发板 xff0c 同时支持SFTP协议向开发板传输文件 下面简单讲下移植过程 开发板环境 xff1a 名称 xff1a imx283内核 xff1a Linux2 6 35 3 1 下载源码
  • ue4_AI_BT

    Rotate to face BB entry 大家在编写行为树的时候 xff0c 经常会遇到需求就是要把操作目标转向某人 xff0c 面对某人 xff0c UE4的行为树中 xff0c 为我们提供了节点Rotate to face BB
  • 阿里云 RHSA 漏洞提醒修复方法教程详解

    第一类 xff1a 一 收到阿里云RHSA漏洞提醒通知 二 点击进入查看漏洞明细 三 查看某一项详情 xff0c 注意标红线的地方 xff0c 就是需要更新的包名 四 组织修复命令 xff0c 使用yum upgrade 命令 yum up
  • 搭建一个简单的cms前端框架

    明确一点 xff0c 我们的公司的项目是要求前后端分离 当然首选是node这块 xff0c 警告node很多强大的功能 xff0c 自己了解的不深 xff0c 不过慢慢学习中 好了 xff0c 接下来讲一下我的后端项目的前端部分的搭建过程
  • opencv学习笔记六十六:FisherFace人脸识别算法

    简要 FisherFace是基于LDA降维的人脸识别算法 由Ronald Fisher最早提出 故以此为名 它和PCA类似 都是将原始数据映射到低维空间 但和PCA最大的区别就是它考虑了降维后数据的类间方差和类内方差 使得降维后的数据类间方
  • Docker 安装 Redis 容器 (完整版)

    Docker如果想安装软件 必须先到 Docker 镜像仓库下载镜像 Docker 镜像仓库 Docker 安装Redis 1 寻找Redis镜像 在Docker镜像仓库寻找Redis镜像 Docker 下载Redis镜像的命令 2 下载R
  • 阿里巴巴校招笔试题型攻略

    题型分布 阿里巴巴应届生校园招聘线上测评包括三部分通用能力测试 xff1a 言语理解 资料分析 图形推理 xff1b 每道题单独限时 xff0c 建议多刷真题 xff0c 提高正确率 amp 效率 xff0c 尤其资料分析 xff0c 难度
  • Java中如何判断文件或文件夹是否存在(File.exists)呢?

    下文笔者讲述检测文件或文件夹是否存在的方法分享 xff0c 如下所示 实现思路 xff1a 使用file exists 方法即可检测file对象是否为一个有效的路径或文件夹 exists语法 public boolean exists 返回
  • java如何实现base64和图片互相转换呢?

    下文笔者讲述base64和图片互相转换的方法分享 xff0c 如下所示 实现思路 笔者通过一个utils类实现base64和图片两者的互相转换 例 import java io FileInputStream import java io
  • Java删除字符串中指定字符的方法分享

    转自 xff1a Java如何删除字符串中指定字符呢 xff1f 字符串 字符串主要用于编程 xff0c 概念说明 函数解释 用法详述见正文 xff0c 这里补充一点 xff1a 字符串在存储上类似字符数组 xff0c 所以它每一位的单个元
  • Java中Collections.singleton方法起什么作用呢?

    转自 Java中Collections singleton方法起什么作用呢 xff1f 下文笔者讲述Collections singleton方法的功能简介说明 xff0c 如下所示 Collections singleton方法功能 返回
  • Java如何借助Stream流进行求和呢?

    转自 Java如何借助Stream流进行求和呢 xff1f list简介 集合就是把具有相同属性的东西放在一起 xff0c 也可以是容器 xff0c 把有关的东西都放进去 List是位于java util下的一个接口 xff0c 有序集合
  • Java JSON格式简介说明

    转自 Java JSON格式简介说明 下文笔者讲述java中JSON格式的相关简介说明 xff0c 如下所示 JSON简介说明 JSON JavaScript Object Notation 是一种轻量级的基于文本的数据交换格式 它采用完全
  • Java 如何使用Matcher.matches()匹配整个字符串呢?

    转自 Java 如何使用Matcher matches 匹配整个字符串呢 xff1f 下文讲述使用Matcher matches 匹配整个字符串的方法分享 xff0c 如下所示 Matcher matches 对整个字符串进行匹配 当整个字
  • Java中如何删除文件呢?

    转自 xff1a Java中如何删除文件呢 下文笔者讲述java中删除文件的方法分享 xff0c 如下所示 实现思路 借助File delete 方法即可实现文件删除操作 File delete语法 public boolean delet
  • Java中Random.nextInt()方法功能简介说明

    转自 Java中Random nextInt 方法功能简介说明 下文笔者将讲述Random nextInt 方法的功能简介说明 xff0c 如下所示 Random nextInt 方法功能 用于随机产生某个范围区间的整数 注意事项 Rand
  • Java中ArrayList之clone()方法的功能说明

    转自 Java中ArrayList之clone 方法的功能说明 下文讲述ArrayList中clone 方法的功能简介说明 xff0c 如下所示 ArrayList Clone 方法的功能简介 clone 方法的功能 用于复制一份Array
  • (附源码)计算机毕业设计SSM智慧小区团购系统

    项目运行 环境配置 xff1a Jdk1 8 43 Tomcat7 0 43 Mysql 43 HBuilderX xff08 Webstorm也行 xff09 43 Eclispe xff08 IntelliJ IDEA Eclispe
  • springboot如何使用soap协议的方式访问webService接口呢?

    转自 springboot如何使用soap协议的方式访问webService接口呢 xff1f 下文笔者讲述springboot采用soap协议的方式访问webService接口的方法分享 xff0c 如下所示 1 引入相应的jar包 lt
  • springboot-starter如何整合阿里云datahub呢?

    转自 springboot starter如何整合阿里云datahub呢 xff1f 下文笔者讲述springboot整合datahub的方法分享 如下所示 Datahub简介说明 DataHub的功能 xff1a 1 与大数据解决方案中K