hyb
2025-05-13 118d12ced0a4590d38d1112adb63bbb2eee3aa84
初始化相关文档和脚本
11 files added
201 ■■■■■ changed files
测试组/手册/SQLmap使用教程.docx patch | view | raw | blame | history
测试组/模版/性能测试报告模版.docx patch | view | raw | blame | history
测试组/模版/测试用例模板.xlsx patch | view | raw | blame | history
测试组/模版/软件测试报告模版.docx patch | view | raw | blame | history
测试组/统计/2024年10月和11月测试团队工作汇总报告.docx patch | view | raw | blame | history
测试组/统计/2024年12月份测试团队工作汇总报告.docx patch | view | raw | blame | history
测试组/统计/2024年9月测试团队汇总报告.docx patch | view | raw | blame | history
测试组/统计/2025年Q1测试团队项目进度及深度分析报告.docx patch | view | raw | blame | history
测试组/脚本/Change_password/dbExcel/数据库信息.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/异步批量修改数据库的哈希密码和原始密码.py 201 ●●●●● patch | view | raw | blame | history
测试组/脚本/Change_password/批量更新数据库用户密码脚本使用手册.docx patch | view | raw | blame | history
测试组/手册/SQLmap使用教程.docx
Binary files differ
测试组/模版/性能测试报告模版.docx
Binary files differ
测试组/模版/测试用例模板.xlsx
Binary files differ
测试组/模版/软件测试报告模版.docx
Binary files differ
测试组/统计/2024年10月和11月测试团队工作汇总报告.docx
Binary files differ
测试组/统计/2024年12月份测试团队工作汇总报告.docx
Binary files differ
测试组/统计/2024年9月测试团队汇总报告.docx
Binary files differ
测试组/统计/2025年Q1测试团队项目进度及深度分析报告.docx
Binary files differ
测试组/脚本/Change_password/dbExcel/数据库信息.xlsx
Binary files differ
测试组/脚本/Change_password/异步批量修改数据库的哈希密码和原始密码.py
New file
@@ -0,0 +1,201 @@
import pandas as pd
import bcrypt
import mysql.connector
from mysql.connector import Error
import asyncio
# === 配置 ===
excel_path = 'dbExcel/数据库信息.xlsx'  # Excel 文件名
log_csv_path = '更新日志.csv'  # 输出日志 CSV
new_password_plain = 'Baoyi@1341'  # 新的明文密码
user_count = -1  # 每库更新前 N 个用户,-1 表示全部
protected_user_id = 1  # 需要排除的用户ID
# === 日志列表与锁 ===
log_records = []
log_lock = asyncio.Lock()
# === 异步处理单个数据库 ===
async def update_database(project, host, database, user, password, user_filter=None):
    try:
        conn = await asyncio.to_thread(mysql.connector.connect,
                                       host=host,
                                       port=3306,
                                       user=user,
                                       password=password,
                                       database=database,
                                       use_pure=True
                                       )
        cursor = conn.cursor()
        # ✅ 判断是否有 plain_text 字段
        cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'")
        has_plain_text = cursor.fetchone() is not None
        # 构造基础查询语句
        base_sql = """
            SELECT id, login_name
            FROM sys_user
            WHERE id != %s
        """  # 默认排除保护用户
        params = [protected_user_id]
        # 添加用户筛选条件
        if isinstance(user_filter, list):  # 用户名列表模式
            placeholders = ','.join(['%s'] * len(user_filter))
            base_sql += f" AND login_name IN ({placeholders})"
            params.extend(user_filter)
        elif user_filter == "LIMIT":  # 限制数量模式
            if user_count != -1:
                base_sql += f" LIMIT {user_count}"
        cursor.execute(base_sql, params)
        users = cursor.fetchall()
        if not users:
            print(f"❌ [{project}] 未找到符合条件的用户,跳过。")
            return
        for uid, login_name in users:
            salt = bcrypt.gensalt(rounds=10)
            hashed = bcrypt.hashpw(new_password_plain.encode('utf-8'), salt).decode('utf-8')
            # ✅ 构造更新 SQL
            if has_plain_text:
                update_sql = """
                    UPDATE sys_user
                    SET plain_text = %s, password = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (new_password_plain, hashed, uid))
            else:
                update_sql = """
                    UPDATE sys_user
                    SET password = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (hashed, uid))
            # ✅ 写入日志
            async with log_lock:
                log_records.append({
                    '项目': project,
                    '数据库': database,
                    '用户ID': uid,
                    '用户名': login_name,
                    '明文密码': new_password_plain if has_plain_text else '未更新',
                    '哈希密码': hashed
                })
            print(f"✅ [{project}] 用户 {login_name} (ID:{uid}) 更新成功")
        conn.commit()
        print(f"✅ [{project}] 共 {len(users)} 个用户更新完成")
    except Error as e:
        print(f"❌ [{project}] 数据库操作失败:{e}")
        async with log_lock:
            log_records.append({
                '项目': project,
                '数据库': database,
                '用户ID': '连接失败',
                '用户名': '',
                '明文密码': '',
                '哈希密码': f'错误信息: {e}'
            })
    finally:
        if 'cursor' in locals() and cursor:
            cursor.close()
        if 'conn' in locals() and conn:
            conn.close()
# === 主异步函数 ===
async def main():
    # 读取 Excel 数据
    try:
        df = pd.read_excel(excel_path)
    except Exception as e:
        print(f"❌ 读取Excel文件失败:{e}")
        return
    # 获取所有唯一项目列表
    all_projects = df['项目'].unique().tolist()
    if not all_projects:
        print("❌ Excel 文件中未找到任何项目")
        return
    # === 项目选择 ===
    print("\n请选择要处理的项目(输入编号,多个用逗号分隔):")
    for idx, project in enumerate(all_projects, 1):
        print(f"  [{idx}] {project}")
    while True:
        selected = input("\n请输入项目编号(例如 1,3):").strip()
        if not selected:
            print("⚠️ 未选择项目,默认处理全部")
            selected_projects = all_projects
            break
        try:
            indexes = [int(n.strip()) - 1 for n in selected.replace(',', ',').split(',')]
            selected_projects = [all_projects[i] for i in indexes]
            if not selected_projects:
                raise ValueError("至少选择一个有效项目")
            break
        except (ValueError, IndexError) as e:
            print(f"❌ 输入无效:{e},请重新输入")
    # === 操作模式选择 ===
    print("\n请选择操作模式:")
    print("  [1] 批量更新所有用户(排除ID为1的用户)")
    print("  [2] 指定用户名更新")
    mode = input("请输入模式编号(1/2):").strip()
    user_filter = None
    if mode == '2':
        while True:
            usernames = input("请输入用户名(多个用逗号分隔):").strip()
            if not usernames:
                print("⚠️ 用户名不能为空")
                continue
            user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')]
            if len(user_filter) == 0:
                print("❌ 至少输入一个有效用户名")
            else:
                break
    elif mode != '1':
        print("⚠️ 输入无效,默认使用批量模式")
        mode = '1'
    # 过滤 DataFrame
    df_filtered = df[df['项目'].isin(selected_projects)]
    if df_filtered.empty:
        print("❌ 筛选后无有效项目,请检查输入")
        return
    # 创建异步任务
    tasks = []
    for _, row in df_filtered.iterrows():
        project, host, database, user, password = row[:5]
        print(f"\n🚀 准备处理项目:{project} | 数据库:{database}")
        tasks.append(update_database(
            project, host, database, user, password,
            user_filter=user_filter if mode == '2' else None
        ))
    await asyncio.gather(*tasks)
    # 保存日志
    if log_records:
        pd.DataFrame(log_records).to_csv(log_csv_path, index=False, encoding='utf-8-sig')
        print(f"\n✅ 所有更新日志已保存至 {log_csv_path}")
    else:
        print("⚠️ 无任何更新记录生成")
# === 启动入口 ===
if __name__ == '__main__':
    asyncio.run(main())
测试组/脚本/Change_password/批量更新数据库用户密码脚本使用手册.docx
Binary files differ