Java库为Postgres COPY编写二进制格式?

2024-01-03

有没有人遇到过 Java 库(或只是一些代码)来编写binaryPostgres 使用的格式复制命令 http://www.postgresql.org/docs/9.2/interactive/sql-copy.html#AEN66736?

它看起来很简单,但如果有人已经找到了正确的元组数据格式,我也可以从那里开始。

实际上,即使只是对所有数据类型的格式进行描述也会有所帮助。

Thanks.


你可以尝试Pg批量插入 https://github.com/bytefish/PgBulkInsert,它实现了 PostgreSQL 的二进制复制协议:

  • https://github.com/bytefish/PgBulkInsert https://github.com/bytefish/PgBulkInsert

也可以从 Maven 中央存储库获取它。

免责声明:我是项目作者。

PostgreSQL 二进制复制协议

我不想简单地宣传我的项目,还想写一下协议。

首先我写了一个类PgBinaryWriter,其中包含一个DataOutputStream并具有编写二进制协议标头的方法、开始新行的方法(二进制复制协议要求您为要插入的每一行编写列数)和一个write方法,该方法需要一个IValueHandler<TTargetType>用于编写给定的 Java 类型。

The PgBinaryWriter实现一个AutoClosable,因为有必要写一个-1在刷新和关闭流之前添加到流中。

The IValueHandler<TTargetType>需要一个DataOutputStream和一个值。它负责使用 PostgreSQL 二进制协议格式写入给定值。

PgBinaryWriter

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql;


import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers.IValueHandler;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.OutputStream;

public class PgBinaryWriter implements AutoCloseable {

    /** The ByteBuffer to write the output. */
    private transient DataOutputStream buffer;

    public PgBinaryWriter() {
    }

    public void open(final OutputStream out) {
        buffer = new DataOutputStream(new BufferedOutputStream(out));

        writeHeader();
    }

    private void writeHeader() {
        try {

            // 11 bytes required header
            buffer.writeBytes("PGCOPY\n\377\r\n\0");
            // 32 bit integer indicating no OID
            buffer.writeInt(0);
            // 32 bit header extension area length
            buffer.writeInt(0);

        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    public void startRow(int numColumns) {
        try {
            buffer.writeShort(numColumns);
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    public <TTargetType> void write(final IValueHandler<TTargetType> handler, final TTargetType value) {
        handler.handle(buffer, value);
    }

    @Override
    public void close() {
        try {
            buffer.writeShort(-1);

            buffer.flush();
            buffer.close();
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }
}

值处理器

An IValueHandler是一个简单的界面,其中有一个handle方法采取DataOutputStream和一个值。

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public interface IValueHandler<TTargetType> extends ValueHandler {

    void handle(DataOutputStream buffer, final TTargetType value);

    Type getTargetType();

}

了解协议很重要,您必须编写一个-1当一个值为空时。为此,我编写了一个抽象基类来处理这种情况。

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;

import java.io.DataOutputStream;

public abstract class BaseValueHandler<T> implements IValueHandler<T> {

    @Override
    public void handle(DataOutputStream buffer, final T value) {
        try {
            if (value == null) {
                buffer.writeInt(-1);
                return;
            }
            internalHandle(buffer, value);
        } catch (Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    protected abstract void internalHandle(DataOutputStream buffer, final T value) throws Exception;
}

然后可以实现各种 Java 类型的处理程序。这是示例long。您可以找到 GitHub 存储库中的其他实现(handlers https://github.com/PgBulkInsert/PgBulkInsert/tree/master/PgBulkInsert/pgbulkinsert-core/src/main/java/de/bytefish/pgbulkinsert/pgsql/handlers).

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public class LongValueHandler extends BaseValueHandler<Long> {

    @Override
    protected void internalHandle(DataOutputStream buffer, final Long value) throws Exception {
        buffer.writeInt(8);
        buffer.writeLong(value);
    }

    @Override
    public Type getTargetType() {
        return Long.class;
    }
}

使用 PgBinaryWriter

现在终于到了连接零件的时候了。请注意,我抽象了更多部分。可能需要在代码中查找更多实现细节。

public abstract class PgBulkInsert<TEntity> {

    // ... 

    public void saveAll(PGConnection connection, Stream<TEntity> entities) throws SQLException {

        CopyManager cpManager = connection.getCopyAPI();
        CopyIn copyIn = cpManager.copyIn(getCopyCommand());

        int columnCount = columns.size();

        try (PgBinaryWriter bw = new PgBinaryWriter()) {

            // Wrap the CopyOutputStream in our own Writer:
            bw.open(new PGCopyOutputStream(copyIn));

            // Insert all entities:                
            entities.forEach(entity -> {

                // Start a New Row:
                bw.startRow(columnCount);
                
                // Insert the Column Data:
                columns.forEach(column -> {
                    try {
                        column.getWrite().invoke(bw, entity);
                    } catch (Exception e) {
                        throw new SaveEntityFailedException(e);
                    }
                });
            });
        }
    }
    
    private String getCopyCommand()
    {
        String commaSeparatedColumns = columns.stream()
                .map(x -> x.columnName)
                .collect(Collectors.joining(", "));

        return String.format("COPY %1$s(%2$s) FROM STDIN BINARY",
                table.GetFullQualifiedTableName(),
                commaSeparatedColumns);
    }
}

Pg批量插入

PgBulkInsert 支持以下 PostgreSQL 数据类型。

  • Numeric Types http://www.postgresql.org/docs/current/static/datatype-numeric.html
    • smallint
    • integer
    • bigint
    • real
    • 双精度
  • Date/Time Types http://www.postgresql.org/docs/current/static/datatype-datetime.html
    • 时间戳
    • date
  • Character Types http://www.postgresql.org/docs/current/static/datatype-character.html
    • text
  • Boolean Type http://www.postgresql.org/docs/current/static/datatype-boolean.html
    • boolean
  • Binary Data Types http://www.postgresql.org/docs/current/static/datatype-binary.html
    • bytea
  • Network Address Types http://www.postgresql.org/docs/current/static/datatype-net-types.html
    • inet(IPv4、IPv6)
  • UUID Type http://www.postgresql.org/docs/current/static/datatype-uuid.html
    • uuid

基本用法

想象一下,应该将大量人员批量插入到 PostgreSQL 数据库中。每个Person有名字、姓氏和出生日期。

数据库表

PostgreSQL 数据库中的表可能如下所示:

CREATE TABLE sample.unit_test
(
    first_name text,
    last_name text,
    birth_date date
);

领域模型

应用程序中的域模型可能如下所示:

private class Person {

    private String firstName;

    private String lastName;

    private LocalDate birthDate;

    public Person() {}

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public LocalDate getBirthDate() {
        return birthDate;
    }

    public void setBirthDate(LocalDate birthDate) {
        this.birthDate = birthDate;
    }
    
}

批量插入器

然后你必须实施PgBulkInsert<Person>,它定义了表和域模型之间的映射。

public class PersonBulkInserter extends PgBulkInsert<Person>
{
    public PersonBulkInserter() {
        super("sample", "unit_test");

        MapString("first_name", Person::getFirstName);
        MapString("last_name", Person::getLastName);
        MapDate("birth_date", Person::getBirthDate);
    }
}

使用批量插入器

最后我们可以编写一个单元测试来插入100000人员进入数据库。您可以在 GitHub 上找到整个单元测试:集成测试.java https://github.com/bytefish/PgBulkInsert/blob/master/PgBulkInsert/src/test/de/bytefish/pgbulkinsert/de/bytefish/pgbulkinsert/IntegrationTest.java.

@Test
public void bulkInsertPersonDataTest() throws SQLException {
    // Create a large list of Persons:
    List<Person> persons = getPersonList(100000);
    
    // Create the BulkInserter:
    PersonBulkInserter personBulkInserter = new PersonBulkInserter();
    
    // Now save all entities of a given stream:
    personBulkInserter.saveAll(PostgreSqlUtils.getPGConnection(connection), persons.stream());
    
    // And assert all have been written to the database:
    Assert.assertEquals(100000, getRowCount());
}

private List<Person> getPersonList(int numPersons) {
    List<Person> persons = new ArrayList<>();

    for (int pos = 0; pos < numPersons; pos++) {
        Person p = new Person();

        p.setFirstName("Philipp");
        p.setLastName("Wagner");
        p.setBirthDate(LocalDate.of(1986, 5, 12));

        persons.add(p);
    }

    return persons;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Java库为Postgres COPY编写二进制格式? 的相关文章

  • 如何获取 PostgreSQL 中表上所有索引的列名列表?

    我有这个查询来获取表上的索引列表 SELECT ns nspname as schema name tab relname as table name cls relname as index name am amname as index
  • cygwin有java sdk吗?

    cygwin有java sdk吗 如果有一个使用 cygwin 文件系统和 X windows 进行显示的本机 cygwin 实现 那就太好了 不幸的是我不知道这样的版本 我认为移植 OpenJDK 也需要付出很大的努力 但我还没有尝试过
  • 将倒计时器从 10 秒改为 1 秒

    我有一个倒计时器 它以 1 秒的增量从 10000 毫秒倒计时到 0 毫秒 以使按钮在 10 秒后可单击 尽管计时器是准确的并且按照代码的说明执行操作 但我想更改秒的表示方式 但我不知道如何更改 java void startTimer c
  • Java 8 中异常类型推断的一个独特功能

    在为该网站上的另一个答案编写代码时 我遇到了这个特性 static void testSneaky final Exception e new Exception sneakyThrow e no problems here nonSnea
  • 如何将 Cucumber 中的数据表转换为对象列表?

    原标题 Java 中的 Cucumber DataTables 中的标量是什么意思 From 参考 Java 提供了几种标量类型 这些包括原始数字 类型 加上布尔值和字符 每个标量 原始 类型都有一个关联的包装类或 参考类型 阅读javad
  • java.time.LocalDate 到 java.util.Date

    转换的最佳方式是什么java time LocalDate to java util Date Date from dateToReturn atStartOfDay ZoneId systemDefault toInstant 我一直在尝
  • LibGDX 闪烁

    我已经使用 LibGDX UI 设置来启动一个项目 我在实现 ApplicationListener 中唯一拥有的是 public void create setScreen new LoadingScreen this 这应该会触发 Lo
  • 枚举内的枚举

    这不是我被卡住的问题 而是我正在寻找一种简洁的方式来编写我的代码 本质上 我正在编写一个事件驱动的应用程序 用户触发事件 事件被发送到适当的对象 然后对象处理事件 现在我正在编写偶数处理程序方法 我希望使用 switch 语句来确定如何处理
  • Logback 配置在单行上有异常吗?

    我的日志被提取 传输并合并到 elasticsearch 中 多行事件很难跟踪和诊断 有没有办法使用收集器和正则表达式将异常行分组到单个记录中登录配置 https logback qos ch manual layouts html xTh
  • 如何在不使用反射的情况下查看对象是否是数组?

    在Java中如何在不使用反射的情况下查看对象是否是数组 如何在不使用反射的情况下迭代所有项目 我使用 Google GWT 所以不允许我使用反射 我很想在不使用反射的情况下实现以下方法 private boolean isArray fin
  • 如何在 Java 中读取/转换 InputStream 为字符串?

    如果你有一个java io InputStream对象 您应该如何处理该对象并生成一个String 假设我有一个InputStream包含文本数据 我想将其转换为String 例如我可以将其写入日志文件 最简单的方法是什么InputStre
  • 如何保存/加载 BigInteger 数组

    我想保存 加载BigInteger数组传入 传出 SharedPreferences 如何做呢 例如对于以下数组 private BigInteger dataCreatedTimes new BigInteger 20 Using Gso
  • 我有什么理由应该嘲笑?

    我也是 Mockito 和 PowerMockito 的新手 我发现我无法使用纯 Mockito 测试静态方法 因此我需要使用 PowerMockito 对吗 我有一个非常简单的类 名为 Validate 使用这个非常简单的方法 publi
  • 抛出 UnsupportedOperationException

    因此其中一种方法的描述如下 public BasicLinkedList addToFront T data 该操作无效 对于排序列表 将生成 UnsupportedOperationException 使用消息 排序列表的操作无效 我的代
  • 如何在 iText 7 中创建页面大小不等的文档

    如何在 iText 7 中创建页面大小不等的文档 iText7 可以吗 在iText5中 我使用document setPageSize and document newPage 如果您通过高级 API 添加内容 Document add
  • 如何在 Spring GCP 中订阅多个 Google PubSub 项目?

    我想在 Spring Boot 应用程序中订阅多个 Google Cloud PubSub 项目 阅读完相关问题后如何使用 Spring Cloud 在一个 Spring Boot 应用程序中连接 配置两个 pubsub gcp 项目 ht
  • 如何在java中进行多处理,以及预期的速度提升是多少?

    我是一个新手 使用 Java 对 csv 文件进行一些数据处理 为此 我使用 Java 的多线程功能 线程池 将 csv 文件批量导入到 Java 中 并对每一行执行一些操作 在我的四核处理器上 多线程大大加快了处理速度 我很想知道多处理如
  • 如何在 logback 中启动时滚动日志文件

    我想配置 logback 来执行以下操作 记录到文件 当文件达到 50MB 时滚动文件 仅保留 7 天的日志 启动时始终生成一个新文件 滚动 除了最后一项 启动卷 外 我一切都正常 有谁知道如何实现这一目标 这是配置
  • 无法取消 GWT 中的重复计时器

    我正在尝试在 GWT 中安排一个重复计时器 它将每一毫秒运行一次 轮询某个事件 如果发现满意 则执行某些操作并取消计时器 我尝试这样做 final Timer t new Timer public void run if condition
  • 最新版本 6.* Struts2 支持 Tomcat 10 吗? [复制]

    这个问题在这里已经有答案了 最新版本 6 Struts2 支持 Tomcat 10 吗 异常启动过滤器 struts2 java lang ClassCastException class org apache struts2 dispat

随机推荐

  • SQL Server 存储过程区分大小写吗?

    我有一个区分大小写的服务器 SQL Latin1 General CP1 CS AS 但数据库不区分大小写 SQL Latin1 General CP1 CI AS 如果我尝试在数据库上创建以下存储过程 则会收到错误 必须声明标量变量 te
  • Xamarin Forms CollectionView TapGestureRecognizer 未在标签上触发

    我有一个 XF 应用程序 定义了以下集合视图 第二个标签有一个不触发的 TapGestureRecognizerDoSomethingInteresting当我点击标签时在模型中 在 Android 上尝试这个 有人可以看看问题是什么吗 工
  • 使用 *apply 函数访问数据框的列名称

    我需要为初学者使用 R apply 函数制作教程 第一次不使用 reshape 或 plyr 包 我试着lapply 因为我读了apply对于数据框不好 这个数据框的一个简单函数 我想使用命名列来访问数据 fDist lt function
  • Powershell 中的标准化文件路径比较

    假设我有以下两条路径 比如说 两者都是有效的 p1 D folder1 p2 D Folder1 我想比较这两条路径的相等性 我期望两条路径是相同的 我尝试了以下两个命令 Resolve Path p1 eq Resolve Path p2
  • 我应该通过引用传递shared_ptr吗? [复制]

    这个问题在这里已经有答案了 传递shared ptr的最佳实践是什么 目前我传递shared ptr函数参数 如下所示 void function1 shared ptr
  • Matplotlib 中颜色图的设置范围

    我正在使用 matplotlib 绘制一个简单的图表 cm plt get cmap Blues nx draw circular G node color White edge color range G number of edges
  • 使用 ASP.net 对文件夹中的图像进行计数

    我想计算文件夹中的图像数量 但它会产生以下错误 Could not find a part of the path c Content slideshow images image 所有图像都位于项目的文件夹中 位于一个Content sl
  • Java 四舍五入

    我怎样才能将 numberGrade 的值向上舍入 这样如果它是 89 5 它就会变为 90 numberGrade 被视为双精度值 但将其设为 int 不会将其向上或向下舍入 public class GradeReporter The
  • 为什么类的常量数据成员需要在构造函数中初始化?

    我想知道为什么类的常量数据成员需要在构造函数中初始化 为什么不在其他地方初始化 这样做和不这样做有什么影响 我也看到只有静态常数积分数据可以在类内部初始化 但不能在类内部初始化任何数据成员 例如 假设下面是我的班级声明 class A in
  • 控制数组 vb.net

    我正在尝试在 VB net 中为购物系统编写一个程序 它将读取数据库并填充表单上的项目 该应用程序在可滚动面板内的标签中显示产品名称等信息 我正在创建对象并在运行时分配文本等值 我在代码中使用循环 如果我使用 vb 6 我将有一个控制数组
  • MongoError:连接未知

    我有一个 mongo 服务器在 localhost 27017 上运行 使用 mongo 控制台我可以在 mongodb 中插入数据 但是当我尝试使用 node js 连接它时 我收到上述错误 以下是使用的代码 var MongoClien
  • Anaconda 中的“基础”(最佳实践)的目的是什么?

    它说这是默认环境 但 不过 您不想将程序放入您的基本环境中 那么我到底应该用它做什么呢 我创建的其他环境是否继承自基础环境 基础环境在哪里conda本身被安装 最好使用Miniconda 并安装all将您想要的东西放入单独的环境中 其他环境
  • 如何让 Perl 正则表达式仅在 $1 < $2 时匹配?

    我无法完全开始工作的部分是条件 因为它总是失败 use Test More tests gt 2 my regex qr d d g1 lt g2 FAIL x like 23 36 regex should match unlike 36
  • 用户/角色管理系统的最佳设计? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 这是我多次遇到的软件设计问题 但从未找到理想的解决方案 我现在也再次处理它 许多应用程序需要某种形式的用户 角色管理 您有基本用户 这
  • Chrome 中的 SVG“使用”标签已损坏

    SAP AngularJS 和 Angular Route 具有由 svg sprite 制作的基于图标的导航 所以 我有这样的内联代码 div style height 1 width 1 div
  • `uint64_t` 有什么困难? (从“float”转换组装)

    我现在的情况是需要计算类似的东西size t s size t floorf f 也就是说 参数是浮点数 但它有一个整数值 假设floorf f 足够小 可以准确表示 在优化这个的过程中 我发现了一些有趣的事情 以下是一些转换自float到
  • 如何在 Maple 中加载包?

    我想使用一个名为 ESC 椭圆面计算器的包 可以用Maple上传 创建者的指示是 使用字符编码 ISO 8859 1 ISO Latin 1 另存为文本文件 并使用 读取 命令在 Maple 中加载 我在上传到 Maple 并使用此编码保存
  • Android开发:如何以位图形式打开.dcm文件?

    我目前正在尝试制作一个 android dicom 应用程序 以下代码以 通常 图像格式从 res drawable 打开图片 但不适用于 dcm public class BitmapView extends View public Bi
  • 如何将西里尔字母音译为拉丁文本

    我有一种方法可以将任何拉丁文本 例如英语 法语 德语 波兰语 转换为其 slug 形式 e g Alpha Bravo Charlie gt alpha bravo charlie 但它不适用于西里尔文文本 例如俄语 所以我想做的是将西里尔
  • Java库为Postgres COPY编写二进制格式?

    有没有人遇到过 Java 库 或只是一些代码 来编写binaryPostgres 使用的格式复制命令 http www postgresql org docs 9 2 interactive sql copy html AEN66736 它