import pandas as pd import bcrypt import mysql.connector from mysql.connector import Error import asyncio import os from datetime import datetime from typing import List, Dict, Any, Optional import re # === 配置 === EXCEL_PATH = 'dbExcel/数据库信息.xlsx' # Excel 文件名 LOG_CSV_PATH = '更新日志.csv' # 输出日志 CSV EXPORT_DIR = '用户信息导出' # 导出文件目录 NEW_PASSWORD_PLAIN = 'Baoyi@1341' # 新的明文密码 USER_COUNT = -1 # 每库更新前 N 个用户,-1 表示全部 PROTECTED_USER_ID = 1 # 需要排除的用户ID # === 日志列表与锁 === log_records = [] log_lock = asyncio.Lock() def sanitize_filename(name: str) -> str: """清理文件名,移除可能引起问题的字符""" # 移除或替换Windows文件名中不允许的字符 name = re.sub(r'[\\/*?:"<>|]', "", name) # 替换空格为下划线 name = name.replace(" ", "_") return name class DatabaseUpdater: """数据库更新器类""" def __init__(self, project: str, host: str, database: str, user: str, password: str): self.project = project self.host = host self.database = database self.db_user = user self.db_password = password self.updated_users = [] # 存储更新过的用户信息 async def connect(self) -> Optional[mysql.connector.connection.MySQLConnection]: """建立数据库连接""" try: conn = await asyncio.to_thread( mysql.connector.connect, host=self.host, port=3306, user=self.db_user, password=self.db_password, database=self.database, use_pure=True ) return conn except Error as e: print(f"❌ [{self.project}] 数据库连接失败:{e}") async with log_lock: log_records.append({ '项目': self.project, '数据库': self.database, '用户ID': '连接失败', '用户名': '', '明文密码': '', '哈希密码': f'错误信息: {e}' }) return None def check_plain_text_column(self, cursor) -> bool: """检查是否存在 plain_text 字段""" try: cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'") return cursor.fetchone() is not None except Error as e: print(f"❌ [{self.project}] 检查 plain_text 字段失败:{e}") return False def get_users_to_update(self, cursor, user_filter: Optional[List[str]] = None) -> List[tuple]: """获取需要更新的用户列表""" try: # 构造基础查询语句 base_sql = """ SELECT id, login_name, name, current_role_id FROM sys_user WHERE id != %s """ # 默认排除保护用户 params = [PROTECTED_USER_ID] # 添加用户筛选条件 if isinstance(user_filter, list) and user_filter: # 用户名列表模式 placeholders = ','.join(['%s'] * len(user_filter)) base_sql += f" AND login_name IN ({placeholders})" params.extend(user_filter) elif user_filter == "LIMIT": # 限制数量模式 if USER_COUNT != -1: base_sql += f" LIMIT {USER_COUNT}" cursor.execute(base_sql, params) return cursor.fetchall() except Error as e: print(f"❌ [{self.project}] 获取用户列表失败:{e}") return [] def update_user_password(self, cursor, user_id: int, login_name: str, has_plain_text: bool) -> bool: """更新用户密码""" try: salt = bcrypt.gensalt(rounds=10) hashed = bcrypt.hashpw(NEW_PASSWORD_PLAIN.encode('utf-8'), salt).decode('utf-8') # 构造更新 SQL if has_plain_text: update_sql = """ UPDATE sys_user SET plain_text = %s, password = %s WHERE id = %s """ cursor.execute(update_sql, (NEW_PASSWORD_PLAIN, hashed, user_id)) else: update_sql = """ UPDATE sys_user SET password = %s WHERE id = %s """ cursor.execute(update_sql, (hashed, user_id)) return True except Error as e: print(f"❌ [{self.project}] 更新用户 {login_name} 密码失败:{e}") return False def get_role_name(self, cursor, role_id: int) -> str: """根据角色ID获取角色名称""" try: if not role_id: return "无角色" cursor.execute("SELECT name FROM sys_role WHERE id = %s", (role_id,)) result = cursor.fetchone() return result[0] if result else "角色不存在" except Error as e: print(f"❌ [{self.project}] 获取角色名称失败:{e}") return "获取失败" async def export_user_info(self, cursor) -> bool: """导出用户信息到Excel""" try: if not self.updated_users: print(f"⚠️ [{self.project}] 没有更新过的用户,无需导出") return False # 确保导出目录存在 os.makedirs(EXPORT_DIR, exist_ok=True) # 获取所有角色ID role_ids = list(set(user['role_id'] for user in self.updated_users if user['role_id'])) # 获取角色名称映射 role_name_map = {} for role_id in role_ids: role_name = await asyncio.to_thread(self.get_role_name, cursor, role_id) role_name_map[role_id] = role_name # 准备导出数据 export_data = [] for user in self.updated_users: export_data.append({ '用户名': user['login_name'], '密码': NEW_PASSWORD_PLAIN, # 使用新密码 '姓名': user['name'], '角色名称': role_name_map.get(user['role_id'], '无角色') }) # 创建DataFrame并导出 df_export = pd.DataFrame(export_data) # 按照要求格式化文件名:项目名称测试环境账号密码_年月日 safe_project_name = sanitize_filename(self.project) date_str = datetime.now().strftime("%Y%m%d") filename = f"{safe_project_name}测试环境账号密码_{date_str}.xlsx" filepath = os.path.join(EXPORT_DIR, filename) # 移除 encoding 参数,因为 to_excel 方法不支持它 await asyncio.to_thread(df_export.to_excel, filepath, index=False) print(f"✅ [{self.project}] 用户信息已导出到: {filepath}") return True except Exception as e: print(f"❌ [{self.project}] 导出用户信息失败:{e}") return False async def update_database(self, user_filter: Optional[List[str]] = None) -> bool: """更新数据库中的用户密码""" conn = None cursor = None try: conn = await self.connect() if not conn: return False # 使用同步方式创建游标 cursor = conn.cursor() # 检查是否有 plain_text 字段 has_plain_text = await asyncio.to_thread(self.check_plain_text_column, cursor) # 获取需要更新的用户 users = await asyncio.to_thread(self.get_users_to_update, cursor, user_filter) if not users: print(f"❌ [{self.project}] 未找到符合条件的用户,跳过。") return False # 更新每个用户的密码 for uid, login_name, name, current_role_id in users: success = await asyncio.to_thread( self.update_user_password, cursor, uid, login_name, has_plain_text ) if success: # 记录更新成功的用户信息 self.updated_users.append({ 'id': uid, 'login_name': login_name, 'name': name, 'role_id': current_role_id }) # 写入日志 async with log_lock: log_records.append({ '项目': self.project, '数据库': self.database, '用户ID': uid, '用户名': login_name, '明文密码': NEW_PASSWORD_PLAIN if has_plain_text else '未更新', '哈希密码': bcrypt.hashpw(NEW_PASSWORD_PLAIN.encode('utf-8'), bcrypt.gensalt(rounds=10)).decode('utf-8') }) print(f"✅ [{self.project}] 用户 {login_name} (ID:{uid}) 更新成功") # 提交事务 conn.commit() print(f"✅ [{self.project}] 共 {len(self.updated_users)} 个用户更新完成") return True except Error as e: print(f"❌ [{self.project}] 数据库操作失败:{e}") async with log_lock: log_records.append({ '项目': self.project, '数据库': self.database, '用户ID': '操作失败', '用户名': '', '明文密码': '', '哈希密码': f'错误信息: {e}' }) return False finally: if cursor: cursor.close() if conn: conn.close() async def main(): """主异步函数""" # 读取 Excel 数据 try: df = pd.read_excel(EXCEL_PATH) except Exception as e: print(f"❌ 读取Excel文件失败:{e}") return # 获取所有唯一项目列表 all_projects = df['项目'].unique().tolist() if not all_projects: print("❌ Excel 文件中未找到任何项目") return # === 项目选择 === print("\n请选择要处理的项目(输入编号,多个用逗号分隔):") for idx, project in enumerate(all_projects, 1): print(f" [{idx}] {project}") while True: selected = input("\n请输入项目编号(例如 1,3):").strip() if not selected: print("⚠️ 未选择项目,默认处理全部") selected_projects = all_projects break try: indexes = [int(n.strip()) - 1 for n in selected.replace(',', ',').split(',')] selected_projects = [all_projects[i] for i in indexes] if not selected_projects: raise ValueError("至少选择一个有效项目") break except (ValueError, IndexError) as e: print(f"❌ 输入无效:{e},请重新输入") # === 操作模式选择 === print("\n请选择操作模式:") print(" [1] 批量更新所有用户(排除ID为1的用户)") print(" [2] 指定用户名更新") mode = input("请输入模式编号(1/2):").strip() user_filter = None if mode == '2': while True: usernames = input("请输入用户名(多个用逗号分隔):").strip() if not usernames: print("⚠️ 用户名不能为空") continue user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')] if len(user_filter) == 0: print("❌ 至少输入一个有效用户名") else: break elif mode != '1': print("⚠️ 输入无效,默认使用批量模式") mode = '1' # 过滤 DataFrame df_filtered = df[df['项目'].isin(selected_projects)] if df_filtered.empty: print("❌ 筛选后无有效项目,请检查输入") return # 创建数据库更新器实例 updaters = [] for _, row in df_filtered.iterrows(): project, host, database, user, password = row[:5] print(f"\n🚀 准备处理项目:{project} | 数据库:{database}") updater = DatabaseUpdater(project, host, database, user, password) updaters.append(updater) # 执行更新任务 tasks = [] for updater in updaters: tasks.append(updater.update_database( user_filter=user_filter if mode == '2' else "LIMIT" )) await asyncio.gather(*tasks) # 询问是否导出用户信息 for updater in updaters: if updater.updated_users: export_choice = input(f"\n是否导出项目 {updater.project} 的用户信息? (y/N): ").strip().lower() if export_choice == 'y' or export_choice == 'yes': try: conn = await updater.connect() if conn: cursor = conn.cursor() await updater.export_user_info(cursor) cursor.close() conn.close() except Exception as e: print(f"❌ 导出项目 {updater.project} 用户信息时出错: {e}") else: print(f"⚠️ 跳过项目 {updater.project} 的用户信息导出") # 保存日志 if log_records: # 保存日志时使用正确的编码参数(CSV文件需要) await asyncio.to_thread( pd.DataFrame(log_records).to_csv, LOG_CSV_PATH, index=False, encoding='utf-8-sig' ) print(f"\n✅ 所有更新日志已保存至 {LOG_CSV_PATH}") else: print("⚠️ 无任何更新记录生成") # === 启动入口 === if __name__ == '__main__': asyncio.run(main())