| New file |
| | |
| | | import pandas as pd |
| | | import bcrypt |
| | | import mysql.connector |
| | | from mysql.connector import Error |
| | | import asyncio |
| | | |
| | | # === 配置 === |
| | | excel_path = 'dbExcel/数据库信息.xlsx' # Excel 文件名 |
| | | log_csv_path = '更新日志.csv' # 输出日志 CSV |
| | | new_password_plain = 'Baoyi@1341' # 新的明文密码 |
| | | user_count = -1 # 每库更新前 N 个用户,-1 表示全部 |
| | | protected_user_id = 1 # 需要排除的用户ID |
| | | |
| | | # === 日志列表与锁 === |
| | | log_records = [] |
| | | log_lock = asyncio.Lock() |
| | | |
| | | |
| | | # === 异步处理单个数据库 === |
| | | async def update_database(project, host, database, user, password, user_filter=None): |
| | | try: |
| | | conn = await asyncio.to_thread(mysql.connector.connect, |
| | | host=host, |
| | | port=3306, |
| | | user=user, |
| | | password=password, |
| | | database=database, |
| | | use_pure=True |
| | | ) |
| | | cursor = conn.cursor() |
| | | |
| | | # ✅ 判断是否有 plain_text 字段 |
| | | cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'") |
| | | has_plain_text = cursor.fetchone() is not None |
| | | |
| | | # 构造基础查询语句 |
| | | base_sql = """ |
| | | SELECT id, login_name |
| | | FROM sys_user |
| | | WHERE id != %s |
| | | """ # 默认排除保护用户 |
| | | |
| | | params = [protected_user_id] |
| | | |
| | | # 添加用户筛选条件 |
| | | if isinstance(user_filter, list): # 用户名列表模式 |
| | | placeholders = ','.join(['%s'] * len(user_filter)) |
| | | base_sql += f" AND login_name IN ({placeholders})" |
| | | params.extend(user_filter) |
| | | elif user_filter == "LIMIT": # 限制数量模式 |
| | | if user_count != -1: |
| | | base_sql += f" LIMIT {user_count}" |
| | | |
| | | cursor.execute(base_sql, params) |
| | | users = cursor.fetchall() |
| | | |
| | | if not users: |
| | | print(f"❌ [{project}] 未找到符合条件的用户,跳过。") |
| | | return |
| | | |
| | | for uid, login_name in users: |
| | | salt = bcrypt.gensalt(rounds=10) |
| | | hashed = bcrypt.hashpw(new_password_plain.encode('utf-8'), salt).decode('utf-8') |
| | | |
| | | # ✅ 构造更新 SQL |
| | | if has_plain_text: |
| | | update_sql = """ |
| | | UPDATE sys_user |
| | | SET plain_text = %s, password = %s |
| | | WHERE id = %s |
| | | """ |
| | | cursor.execute(update_sql, (new_password_plain, hashed, uid)) |
| | | else: |
| | | update_sql = """ |
| | | UPDATE sys_user |
| | | SET password = %s |
| | | WHERE id = %s |
| | | """ |
| | | cursor.execute(update_sql, (hashed, uid)) |
| | | |
| | | # ✅ 写入日志 |
| | | async with log_lock: |
| | | log_records.append({ |
| | | '项目': project, |
| | | '数据库': database, |
| | | '用户ID': uid, |
| | | '用户名': login_name, |
| | | '明文密码': new_password_plain if has_plain_text else '未更新', |
| | | '哈希密码': hashed |
| | | }) |
| | | print(f"✅ [{project}] 用户 {login_name} (ID:{uid}) 更新成功") |
| | | |
| | | conn.commit() |
| | | print(f"✅ [{project}] 共 {len(users)} 个用户更新完成") |
| | | |
| | | except Error as e: |
| | | print(f"❌ [{project}] 数据库操作失败:{e}") |
| | | async with log_lock: |
| | | log_records.append({ |
| | | '项目': project, |
| | | '数据库': database, |
| | | '用户ID': '连接失败', |
| | | '用户名': '', |
| | | '明文密码': '', |
| | | '哈希密码': f'错误信息: {e}' |
| | | }) |
| | | |
| | | finally: |
| | | if 'cursor' in locals() and cursor: |
| | | cursor.close() |
| | | if 'conn' in locals() and conn: |
| | | conn.close() |
| | | |
| | | |
| | | # === 主异步函数 === |
| | | async def main(): |
| | | # 读取 Excel 数据 |
| | | try: |
| | | df = pd.read_excel(excel_path) |
| | | except Exception as e: |
| | | print(f"❌ 读取Excel文件失败:{e}") |
| | | return |
| | | |
| | | # 获取所有唯一项目列表 |
| | | all_projects = df['项目'].unique().tolist() |
| | | if not all_projects: |
| | | print("❌ Excel 文件中未找到任何项目") |
| | | return |
| | | |
| | | # === 项目选择 === |
| | | print("\n请选择要处理的项目(输入编号,多个用逗号分隔):") |
| | | for idx, project in enumerate(all_projects, 1): |
| | | print(f" [{idx}] {project}") |
| | | |
| | | while True: |
| | | selected = input("\n请输入项目编号(例如 1,3):").strip() |
| | | if not selected: |
| | | print("⚠️ 未选择项目,默认处理全部") |
| | | selected_projects = all_projects |
| | | break |
| | | |
| | | try: |
| | | indexes = [int(n.strip()) - 1 for n in selected.replace(',', ',').split(',')] |
| | | selected_projects = [all_projects[i] for i in indexes] |
| | | if not selected_projects: |
| | | raise ValueError("至少选择一个有效项目") |
| | | break |
| | | except (ValueError, IndexError) as e: |
| | | print(f"❌ 输入无效:{e},请重新输入") |
| | | |
| | | # === 操作模式选择 === |
| | | print("\n请选择操作模式:") |
| | | print(" [1] 批量更新所有用户(排除ID为1的用户)") |
| | | print(" [2] 指定用户名更新") |
| | | mode = input("请输入模式编号(1/2):").strip() |
| | | |
| | | user_filter = None |
| | | if mode == '2': |
| | | while True: |
| | | usernames = input("请输入用户名(多个用逗号分隔):").strip() |
| | | if not usernames: |
| | | print("⚠️ 用户名不能为空") |
| | | continue |
| | | user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')] |
| | | if len(user_filter) == 0: |
| | | print("❌ 至少输入一个有效用户名") |
| | | else: |
| | | break |
| | | elif mode != '1': |
| | | print("⚠️ 输入无效,默认使用批量模式") |
| | | mode = '1' |
| | | |
| | | # 过滤 DataFrame |
| | | df_filtered = df[df['项目'].isin(selected_projects)] |
| | | if df_filtered.empty: |
| | | print("❌ 筛选后无有效项目,请检查输入") |
| | | return |
| | | |
| | | # 创建异步任务 |
| | | tasks = [] |
| | | for _, row in df_filtered.iterrows(): |
| | | project, host, database, user, password = row[:5] |
| | | print(f"\n🚀 准备处理项目:{project} | 数据库:{database}") |
| | | tasks.append(update_database( |
| | | project, host, database, user, password, |
| | | user_filter=user_filter if mode == '2' else None |
| | | )) |
| | | |
| | | await asyncio.gather(*tasks) |
| | | |
| | | # 保存日志 |
| | | if log_records: |
| | | pd.DataFrame(log_records).to_csv(log_csv_path, index=False, encoding='utf-8-sig') |
| | | print(f"\n✅ 所有更新日志已保存至 {log_csv_path}") |
| | | else: |
| | | print("⚠️ 无任何更新记录生成") |
| | | |
| | | |
| | | # === 启动入口 === |
| | | if __name__ == '__main__': |
| | | asyncio.run(main()) |