flink-addSource和addSink分别是kafka、自定义数据、mysql、hbase的java实现

2023-11-19

flink主程序

public class FinkTest {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);//摄入时间
        //env.enableCheckpointing(5000);
        //创建kafka-topic
        Properties p = LoadResourcesUtils.getProperties("kafka.properties");
        String inputTopic = p.getProperty("source.inputTopic");
        String outputTopic = p.getProperty("source.outputTopic");

        //kafka addSource
        DataStream<String> kafkaStream = env.addSource(KafkaStreamBuilder.kafkaConsumer(inputTopic));
        
        //kafka addSink
        kafkaStream.addSink(KafkaSink.KafkaProducer(driversTopicPattern));
        
        //mysql addSink
        kafkaStream.addSink(new OrderMySqlSink());
        
        //hbase addSink
       kafkaStream..addSink(new HbaseSink(configs.topicOut));
       
		//自定义 addSource
        DataStream<String> myStream = env.addSource(new MySource());

		//mysql addSource
        DataStream<String> driverStream = env.addSource(new MySqlSource());
        
        env.execute("my flink job");

    }
}

addSource(kafka)

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * @author liquan
 *
 *	构建KafkaStream
 *
 */
public class KafkaStreamBuilder {

	public static FlinkKafkaConsumer<String> kafkaConsumer(String topics) {
		Properties p = LoadResourcesUtils.getProperties("application.properties");
		Properties properties = new Properties();
		properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
		properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
		properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, p.getProperty("spring.kafka.bootstrap-servers"));
		properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, p.getProperty("spring.kafka.consumer.group-id"));
		properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, p.getProperty("spring.kafka.consumer.auto-offset-reset"));
		properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, p.getProperty("spring.kafka.consumer.enable-auto-commit"));
		properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");
//		String topics = consumerConfig.getTopics();
		List<String> topicsSet = new ArrayList<String>(Arrays.asList(topics.split(",")));
		FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topicsSet, new SimpleStringSchema(),
				properties);//test0是kafka中开启的topic
//		myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
		return myConsumer;
	}
}

addSink(kafka)

import com.shengekeji.simulator.serialization.OutSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class KafkaSink {

    public static FlinkKafkaProducer<String> KafkaProducer(String topics) {
        Properties p = LoadResourcesUtils.getProperties("application.properties");
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, p.getProperty("spring.kafka.bootstrap-servers"));
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, p.getProperty("spring.kafka.consumer.group-id"));
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, p.getProperty("spring.kafka.consumer.auto-offset-reset"));
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, p.getProperty("spring.kafka.consumer.enable-auto-commit"));
        properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RangeAssignor");

        return new FlinkKafkaProducer<>(topics, new OutSerializationSchema(), properties);
    }
}

addSink(mysql)

import com.alibaba.fastjson.JSONObject;
import com.shengekeji.simulator.dao.OrderDao;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.ibatis.session.SqlSession;
import com.shengekeji.simulator.model.OrderModel;

public class OrderMySqlSink extends RichSinkFunction<String> {

    @Override
    public void invoke(String value, Context context) throws Exception {
        SqlSession sqlSession = null;
        try {
            OrderModel order= JSONObject.parseObject(value, OrderModel.class);
            sqlSession = MyBatisUtil.openSqlSession();
            // 通过SqlSession对象得到Mapper接口的一个代理对象
            // 需要传递的参数是Mapper接口的类型
            OrderDao dao = sqlSession.getMapper(OrderDao.class);
            System.err.println(order);
            dao.insert(order);
            sqlSession.commit();

        }catch (Exception e){
            e.printStackTrace();
            System.err.println(e.getMessage());
            sqlSession.rollback();

        }finally {

            if (sqlSession != null){
                sqlSession.close();
            }
        }
    }
}

注,数据入库时用的mybatis方式,MyBatisUtil,OrderDao,OrderModel根据自己环境自己定义

addSource(自定义)

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Properties;

public class MySource implements SourceFunction<String> {

    private static final long serialVersionUID = 1L;

    private volatile boolean isRunning = true;


    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while(this.isRunning) {
            Thread.sleep(6000);
            String order = getDriverData();
            sourceContext.collect(order);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    //随机产生订单数据
    public String getDriverData() {
        Properties p = LoadResourcesUtils.getProperties("content.properties");

        String driverJson = p.getProperty("source.driverJson");

        String value = driverJson ;
        if(value.indexOf("%orderId") >= 0){
            value = value.replaceAll("%orderId",RandomUtil.getOrderId());
        }
        if(value.indexOf("%appId") >= 0){
            value = value.replaceAll("%appId",RandomUtil.getAppId());
        }
        if(value.indexOf("%serviceId") >= 0){
            value = value.replaceAll("%serviceId",RandomUtil.getServiceId());
        }
        if(value.indexOf("%passageId") >= 0){
            value = value.replaceAll("%passageId",RandomUtil.getPassageId());
        }
        if(value.indexOf("%driverId") >= 0){
            value = value.replaceAll("%driverId",RandomUtil.getDriverId());
        }
        if(value.indexOf("%startLoclatitude") >= 0){
            LngLat startLoc=RandomUtil.getCoordinate();
            value = value.replaceAll("%startLoclatitude",Double.toString(startLoc.latitude));
            value = value.replaceAll("%startLoclongitude",Double.toString(startLoc.longitude));
        }
        if(value.indexOf("%endLoclatitude") >= 0){
            LngLat endLoc=RandomUtil.getCoordinate();
            value = value.replaceAll("%endLoclatitude",Double.toString(endLoc.latitude));
            value = value.replaceAll("%endLoclongitude",Double.toString(endLoc.longitude));
        }
        if(value.indexOf("%loclatitude") >= 0){
            LngLat loc=RandomUtil.getCoordinate();
            value = value.replaceAll("%loclatitude",Double.toString(loc.latitude));
            value = value.replaceAll("%loclongitude",Double.toString(loc.longitude));
        }
        if(value.indexOf("%flag") >= 0){
            value = value.replaceAll("%flag",Integer.toString(RandomUtil.getFlag()));
        }
        if(value.indexOf("%pushFlag") >= 0){
            value = value.replaceAll("%pushFlag",Integer.toString(RandomUtil.getPushFlag()));
        }
        if(value.indexOf("%state") >= 0){
            value = value.replaceAll("%state",Integer.toString(RandomUtil.getState()));
        }
        if(value.indexOf("%d") >= 0){
            value = value.replaceAll("%d", RandomUtil.getNum().toString());
        }
        if(value.indexOf("%s") >= 0){
            value = value.replaceAll("%s", RandomUtil.getStr());
        }
        if(value.indexOf("%f") >= 0){
            value = value.replaceAll("%f",RandomUtil.getDoubleStr());
        }
        if(value.indexOf("%ts") >= 0){
            value = value.replaceAll("%ts",RandomUtil.getTimeStr());
        }
        if(value.indexOf("%tl") >= 0){
            value = value.replaceAll("%tl",RandomUtil.getTimeLongStr());
        }

        System.out.println(value);

        return value;
    }
}

addSource(mysql)

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.shengekeji.simulator.dao.BestDispatchDao;
import com.shengekeji.simulator.dao.DriverDao;
import com.shengekeji.simulator.model.DispatchModel;
import com.shengekeji.simulator.model.DriverModel;
import com.shengekeji.simulator.model.GeographyOrder;
import com.shengekeji.simulator.model.PassagesModel;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.ibatis.session.SqlSession;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


public class MySqlDriverSource implements SourceFunction<String> {

    private static final long serialVersionUID = 1L;

    private volatile boolean isRunning = true;

    /**
     * 此处是代码的关键,要从mysql表中,把数据读取出来
     * @param sourceContext
     * @throws Exception
     */

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        while(this.isRunning) {
            Thread.sleep(5000);
            System.out.println("--------------------------");
            SqlSession sqlSession = null;
            Map<String,Object> map = new HashMap<String, Object>();
            map.put("appId","SGKJ");
            try {
                sqlSession = MyBatisUtil.openSqlSession();
                // 通过SqlSession对象得到Mapper接口的一个代理对象
                // 需要传递的参数是Mapper接口的类型
                //司机信息数据
                DriverDao driverdao = sqlSession.getMapper(DriverDao.class);
                List<DriverModel> drivers = driverdao.selectAllActiveDriver(map);
                //处理每个司机
                for (DriverModel driver:drivers){
                    driver.setLoc(new LngLat(locLongitude,locLatitude));
                    driver.setSendTime(RandomUtil.getTimeStr());
                    String dr = JSONObject.toJSONString(driver, SerializerFeature.DisableCircularReferenceDetect);
                    System.out.println(dr);
                    sourceContext.collect(dr);
                }

            }catch (Exception e){
                e.printStackTrace();
                System.err.println(e.getMessage());
                sqlSession.rollback();
            }finally {

                if (sqlSession != null){
                    sqlSession.close();
                }

            }
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

}

注,数据读取用的mybatis方式,MyBatisUtil,DriverDao,DriverModel根据自己环境自己定义

addSink(hbase)

import com.shengekeji.owl.constant.Constants;
import com.shengekeji.owl.pojo.Message;
import com.shengekeji.owl.util.HBaseUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.util.ArrayList;
import java.util.List;

public class HbaseSink extends RichSinkFunction<Message> {
    private Integer maxSize = 1000;
    private Long delayTime = 5000L;
    private String tableName;

    public HbaseSink(String tableName) {
        this.tableName = tableName;
    }

    public HbaseSink(Integer maxSize, Long delayTime) {
        this.maxSize = maxSize;
        this.delayTime = delayTime;
    }

    private Connection connection;
    private Long lastInvokeTime;
    private List<Put> puts = new ArrayList<Put>();

    // 创建连接
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        connection = HBaseUtil.getConnection(Constants.ZOOKEEPER_QUORUM,Constants.ZOOKEEPER_PORT);
        // 获取系统当前时间
        lastInvokeTime = System.currentTimeMillis();
    }

    @Override
    public void invoke(Message value, Context context) throws Exception {

        System.out.println(value);
        String rk = value.id+"-"+value.ts;
        //创建put对象,并赋rk值
        Put put = new Put(rk.getBytes());

        // 添加值:f1->列族, order->属性名 如age, 第三个->属性值 如25
        put.addColumn("cf1".getBytes(), "id".getBytes(), value.id.getBytes());
        put.addColumn("cf1".getBytes(), "vals".getBytes(), value.vals.getBytes());
        put.addColumn("cf1".getBytes(), "p".getBytes(), (value.p+"").getBytes());
        put.addColumn("cf1".getBytes(), "ts".getBytes(), (value.ts+"").getBytes());
        System.out.println("----------");
        System.out.println(put);
        puts.add(put);// 添加put对象到list集合

        //使用ProcessingTime
        long currentTime = System.currentTimeMillis();

        System.out.println(currentTime - lastInvokeTime);
        //开始批次提交数据
        if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {

            //获取一个Hbase表
            Table table = connection.getTable(TableName.valueOf(tableName));
            table.put(puts);//批次提交

            puts.clear();

            lastInvokeTime = currentTime;
            table.close();
        }
    }

    @Override
    public void close() throws Exception {
        connection.close();
    }

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink-addSource和addSink分别是kafka、自定义数据、mysql、hbase的java实现 的相关文章

随机推荐

  • vue使用threejs加载模型问题整理

    1 如果出现错误 THREE WebGLRenderer Error creating WebGL context 需要开启浏览器的gpu加速 GPU acceleration 地址栏输入 chrome flags ignore gpu b
  • 国考省考申论:归纳概括多个主体身上的优秀品质,透过动词现象(怎么做的),找到名词(精神品质)本质

    国考省考申论 归纳概括多个主体身上的优秀品质 透过动词现象 怎么做的 找到名词 精神品质 本质 2022找工作是学历 能力和运气的超强结合体 公务员特招重点就是专业技能 附带行测和申论 而常规国考省考最重要的还是申论和行测 所以大家认真准备
  • dede:list分页与控制文章标题显示字数

    关于dedecms分页 百度上也有许多教程 本人记性不好所以写个博客保存下来 pagesize控制每页显示条数 在 dede list 结束标签 后边写上 dede pagelist 标签即可 如何控制文章显示字 让溢出部分用 代替呢 其实
  • IPS与防火墙旁路部署

    一 防火墙旁路部署 实现防护功能的同时 可以完全不需改变用户的网络环境 并且可以避免设备对用户网络造成中断的风险 用于把设备接在交换机的镜像口或者接在 HUB 上 保证外网用户访问服务器的数据经过此交换机 并且设置镜像口的时候需要同时镜像上
  • iview+page封装+强制刷新

    前言 iview的page封装 缺点无法固定页码按钮数量 而且current的页面恢复选中第一个实现不了 这里动态写了强制刷新的方法 下面是组件cpage vue
  • 【Spring

    上篇 Spring 事件监听概述 对 Spring 事件监听的机制有了个基本的了解 本篇来详细的解读下Spring 的 事件监听机制 事件监听详解 ApplicationEvent ApplicationListener 基于注释 异步 排
  • 多态的实现

    多态 之前介绍过多态的概念就是基类引用派生类对象且和派生类有相同的同名覆盖函数 那么现在我们就具体讲讲怎么实现多态 类方法实现多态性有两种方法 1 方法重载 可以声明多个同名但参数个数 类型 和顺序不同的方法 编译时根据参数 个数 类型和顺
  • win环境下SSH key 配置

    从Gitlab上拉取代码报错 Warning Permanently added gitlab wang cn 47 94 8 13 ECDSA to the list of known hosts Connection closed by
  • windows下使用FFmpeg生成YUV视频文件并播放(通过命令的方式)

    一 YUV的定义 YUV是一种颜色编码方法 它跟我们常见的RGB格式区分开来 常使用在各个视频处理组件中 其中 Y 代表明亮度 U 和 V 代表其色度 视频播放器把市面上流行的MP4等格式文件的视频部分解码出来 得到的一般会是YUV格式的数
  • Java方法重写注意事项

    系原创 只为需要它的人 Java方法重写的几个要求 重写的方法与父类方法签名 方法名称和参数列表 相同 子类重写的方法访问修饰符范围不能低于父类 父类的私有方法不能被重写 static修饰的方法不能被重写 返回值类型 如果父类中方法返回值类
  • 解决mybatis一对多只能获取部分数据的问题

    需求 building表和position表 Building类中含有List positionList mybatis查询方法需要查询到所有的building和building中含有所有的position 问题 sql语句和一对多方法写的
  • SQL如何进行优化

    SQL优化 前言 对于初级程序开发工程师而言 SQL是很多人的弱项 为此我给大家来做一下总结 希望能够帮到你们 课程说明 1 对MySQL SQL优化方案做讲解 学习如何排查慢查询 SQL优化 分页查询优化 一页一页的往下面翻这种查询方式
  • 针对读写操作频繁的应用系统的LINUX调优设置

    在线签约系统调优 项目类型 限制型应用 需要频繁调用 进行签章 调用的 保存在 磁盘中 项目业务设计实现 这里简要说明一下业务流程 前端业务系统过来的请求通过Nignx进行分流 通过网关DSS 将各自的请求转发到相应的老 新签章系统进行处理
  • 小白学Redis系列:Redis持久化

    Redis作为缓存数据库 区别于常规数据库的地方就在于Redis将数据存储在内存中 而不是硬盘中 因此数据的IO就十分快速 非常适合一些电商网站等数据IO频繁的场景 当然 内存中的数据在掉电之后就会被清空 而Redis的持久化功能使得内存中
  • linux后台运行之screen和nohup

    linux后台运行之screen和nohup 3 1 nohup命令 如果你正在运行一个进程 而且你觉得在退出帐户时该进程还不会结束 那么可以使用nohup命令 该命令可以在你退出帐户 关闭终端之后继续运行相应的进程 nohup就是不挂起的
  • 进制及进制转换详解。原码、反码、移码,补码区别介绍。(通俗易懂)

    目录 前言 一 十进制 n进制 进制转换详解 1 先说说什么是进制 2 二进制介绍 3 十进制 n进制 进制转换详解 重点 十进制 gt n进制 2 8 16 n进制 2 8 16 gt 十进制 非十进制间的互相转化 二 原码 反码 移码
  • Python数据可视化 - 如何自定义Matplotlib图例?

    Python数据可视化 如何自定义Matplotlib图例 Matplotlib 是一个最常用的Python数据可视化库 它允许我们创建各种类型的图形 包括直方图 折线图 散点图 饼状图等 当我们绘制多个子图或多个曲线时 我们可能需要图例来
  • SpringBoot 整合 ElasticSearch

    整合前先理解几个概念 与关键字 开始前给大家推荐一款很火的刷题 面试求职网站 https www nowcoder com link pc csdncpt xiaoying java 索引
  • Java编程练习题:Demo96 - Demo105(多维数组)

    目录 Demo96 代数方面 两个矩阵相乘 编写两个矩阵相乘的方法 Demo97 距离最近的两个点 程序清单8 3给出找到二维空间中距离最近的两个点的程序 修改该程序 让程序能够找出在三维空间上距离最近的两个点 Demo98 最大的行和列
  • flink-addSource和addSink分别是kafka、自定义数据、mysql、hbase的java实现

    flink主程序 public class FinkTest public static void main String args throws Exception StreamExecutionEnvironment env Strea