You can try PgBulkInsert, which implements the Binary Copy Protocol of PostgreSQL:
- https://github.com/bytefish/PgBulkInsert
It is also available from the Maven Central Repository.
Disclaimer: I am the project author.
PostgreSQL's Binary Copy Protocol
I don't want to simply advertise my project, so I will also describe the protocol.
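First of all I have written a class PgBinaryWriter, which wraps a DataOutputStream and has a method for writing the binary protocol header, a method to start a new row (the Binary Copy Protocol requires you to write the number of columns for each row you are going to insert) and a write method, which takes an IValueHandler<TTargetType> for writing a given Java type.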
The PgBinaryWriter implements AutoCloseable, because it is necessary to write a -1 to the stream before flushing and closing it.
The IValueHandler<TTargetType> takes a DataOutputStream and a value. It is responsible for writing the given value in the PostgreSQL binary protocol format.
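To make the byte layout concrete, here is a minimal sketch, independent of the library, that writes a complete binary COPY stream with a single row and one bigint column into a byte array. The names BinaryCopyLayoutSketch and singleBigintRow are made up for illustration. The layout follows the description above: the signature, two 32-bit header fields, then per row a 16-bit column count, per field a 32-bit length followed by the data, and finally the -1 trailer.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper for illustration only; not part of PgBulkInsert.
final class BinaryCopyLayoutSketch {

    static byte[] singleBigintRow(long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // Header: 11-byte signature, 32-bit flags, 32-bit extension length
            out.writeBytes("PGCOPY\n\377\r\n\0");
            out.writeInt(0);
            out.writeInt(0);
            // Row: 16-bit number of columns
            out.writeShort(1);
            // Field: 32-bit length in bytes, then the big-endian value
            out.writeInt(8);
            out.writeLong(value);
            // Trailer: 16-bit -1 marks the end of the data
            out.writeShort(-1);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

For one such row the result is 35 bytes long: 19 bytes of header, 2 bytes for the column count, 4 + 8 bytes for the field and 2 bytes for the trailer.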
PgBinaryWriter
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql;

import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers.IValueHandler;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.OutputStream;

public class PgBinaryWriter implements AutoCloseable {

    /** The ByteBuffer to write the output. */
    private transient DataOutputStream buffer;

    public PgBinaryWriter() {
    }

    public void open(final OutputStream out) {
        buffer = new DataOutputStream(new BufferedOutputStream(out));

        writeHeader();
    }

    private void writeHeader() {
        try {
            // 11 bytes required header
            buffer.writeBytes("PGCOPY\n\377\r\n\0");
            // 32 bit integer indicating no OID
            buffer.writeInt(0);
            // 32 bit header extension area length
            buffer.writeInt(0);
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    public void startRow(int numColumns) {
        try {
            buffer.writeShort(numColumns);
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    public <TTargetType> void write(final IValueHandler<TTargetType> handler, final TTargetType value) {
        handler.handle(buffer, value);
    }

    @Override
    public void close() {
        try {
            buffer.writeShort(-1);

            buffer.flush();
            buffer.close();
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }
}
ValueHandler
An IValueHandler is a simple interface with a handle method, which takes the DataOutputStream and a value.
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public interface IValueHandler<TTargetType> extends ValueHandler {

    void handle(DataOutputStream buffer, final TTargetType value);

    Type getTargetType();
}
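An important detail of the protocol is that you have to write a -1 as the field length when a value is null. For this I have written an abstract base class, which handles this case.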
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;

import java.io.DataOutputStream;

public abstract class BaseValueHandler<T> implements IValueHandler<T> {

    @Override
    public void handle(DataOutputStream buffer, final T value) {
        try {
            if (value == null) {
                buffer.writeInt(-1);
                return;
            }
            internalHandle(buffer, value);
        } catch (Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    protected abstract void internalHandle(DataOutputStream buffer, final T value) throws Exception;
}
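Then the handlers for the various Java types can be implemented. Here is the example for long. You can find the other implementations in the GitHub repository (handlers https://github.com/PgBulkInsert/PgBulkInsert/tree/master/PgBulkInsert/pgbulkinsert-core/src/main/java/de/bytefish/pgbulkinsert/pgsql/handlers).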
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public class LongValueHandler extends BaseValueHandler<Long> {

    @Override
    protected void internalHandle(DataOutputStream buffer, final Long value) throws Exception {
        buffer.writeInt(8);
        buffer.writeLong(value);
    }

    @Override
    public Type getTargetType() {
        return Long.class;
    }
}
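The same pattern (a 32-bit length followed by the payload, or -1 for null) carries over to variable-length types. The following is a minimal sketch for a text field, written without the library's base classes so that it runs on its own. TextFieldSketch, writeText and encode are hypothetical names, and the sketch assumes the database encoding is UTF-8; the library's own string handler may differ in detail.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of PgBulkInsert.
final class TextFieldSketch {

    static void writeText(DataOutputStream out, String value) throws IOException {
        if (value == null) {
            // A null field is a length of -1 with no data bytes
            out.writeInt(-1);
            return;
        }
        // Assumption: the server encoding is UTF-8
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        // The length is counted in bytes, not in characters
        out.writeInt(utf8.length);
        out.write(utf8);
    }

    // Convenience wrapper returning the encoded field as a byte array
    static byte[] encode(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeText(out, value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Note that the length prefix counts encoded bytes, so a string with non-ASCII characters produces a length larger than String.length().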
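Using the PgBinaryWriter
Now it's finally time to connect the parts. Please note that I have abstracted some more parts, so it might be necessary to look up further implementation details in the code.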
public abstract class PgBulkInsert<TEntity> {

    // ...

    public void saveAll(PGConnection connection, Stream<TEntity> entities) throws SQLException {

        CopyManager cpManager = connection.getCopyAPI();
        CopyIn copyIn = cpManager.copyIn(getCopyCommand());

        int columnCount = columns.size();

        try (PgBinaryWriter bw = new PgBinaryWriter()) {

            // Wrap the CopyOutputStream in our own Writer:
            bw.open(new PGCopyOutputStream(copyIn));

            // Insert all entities:
            entities.forEach(entity -> {

                // Start a New Row:
                bw.startRow(columnCount);

                // Insert the Column Data:
                columns.forEach(column -> {
                    try {
                        column.getWrite().invoke(bw, entity);
                    } catch (Exception e) {
                        throw new SaveEntityFailedException(e);
                    }
                });
            });
        }
    }

    private String getCopyCommand()
    {
        String commaSeparatedColumns = columns.stream()
                .map(x -> x.columnName)
                .collect(Collectors.joining(", "));

        return String.format("COPY %1$s(%2$s) FROM STDIN BINARY",
                table.GetFullQualifiedTableName(),
                commaSeparatedColumns);
    }
}
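To show what statement ends up being sent to the server, here is a small standalone sketch that mirrors the format string of getCopyCommand. CopyCommandSketch and copyCommand are hypothetical names, and rendering the table as schema.table without quoting is an assumption about what GetFullQualifiedTableName returns.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of PgBulkInsert.
final class CopyCommandSketch {

    static String copyCommand(String schema, String table, List<String> columnNames) {
        // Join the column names in the order the fields are written per row
        String commaSeparatedColumns = columnNames.stream()
                .collect(Collectors.joining(", "));

        // Assumption: the fully qualified name is simply "schema.table"
        return String.format("COPY %1$s.%2$s(%3$s) FROM STDIN BINARY",
                schema, table, commaSeparatedColumns);
    }
}
```

For the table sample.unit_test with the columns first_name, last_name and birth_date, this yields COPY sample.unit_test(first_name, last_name, birth_date) FROM STDIN BINARY. The column order in the statement has to match the order in which the fields of each row are written.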
PgBulkInsert
PgBulkInsert supports the following PostgreSQL data types.
- Numeric Types http://www.postgresql.org/docs/current/static/datatype-numeric.html
  - smallint
  - integer
  - bigint
  - real
  - double precision
- Date/Time Types http://www.postgresql.org/docs/current/static/datatype-datetime.html
- Character Types http://www.postgresql.org/docs/current/static/datatype-character.html
- Boolean Type http://www.postgresql.org/docs/current/static/datatype-boolean.html
- Binary Data Types http://www.postgresql.org/docs/current/static/datatype-binary.html
- Network Address Types http://www.postgresql.org/docs/current/static/datatype-net-types.html
- UUID Type http://www.postgresql.org/docs/current/static/datatype-uuid.html
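Basic Usage
Imagine a large number of persons should be bulk inserted into a PostgreSQL database. Each Person has a first name, a last name and a birth date.
Database Table
The table in the PostgreSQL database might look like this: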
CREATE TABLE sample.unit_test
(
    first_name text,
    last_name text,
    birth_date date
);
Domain Model
The domain model in the application might look like this:
private class Person {

    private String firstName;

    private String lastName;

    private LocalDate birthDate;

    public Person() {}

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public LocalDate getBirthDate() {
        return birthDate;
    }

    public void setBirthDate(LocalDate birthDate) {
        this.birthDate = birthDate;
    }
}
Bulk Inserter
Then you have to implement the PgBulkInsert<Person>, which defines the mapping between the table and the domain model.
public class PersonBulkInserter extends PgBulkInsert<Person>
{
    public PersonBulkInserter() {
        super("sample", "unit_test");

        MapString("first_name", Person::getFirstName);
        MapString("last_name", Person::getLastName);
        MapDate("birth_date", Person::getBirthDate);
    }
}
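A date column is a good example of why each type needs its own handler: in the binary format PostgreSQL expects a date as a 4-byte integer counting the days since 2000-01-01, not as text. The following standalone sketch shows that encoding for a LocalDate. DateFieldSketch and its methods are hypothetical names, and the handler the library uses behind MapDate may be implemented differently.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for illustration only; not part of PgBulkInsert.
final class DateFieldSketch {

    // PostgreSQL counts dates relative to 2000-01-01
    private static final LocalDate POSTGRES_EPOCH = LocalDate.of(2000, 1, 1);

    static int daysSincePostgresEpoch(LocalDate date) {
        // Negative for dates before 2000-01-01
        return Math.toIntExact(ChronoUnit.DAYS.between(POSTGRES_EPOCH, date));
    }

    static byte[] encode(LocalDate date) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (date == null) {
                // Null field: length -1 and no data
                out.writeInt(-1);
            } else {
                // Field length of 4 bytes, then the day offset
                out.writeInt(4);
                out.writeInt(daysSincePostgresEpoch(date));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

A non-null date therefore always occupies 8 bytes in the stream, while a null date occupies only the 4 bytes of the length field.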
Using the Bulk Inserter
And finally we can write a unit test to insert 100000 persons into the database. You can find the entire unit test on GitHub: IntegrationTest.java https://github.com/bytefish/PgBulkInsert/blob/master/PgBulkInsert/src/test/de/bytefish/pgbulkinsert/de/bytefish/pgbulkinsert/IntegrationTest.java
@Test
public void bulkInsertPersonDataTest() throws SQLException {
    // Create a large list of Persons:
    List<Person> persons = getPersonList(100000);

    // Create the BulkInserter:
    PersonBulkInserter personBulkInserter = new PersonBulkInserter();

    // Now save all entities of a given stream:
    personBulkInserter.saveAll(PostgreSqlUtils.getPGConnection(connection), persons.stream());

    // And assert all have been written to the database:
    Assert.assertEquals(100000, getRowCount());
}

private List<Person> getPersonList(int numPersons) {
    List<Person> persons = new ArrayList<>();

    for (int pos = 0; pos < numPersons; pos++) {
        Person p = new Person();

        p.setFirstName("Philipp");
        p.setLastName("Wagner");
        p.setBirthDate(LocalDate.of(1986, 5, 12));

        persons.add(p);
    }

    return persons;
}