程序员求职经验分享与学习资料整理平台

网站首页 > 文章精选 正文

Python mysql批量插入数据 代码高度复用

balukai 2024-12-23 16:52:11 文章精选 51 ℃

一、应用场景

上篇文章我们学到了mysql的事务处理,在业务开发中,事务往往跟批量提交sql语句相关,而批量提交sql设计到重复生成sql语句。如果我们针对每个数据库表、每个不固定的插入字段都写一个方法进行维护,那代码复用率太低,开发效率太低,维护成本太高。

二、解决思路

为了解决上面提出的问题,我在这里提出如下思路:

  • 所有插入语句的生成都共用一个入口
  • 插入的数据库字段和参数可动态决定
  • 要操作的表名可动态决定

三、解决办法

既然涉及到动态参数,那我把要插入的数据库表参数存入到字典中,从字典中拿出要存的数据。然后根据key对应的value值,拼成sql语句(涉及到for循环的知识点:Python for循环几种常用场景)。最后表名也是传参进来。

def common_insert_sql(item, table):
    """
    从字典item的key-value提取要插入的字段,进行插入
    :param item: 要执行更新的字段的字典,{"colunm_name_1":"colunm_value_1","colunm_name_2":"colunm_value_2"}
    :param table: 数据库表名
    :return: 返回完整的拼接好的sql插入语句
    """
    keys = ', '.join(item.keys())
    values = ', '.join(['%s'] * len(item))
    insert_str = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(table=table, keys=keys, values=values)
    return insert_str
insert_sql = common_insert_sql(item, table)
# execute有两种方法,一种是直接执行sql,另一种是参数封装到元组中。
cursor.execute(insert_sql, tuple(item.values()))

四、测试demo

# -*- coding: utf-8 -*-
"""
@Time : 2022/1/28 17:33
@Auth : 技术空间
@File :handle_list_sql_demo.py
@IDE :PyCharm
@Motto:技术总是要日积月累的

"""
import pymysql
def common_insert_sql(item, table):
    """
    从item的key-value提取要插入的字段,进行插入
    :param item: 要执行更新的字段的字典,{"colunm_name_1":"colunm_value_1","colunm_name_2":"colunm_value_2"}
    :param table: 数据库表名
    :return: 返回完整的拼接好的sql插入语句
    """
    keys = ', '.join(item.keys())
    values = ', '.join(['%s'] * len(item))
    insert_str = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(table=table, keys=keys, values=values)
    return insert_str


def select_list(db_cursor, sql):
    """
    查询数据量表的列表
    :param db_cursor: 游标
    :param sql: 拼接好的sql查询语句
    :return: 返回查询结果列表
    """
    db_cursor.execute(sql)
    data_list = db_cursor.fetchall()
    print(data_list)
    return data_list

if __name__ == '__main__':
    """
    模拟要插入的数据,我们可以把这模拟数据想象成真实开发场景中的批量新增用户、批量新增用户角色等。
    这里我们把上篇文章的查询列表代码单独抽出来到一个方法里,提高代码复用率。
    """
    db = pymysql.connect(host='localhost',
                         user='root',
                         password='root',
                         database='others')

    cursor = db.cursor(pymysql.cursors.DictCursor)
    # 开始批量插入表1
    table_1 = "user_info"
    table_2 = "user_role"
    select_user_sql = " select id,name from " + table_1
    select_role_sql = " select user_id,role_id from " + table_2
    try:
        print("执行批量插入user前的数据-->")
        select_list(cursor, select_user_sql)
        insert_user_list = []
        for i in range(3):
            insert_user_list.append({"name": "code_space_" + str(i + 1)})
        for insert in insert_user_list:
            insert_sql = common_insert_sql(insert, table_1)
            cursor.execute(insert_sql, tuple(insert.values()))

        # 开始批量插入表2
        print("执行批量插入user_role前的数据-->")
        select_list(cursor, select_role_sql)
        insert_role_list = []
        for i in range(3):
            insert_role_list.append({"user_id": i + 4, "role_id": 1})
        for insert in insert_role_list:
            insert_sql = common_insert_sql(insert, table_2)
            cursor.execute(insert_sql, tuple(insert.values()))

    except Exception as e:
        # 事务回滚
        db.rollback()
        print('事务处理失败', e)
    else:
        # 事务提交
        db.commit()
        print('事务处理成功', cursor.rowcount)
        print("执行批量插入user后的数据-->")
        select_list(cursor, select_user_sql)
        print("执行批量插入user_role后的数据-->")
        select_list(cursor, select_role_sql)
    cursor.close()
    db.close()

五、拓展

既然批量插入我们都可以完成耦合了,那批量更新也是不在话下了。批量更新稍微有一点不同,但是问题不大。下一篇我将会讲解关于批量更新的复用代码。还请多多关注。

关注我,坚持每日积累一个技巧,长期坚持,我们将会不断进步。

#程序员##python##数据库##请回答,你的年度知识点##教育听我说#

最近发表
标签列表