纯干货基于flinkcdc实现mysql到mysql/oracle/...... DML实时同步

2023-11-07

CDC

首先什么是CDC ?它是Change Data Capture的缩写,即变更数据捕捉的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等操作。

Flink SQL CDC 数据同步与原理解析

CDC 全称是 Change Data Capture ,它是一个比较广义的概念,只要能捕获变更的数据,我们都可以称为 CDC 。业界主要有基于查询的 CDC 和基于日志的 CDC ,可以从下面表格对比他们功能和差异点。

在这里插入图片描述flinkCDC文档
flinkCDC:https://ververica.github.io/flink-cdc-connectors/release-2.0/
flink文档
flink1.13:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/

废话不多说,开始实战
一:基于自定义source和sink的方式
1.业务表与数据源示例
源库schema:amir
源表:在这里插入图片描述目标schema:hmm
目标表:在这里插入图片描述
2.依赖如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.amir.flink</groupId>
	<artifactId>flinkcdc20</artifactId>
	<version>1.0.0</version>
	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>8</source>
					<target>8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
	<packaging>jar</packaging>
	<description>this is test</description>
	<properties>
		<java.version>1.8</java.version>
		<fastjson.version>1.2.75</fastjson.version>
		<druid.version>1.2.5</druid.version>
		<flink.version>1.13.1</flink.version>
		<scala.binary.version>2.12</scala.binary.version>
		<HikariCP.version>3.2.0</HikariCP.version>
		<Impala.version>2.6.4</Impala.version>
		<kafka.version>2.8.0</kafka.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-scala_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-blink_2.12</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>com.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka</artifactId>
			<version>2.11-1.9.0</version>
		</dependency>
		<dependency>
			<groupId>com.ververica</groupId>
			<artifactId>flink-connector-mysql-cdc</artifactId>
			<version>2.0.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-runtime_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>com.zaxxer</groupId>
			<artifactId>HikariCP</artifactId>
			<version>${HikariCP.version}</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.13</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.13</artifactId>
			<version>${kafka.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>${kafka.version}</version>
		</dependency>
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>${fastjson.version}</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.7.25</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.25</version>
		</dependency>
		<dependency>
			<groupId>com.google.code.gson</groupId>
			<artifactId>gson</artifactId>
			<version>2.8.2</version>
		</dependency>
	</dependencies>

</project>

3.Source 和 Sink,此处sink以mysql示例

public class MySqlBinlogSourceExample {
  public static void main(String[] args) throws Exception {
    SourceFunction<String> sourceFunction = MySqlSource.<String>builder()
      .hostname("192.168.16.162")
      .port(3306)
      .databaseList("amir") // monitor all tables under inventory database
      .username("root")
      .password("123456")
      .deserializer(new JsonDebeziumDeserializationSchema())
      .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env
      .addSource(sourceFunction)
      .addSink(new MysqlSink());

    env.execute("mysqlAmirToMysqlHmm");
  }
}

4.自定义序列化类JsonDebeziumDeserializationSchema,序列化Debezium输出的数据

public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {

        HashMap<String, Object> hashMap = new HashMap<>();

        String topic = sourceRecord.topic();
        String[] split = topic.split("[.]");
        String database = split[1];
        String table = split[2];
        hashMap.put("database",database);
        hashMap.put("table",table);

        //获取操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        //获取数据本身
        Struct struct = (Struct)sourceRecord.value();
        Struct after = struct.getStruct("after");
        Struct before = struct.getStruct("before");
        /*
         	 1,同时存在 beforeStruct 跟 afterStruct数据的话,就代表是update的数据
             2,只存在 beforeStruct 就是delete数据
             3,只存在 afterStruct数据 就是insert数据
        */
        if (after != null) {
            //insert
            Schema schema = after.schema();
            HashMap<String, Object> hm = new HashMap<>();
            for (Field field : schema.fields()) {
                hm.put(field.name(), after.get(field.name()));
            }
            hashMap.put("data",hm);
        }else if (before !=null){
            //delete
            Schema schema = before.schema();
            HashMap<String, Object> hm = new HashMap<>();
            for (Field field : schema.fields()) {
                hm.put(field.name(), before.get(field.name()));
            }
            hashMap.put("data",hm);
        }else if(before !=null && after !=null){
            //update
            Schema schema = after.schema();
            HashMap<String, Object> hm = new HashMap<>();
            for (Field field : schema.fields()) {
                hm.put(field.name(), after.get(field.name()));
            }
            hashMap.put("data",hm);
        }

        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }else if("delete".equals(type)) {
            type = "delete";
        }else if("update".equals(type)) {
            type = "update";
        }

        hashMap.put("type",type);

        Gson gson = new Gson();
        collector.collect(gson.toJson(hashMap));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

5.创建Sink,将数据变化存入mysql中,以insert、delete、update分别为例,如需要写入oracle、hdfs、hive、Clickhouse等,修改对应数据源连接信息

public class MysqlSink extends RichSinkFunction<String> {
    Connection connection;
    PreparedStatement iStmt,dStmt,uStmt;
    private Connection getConnection() {
        Connection conn = null;
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            String url = "jdbc:mysql://192.168.16.162:3306/hmm?useSSL=false";
            conn = DriverManager.getConnection(url,"root","123456");

        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String insertSql = "insert into amirtwo(ID,CRON) values (?,?)";
        String deleteSql = "delete from amirtwo where ID=?";
        String updateSql = "update amirtwo set CRON=? where ID=?";
        iStmt = connection.prepareStatement(insertSql);
        dStmt = connection.prepareStatement(deleteSql);
        uStmt = connection.prepareStatement(updateSql);
    }

    // 每条记录插入时调用一次
    public void invoke(String value, Context context) throws Exception {
        //{"database":"test","data":{"name":"jacky","description":"fffff","id":8},"type":"insert","table":"test_cdc"}        //{"CRON":"7","canal_type":"insert","ID":"6","canal_ts":0,"canal_database":"amirone","pk_hashcode":0}
        Gson t = new Gson();
        HashMap<String, Object> hs = t.fromJson(value, HashMap.class);
        String database = (String) hs.get("database");
        String table = (String) hs.get("table");
        String type = (String) hs.get("type");

        if ("amir".equals(database) && "amirone".equals(table)) {
            if ("insert".equals(type)) {
                System.out.println("insert => " + value);
                LinkedTreeMap<String, Object> data = (LinkedTreeMap<String, Object>) hs.get("data");
                String id = (String) data.get("ID");
                String cron = (String) data.get("CRON");
                iStmt.setString(1, id);
                iStmt.setString(2, cron);
                iStmt.executeUpdate();
            }else if ("delete".equals(type)) {
                System.out.println("delete => " + value);
                LinkedTreeMap<String, Object> data = (LinkedTreeMap<String, Object>) hs.get("data");
                String id = (String) data.get("ID");
                dStmt.setString(1, id);
                dStmt.executeUpdate();
            }else if ("update".equals(type)) {
                System.out.println("update => " + value);
                LinkedTreeMap<String, Object> data = (LinkedTreeMap<String, Object>) hs.get("data");
                String id = (String) data.get("ID");
                String cron = (String) data.get("CRON");
                uStmt.setString(1, cron);
                uStmt.setString(2, id);
                uStmt.executeUpdate();
            }
        }
    }
    @Override
    public void close() throws Exception {
        super.close();

        if(iStmt != null) {
            iStmt.close();
        }
        if(dStmt != null) {
            dStmt.close();
        }
        if(uStmt != null) {
            uStmt.close();
        }

        if(connection != null) {
            connection.close();
        }
    }
}

6.运行MySqlBinlogSourceExample,查看source和sink
source:
在这里插入图片描述sink:插入3行,删除1行,更新4行,数据实时从A库业务表更新至B库业务表
在这里插入图片描述二:基于Flink SQL CDC,面向sql,简单易上手

public class MysqlToMysqlMain {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
        env.enableCheckpointing(3000);
        // 高级选项:
        // 设置模式为exactly-once (这是默认值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);
        // 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一时间只允许进行一个检查点
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】
        //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
        //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setParallelism(1);

        EnvironmentSettings Settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, Settings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // 数据源表
        String sourceDDL =
                "CREATE TABLE mysql_binlog (\n" +
                        " ID STRING,\n" +
                        " CRON STRING,\n" +
                        " primary key (ID) not enforced\n" +
                        ") WITH (\n" +
                        " 'connector' = 'mysql-cdc',\n" +
                        " 'hostname' = '192.168.16.162',\n" +
                        " 'port' = '3306',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = '123456',\n" +
                        " 'database-name' = 'amir',\n" +
                        " 'table-name' = 'amirone',\n" +
                        " 'scan.startup.mode' = 'latest-offset'\n" +
                        ")";
        // 输出目标表
        String sinkDDL =
                "CREATE TABLE test_cdc_sink (\n" +
                        " ID STRING,\n" +
                        " CRON STRING,\n" +
                        " primary key (ID) not enforced\n" +
                        ") WITH (\n" +
                        " 'connector' = 'jdbc',\n" +
                        " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                        " 'url' = 'jdbc:mysql://192.168.16.162:3306/hmm?serverTimezone=UTC&useSSL=false',\n" +
                        " 'username' = 'root',\n" +
                        " 'password' = '123456',\n" +
                        " 'table-name' = 'amirtwo'\n" +
                        ")";
        // 简单的聚合处理
        String transformDmlSQL =  "insert into test_cdc_sink select * from mysql_binlog";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        tableEnv.executeSql(transformDmlSQL);

        env.execute("sync-flink-cdc");
    }

}

最终代码结构
在这里插入图片描述ending
逐梦,time will tell,yep!!!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

纯干货基于flinkcdc实现mysql到mysql/oracle/...... DML实时同步 的相关文章

  • Zend Mysql 获取 ENUM 值

    I use Zend Framework在我的应用程序中 我想知道如何从 ENUM 字段中获取值MySQL table 例如 我有permissions field ENUM 删除管理员 edit admin 如何以最佳方式获取数组 删除管
  • 如何合并不同 MySQL 列中的日期和时间以与完整的 DateTime 进行比较?

    Column d是日期 列t是时间 列v例如 是 INT 假设我需要 2012 年 2 月 1 日 15 00 及之后记录的所有值 如果我写 SELECT FROM mytable WHERE d gt 2012 02 01 AND t g
  • 不同的数据库使用不同的名称引用吗?

    例如 mysql引用表名使用 SELECT FROM table name 注意 其他数据库是否使用不同的字符来引用其表名 这种引号的使用称为分隔标识符 它是 SQL 的重要组成部分 因为否则您将无法使用以下标识符 例如表名和列名 包含空格
  • 将 Wamp 服务器升级到 MySQL 8.0.15

    因此 我最近在几个月前安装了 WampServer 并预装了 mySQL 5 7 但我想利用 mySQL 8 附带的 NoSQL 功能 为了更新它 我下载了最新的MySQL版本 将文件夹解压到wamp64的bin目录中 然后 我从 5 7
  • Mysql - 如何搜索26条以字母开头的记录?

    基本上 我正在尝试创建一个查询 该查询可以根据英语字母表中的字母 26 个字母 从表中检索 26 个单词 所以 苹果 香蕉 椰子 等等 我一直在使用 like a 所以 SELECT from word WHERE word name li
  • 如何限制 SQLite / MySQL 中的列值

    我想限制表中的列值 例如 列值只能是car or bike or van 我的问题是如何在 SQL 中实现这一点 在数据库端执行此操作是一个好主意还是应该让应用程序限制输入 我还打算在将来添加或删除更多值 例如 truck 我使用的数据库类
  • 高级 MySQL:查找民意调查响应之间的相关性

    我有四个 MySQL 表 users 身份证号 姓名 polls ID 文本 options id poll id 文本 回应 id poll id 选项 id 用户 id 给定一个特定的民意调查和一个特定的选项 我想生成一个表格 显示其他
  • SQL IN 子句比单个查询慢

    我正在使用 Hibernate 的 JPA 实现和 MySQL 5 0 67 MySQL 配置为使用 InnoDB 在执行 JPA 查询 转换为 SQL 时 我发现使用IN子句比执行单个查询慢 例子 SELECT p FROM Person
  • 使用 JdbcTemplate 进行动态查询

    我有一个关于使用 JdbcTemplate 进行动态查询的问题 我的代码如下 String insertQueries INSERT INTO tablename StringJoiner joiner new StringJoiner S
  • Node.js - 我可以在 PhoneGap / Cordova 应用程序上安装 NPM 包吗?

    感谢 Cordova 我正在构建一个移动应用程序 并且由于 Cordova 基于 Node js 我认为我可以在应用程序中使用 NPM 包 例如 我希望我的移动应用程序能够与远程 MySQL 数据库通信 我想我可以使用 mysql NPM
  • MySQL 无法使用 PHP 连接到本地主机上的服务器

    我正在使用 XAMPP 1 7 2 可以通过 cmd 和 SQLYog 连接到 MySQL 但不能以编程方式连接 这段 PHP 代码 conn mysql connect localhost root if conn die Could n
  • mysql utf8_general_ci 区分大小写

    我有一个 mysql 数据库 我使用 utf8 general ci 不区分大小写 在我的表中 我有一些列 例如 ID 和区分大小写的数据 例如 iSZ6fX 或 AscSc2 为了区分大写和小写 最好只在这些列上设置 utf8 bin 如
  • 我是否应该标准化我的数据库?

    在设计数据库 例如 MySQL 的模式时 会出现是否完全规范化表的问题 一方面 连接 以及外键约束等 非常慢 另一方面 您会获得冗余数据和潜在的不一致 最后优化 是正确的方法吗 即创建一个按书本规范化的数据库 然后查看可以对哪些内容进行非规
  • 重新排列mysql中的主键

    从MySQL表中删除一些行后如何重新排列主键列值 例如 一个包含 4 行数据的表 主键值为 1 2 3 4 当删除第2行和第3行时 第4行的键值变为2 请帮助我找到解决方案 为什么要这样做 你不需要重新排列您的密钥 因为它只是记录的数字和标
  • 用于分页的php示例脚本[关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 任何人都可以建议一个好的分页 php 脚本 其中人们想要分页显示数据库中的大量项目 以下链接可以帮助您
  • MySQL - 此版本的 MySQL 尚不支持“LIMIT 和 IN/ALL/ANY/SOME 子查询”

    这是php编码我正在使用的 Last Video db gt fetch all SELECT VID thumb FROM video WHERE VID IN SELECT VID FROM video WHERE title LIKE
  • 使用 JOIN 和 UNION 合并不同表中的记录

    我需要创建一个查询来组合两个表中的数据 我认为可能是 JOIN 和 UNION 的组合 在此示例中 我需要列出状态处于活动状态的所有姓名 仅一次 并将他们的葡萄酒 苏打水 晚餐 甜点和水果偏好组合起来 按姓名排序 我不确定单独的 JOIN
  • ON DUPLICATE KEY UPDATE 的自动增量过多

    我有一个包含列的基本表 id 主要是AI 名称 唯一 etc 如果唯一列不存在 则插入该行 否则更新该行 INSERT INTO pages name etc VALUES bob randomness ON DUPLICATE KEY U
  • 如何将另一列的整数值添加到日期列?

    我试图将整数添加到日期 但出现以下错误 1064 你的 SQL 语法有错误 检查与您的 MySQL 服务器版本相对应的手册 了解在第 6 行的 wp OrderDate INTERVAL WPProduct Duration DAY AS
  • 在 MySQL 中搜索多个单词

    我使用 HTML 表单来允许用户查找数据库表中的条目

随机推荐

  • Live555学习之路(一)

    有关live555的介绍 还是百度百科 http baike baidu com view 3495912 html fromTaglist 下载live555 http www live555 com liveMedia public 我
  • yolov4论文解读和训练自己数据集

    前天YOLOv4终于问世 YOLO v4 论文 https arxiv org abs 2004 10934 YOLO v4 开源代码 GitHub AlexeyAB darknet YOLOv4 Scaled YOLOv4 YOLO Ne
  • 简介setsockopt和udp的多播(组播)广播

    Tcp Udp中的单播 组播 广播 2019年06月27日 1 Setsockopt方法 1 1 setsockopt 函数 用于任意类型 任意状态套接口的设置选项值 int setsockopt int sockfd int level
  • JVM调优相关

    1 jvm中的一些工具 1 1 jps jps 用于查看java进程运行情况 输出JVM中运行的进程状态信息 命令行参数如下 m 输出传入main方法的参数 l 输出main类或Jar的全限名 v 输出传入JVM的参数 如上 bootstr
  • 逼格的一些小功能

    日常开发中 实用的一些小功能 比如各进制转换 计算内存地址什么的 分为C 部分和Unity部分 C 部分 只是存C 代码即可运行 Unity部分 需要用的Unity相关的api C 部分
  • TDK MPU9250的详细功能 替代方案ICM20948

    mpu9250的替代方案ICM20948 零知模块新品上架 ICM20948九轴模块替代MPU9250 附带示例 https www amobbs com thread 5722167 1 1 html ICM20948 TDK规格书 ht
  • 干货,一文彻底搞懂 Java 的 Optional

    想学习 永远都不晚 尤其是针对 Java 8 里面的好东西 Optional 就是其中之一 该类提供了一种用于表示可选值而非空引用的类级别解决方案 作为一名 Java 程序员 我真的是烦透了 NullPointerException NPE
  • Java判断String是否为空或不为空(并且equals的好的写法,不报空指针)

    首先澄清一个概念 String str if str null str equals 1 2 3 str null 说明str还未指向一个特定的字符串对象 此时谈不上是否为空 str 说明str是个空字符串 只不过长度为0 字符串判断空的几
  • bleve和es RediSearch 区别

    目录 区别 bleve支撑多大的数据量 bleve是否支持集群和分布式 bleve主要应用场景 区别 Bleve 和 Elasticsearch 是两个不同的搜索引擎库 而不是像 Bleve 和 RediSearch 那样相互竞争的产品 B
  • PAT (Basic Level) Practice (中文)1033 旧键盘打字

    旧键盘上坏了几个键 于是在敲一段文字的时候 对应的字符就不会出现 现在给出应该输入的一段文字 以及坏掉的那些键 打出的结果文字会是怎样 输入格式 输入在 2 行中分别给出坏掉的那些键 以及应该输入的文字 其中对应英文字母的坏键以大写给出 每
  • wireshark抓包红色_Wireshark使用教程:不同报文颜色的含义

    Wireshark色彩规则 在Wireshark主界面 报文会显示各种各样的颜色 它们表示不同的含义 这些颜色 是由色彩规则控制的 对这些颜色进行适当的了解 对分析报文有很大帮助 01 设置 色彩规则有两个入口 一个在报文上方的工具栏内 如
  • Java调用WebService接口的四种方式

    调用WebService 使用wsimport生成代码 不推荐 使用Axis 1 4 动态调用 使用HTTP SOAP方式远程调用 通过Spring注解方式调用 使用wsimport生成代码 不推荐 配置java环境变量后在命令窗口中输入
  • Keil提示错误L6218E:Undefined symbol TIM_OC2Init解决办法

    1 如题 我在将正点原子的PWM输出代码移植到led的程序上 编译后发现报了七个错误 都是L6218E 2 由于这个是标准库 程序中引用了很多ST官方的库函数 但是你没有引用 所以会报错 对比正点原子的代码内容和我的 发现我的FWLIB中缺
  • JavaScript 数据结构之数组

    JavaScript 数据结构之数组思维导图 JavaScript 数据结构之数组源码
  • Kafka最详细总结

    Kafka Kafka是最初由Linkedin公司开发 是一个分布式 支持分区的 partition 多副本的 replica 基于zookeeper协调的分布式消息系统 它的最大的特性就是可以实时的处理大量数据以满足各种需求场景 比如基于
  • mac android自动化测试学习心得,解决uiautomator视图界面无法打开的问题

    原因是我安装的jdk版本为9 0 4 彻底卸载后 安装8即可正常启动 输入命令 sudo rm fr Library Internet Plug Ins JavaAppletPlugin plugin sudo rm fr Library
  • 各版本Pytorch安装详解

    Pytorch安装教程 windows版本 conda安装 1 cuda9 0 python3 6 3 5 3 7 conda install pytorch c pytorch pip3 install torchvision 2 cud
  • AVFoundation 播放器实例

    播放器 使用苹果官方的 AVFoundation 框架 可以很容易的封装一个视频的播放器 在获取视频资源后 只需要将视频在播放图层中渲染即可 并且可以在图层中添加控件 以便对视频的播放和暂停进行控制 下面封装的库 主要包含两个类 HXJPl
  • FTP云盘

    参考 FTP云盘项目 作者 糯米啊啊 发布时间 2021 08 19 10 34 05 网址 https blog csdn net weixin 43732386 spm 1001 2014 3001 5509 参考 自制FTP云盘项目
  • 纯干货基于flinkcdc实现mysql到mysql/oracle/...... DML实时同步

    CDC 首先什么是CDC 它是Change Data Capture的缩写 即变更数据捕捉的简称 使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游 供下游使用 这些变更可以包括INSERT DELETE UPDATE等操作