import pandas as pd import bcrypt import mysql.connector from mysql.connector import Error import asyncio # === 配置 === excel_path = 'dbExcel/数据库信息.xlsx' # Excel 文件名 log_csv_path = '更新日志.csv' # 输出日志 CSV new_password_plain = 'Baoyi@1341' # 新的明文密码 user_count = -1 # 每库更新前 N 个用户,-1 表示全部 protected_user_id = 1 # 需要排除的用户ID # === 日志列表与锁 === log_records = [] log_lock = asyncio.Lock() # === 异步处理单个数据库 === async def update_database(project, host, database, user, password, user_filter=None): try: conn = await asyncio.to_thread(mysql.connector.connect, host=host, port=3306, user=user, password=password, database=database, use_pure=True ) cursor = conn.cursor() # ✅ 判断是否有 plain_text 字段 cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'") has_plain_text = cursor.fetchone() is not None # 构造基础查询语句 base_sql = """ SELECT id, login_name FROM sys_user WHERE id != %s """ # 默认排除保护用户 params = [protected_user_id] # 添加用户筛选条件 if isinstance(user_filter, list): # 用户名列表模式 placeholders = ','.join(['%s'] * len(user_filter)) base_sql += f" AND login_name IN ({placeholders})" params.extend(user_filter) elif user_filter == "LIMIT": # 限制数量模式 if user_count != -1: base_sql += f" LIMIT {user_count}" cursor.execute(base_sql, params) users = cursor.fetchall() if not users: print(f"❌ [{project}] 未找到符合条件的用户,跳过。") return for uid, login_name in users: salt = bcrypt.gensalt(rounds=10) hashed = bcrypt.hashpw(new_password_plain.encode('utf-8'), salt).decode('utf-8') # ✅ 构造更新 SQL if has_plain_text: update_sql = """ UPDATE sys_user SET plain_text = %s, password = %s WHERE id = %s """ cursor.execute(update_sql, (new_password_plain, hashed, uid)) else: update_sql = """ UPDATE sys_user SET password = %s WHERE id = %s """ cursor.execute(update_sql, (hashed, uid)) # ✅ 写入日志 async with log_lock: log_records.append({ '项目': project, '数据库': database, '用户ID': uid, '用户名': login_name, '明文密码': new_password_plain if has_plain_text else '未更新', '哈希密码': hashed }) print(f"✅ [{project}] 用户 {login_name} (ID:{uid}) 更新成功") conn.commit() print(f"✅ [{project}] 共 {len(users)} 个用户更新完成") except Error as e: print(f"❌ [{project}] 数据库操作失败:{e}") async with log_lock: log_records.append({ '项目': project, '数据库': database, '用户ID': '连接失败', '用户名': '', '明文密码': '', '哈希密码': f'错误信息: {e}' }) finally: if 'cursor' in locals() and cursor: cursor.close() if 'conn' in locals() and conn: conn.close() # === 主异步函数 === async def main(): # 读取 Excel 数据 try: df = pd.read_excel(excel_path) except Exception as e: print(f"❌ 读取Excel文件失败:{e}") return # 获取所有唯一项目列表 all_projects = df['项目'].unique().tolist() if not all_projects: print("❌ Excel 文件中未找到任何项目") return # === 项目选择 === print("\n请选择要处理的项目(输入编号,多个用逗号分隔):") for idx, project in enumerate(all_projects, 1): print(f" [{idx}] {project}") while True: selected = input("\n请输入项目编号(例如 1,3):").strip() if not selected: print("⚠️ 未选择项目,默认处理全部") selected_projects = all_projects break try: indexes = [int(n.strip()) - 1 for n in selected.replace(',', ',').split(',')] selected_projects = [all_projects[i] for i in indexes] if not selected_projects: raise ValueError("至少选择一个有效项目") break except (ValueError, IndexError) as e: print(f"❌ 输入无效:{e},请重新输入") # === 操作模式选择 === print("\n请选择操作模式:") print(" [1] 批量更新所有用户(排除ID为1的用户)") print(" [2] 指定用户名更新") mode = input("请输入模式编号(1/2):").strip() user_filter = None if mode == '2': while True: usernames = input("请输入用户名(多个用逗号分隔):").strip() if not usernames: print("⚠️ 用户名不能为空") continue user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')] if len(user_filter) == 0: print("❌ 至少输入一个有效用户名") else: break elif mode != '1': print("⚠️ 输入无效,默认使用批量模式") mode = '1' # 过滤 DataFrame df_filtered = df[df['项目'].isin(selected_projects)] if df_filtered.empty: print("❌ 筛选后无有效项目,请检查输入") return # 创建异步任务 tasks = [] for _, row in df_filtered.iterrows(): project, host, database, user, password = row[:5] print(f"\n🚀 准备处理项目:{project} | 数据库:{database}") tasks.append(update_database( project, host, database, user, password, user_filter=user_filter if mode == '2' else None )) await asyncio.gather(*tasks) # 保存日志 if log_records: pd.DataFrame(log_records).to_csv(log_csv_path, index=False, encoding='utf-8-sig') print(f"\n✅ 所有更新日志已保存至 {log_csv_path}") else: print("⚠️ 无任何更新记录生成") # === 启动入口 === if __name__ == '__main__': asyncio.run(main())