通过python向pgsql一次性插入上万条数据

数据库为pgsql,表中有jsonb形式的字段

方式一:使用execute直接执行insert语句,比较慢

  • insert语句中json格式字段需要转换
  • 数据库中NULL、true、false字段,python中需要替换为None、True、False
# -*- coding: UTF-8 -*-
import psycopg2
from psycopg2.extras import Json

connection = psycopg2.connect(
    host="xxx",
    port="xxx",
    database="xxx",
    user="xxx",
    password="xxx"
)
cursor = connection.cursor()
for i in range(10000,100000):
    id = 2066645753478356990+i
    order = '0521'+str(i)
    cursor.execute("INSERT INTO public.mo_mfg_order (id, order, deleted, name, extra) VALUES (%s,%s,%s,%s,%s)",(id, order, False, None,  Json({"line": [], "workshop": []})))
connection.commit()
cursor.close()
connection.close()

方式二:通过copy_from方式直接从csv文件拷贝到数据库,速度提高30倍

  • csv文件中,json字段中有英文逗号,导致python识别时自动以逗号分隔字符,网上一堆方法都不好用,直接修改系统文件分割方式
  • csv文件中的空值(对应数据库NULL),需要在copy_from中说明
  1. 修改系统配置


    image.png

    image.png

    image.png

    改完之后,需要用excel打开csv文件,重新处理(WPS不行!)


    CSV数据样例
# -*- coding: UTF-8 -*-
import psycopg2
import csv
from io import StringIO

connection = psycopg2.connect(
    host="xxx",
    port="xxx",
    database="xxx",
    user="xxx",
    password="xxx"
)
cursor = connection.cursor()
with open('data1.csv', mode='r') as f:
    reader = csv.reader(f, delimiter='|')
    # 创建一个内存文件对象用于存储预处理后的CSV数据
    output = StringIO()
    # 处理每一行数据
    for row in reader:
        output.write('|'.join(row) + '\n')

    # 移动到内存文件的开头
    output.seek(0)
    # print(output.getvalue())
    # 使用copy_from方法将数据插入PostgreSQL
    cursor.copy_from(output, 'mo_mfg_order', null='', sep='|', columns=('id', 'name', 'age', 'extra'))
connection.commit()
cursor.close()
connection.close()

方式三:在方式二的基础上增加多线程

import psycopg2
import csv
from io import StringIO
from concurrent.futures import ThreadPoolExecutor

# 批量处理的数据量
BATCH_SIZE = 2000   #可调节
def process_batch(batch):
    # 数据库连接配置
    connection = psycopg2.connect(
        host="dev-new.leadigital.net",
        port="15435",
        database="jicimi_biz_dc_dev_1",
        user="cyf",
        password="123456"
    )
    cursor = connection.cursor()
    output = StringIO()
    for row1 in batch:
        output.write('|'.join(row1) + '\n')
    output.seek(0)
    cursor.copy_from(output, 'mo_mfg_order', null='', sep='|', columns=('id', 'name', 'age', 'extra'))
    # 提交事务并关闭连接
    connection.commit()
    cursor.close()
    connection.close()


# 打开CSV文件并分批处理数据
with open('data1.csv', mode='r') as f:
    reader = csv.reader(f, delimiter='|')
    batch = []
    with ThreadPoolExecutor(max_workers=4) as executor:      #可调节
        for row in reader:
            batch.append(row)
            if len(batch) == BATCH_SIZE:
                executor.submit(process_batch, batch)
                batch = []

# 处理剩余的数据
        if batch:
                executor.submit(process_batch, batch)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。