我有一个看起来像的架构
Column | Type |
-------------------------------------------------------
message_id | integer |
user_id | integer |
body | text |
created_at | timestamp without time zone |
source | jsonb |
symbols | jsonb[] |
我正在尝试使用 psycopg2 通过 psycopg2.Cursor.copy_from() 插入数据,但我遇到了很多问题,试图弄清楚 jsonb[] 对象应该如何格式化。当我直接列出 JSON 对象时,出现如下错误
psycopg2.errors.InvalidTextRepresentation: malformed array literal: "[{'id': 13016, 'symbol':
....
DETAIL: "[" must introduce explicitly-specified array dimensions.
我在双引号和花括号上尝试了多种不同的转义。如果我对数据执行 json.dumps() ,则会收到以下错误。
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type json
DETAIL: Token "'" is invalid.
此错误是从此代码片段收到的
messageData = []
symbols = messageObject["symbols"]
newSymbols = []
for symbol in symbols:
toAppend = symbol
toAppend = refineJSON(json.dumps(symbol))
toAppend = re.sub("{", "\{", toAppend)
toAppend = re.sub("}", "\}", toAppend)
toAppend = re.sub('"', '\\"', toAppend)
newSymbols.append(toAppend)
messageData.append(set(newSymbols))
我也愿意将列定义为不同的类型(例如文本),然后尝试转换,但我也无法做到这一点。
messageData 是调用 psycopg2.Cursor.copy_from() 的辅助函数的输入
def copy_string_iterator_messages(connection, messages, size: int = 8192) -> None:
with connection.cursor() as cursor:
messages_string_iterator = StringIteratorIO((
'|'.join(map(clean_csv_value, (messageData[0], messageData[1], messageData[2], messageData[3], messageData[4], messageData[5], messageData[6], messageData[7], messageData[8], messageData[9], messageData[10],
messageData[11],
))) + '\n'
for messageData in messages
))
# pp.pprint(messages_string_iterator.read())
cursor.copy_from(messages_string_iterator, 'test', sep='|', size=size)
connection.commit()
编辑:根据迈克的输入,我更新了代码以使用execute_batch(),其中消息是包含每条消息的messageData的列表。
def insert_execute_batch_iterator_messages(connection, messages, page_size: int = 1000) -> None:
with connection.cursor() as cursor:
iter_messages = ({**message, } for message in messages)
print("inside")
psycopg2.extras.execute_batch(cursor, """
INSERT INTO test VALUES(
%(message_id)s,
%(user_id)s,
%(body)s,
%(created_at)s,
%(source)s::jsonb,
%(symbols)s::jsonb[]
);
""", iter_messages, page_size=page_size)
connection.commit()