"""Batch password reset tool for project databases.

Reads MySQL connection details for each project from an Excel sheet, resets
the ``sys_user`` passwords (Argon2 hashed) for the selected projects, writes
a CSV log of the changes, flushes the Redis databases afterwards, and can
export the updated accounts of a project to an Excel file.  A small PyQt5
GUI drives the whole process.
"""

import sys
import os
import re
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple

import pandas as pd
import mysql.connector
from mysql.connector import Error
from PyQt5 import QtWidgets, QtCore, QtGui
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout,
                             QHBoxLayout, QLabel, QLineEdit, QPushButton,
                             QTextEdit, QListWidget, QListWidgetItem,
                             QCheckBox, QFileDialog, QMessageBox,
                             QProgressBar, QGroupBox, QRadioButton,
                             QButtonGroup, QTabWidget)
from PyQt5.QtCore import Qt, pyqtSignal, QThread

# Resolve the base directory: next to the executable when frozen (packaged),
# otherwise next to this script.
if getattr(sys, 'frozen', False):
    BASE_DIR = os.path.dirname(sys.executable)
else:
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Default paths and settings.
DEFAULT_EXCEL_PATH = os.path.join(BASE_DIR, 'dbExcel', '数据库信息.xlsx')
DEFAULT_LOG_CSV_PATH = os.path.join(BASE_DIR, '更新日志.csv')
DEFAULT_EXPORT_DIR = os.path.join(BASE_DIR, '用户信息导出')
DEFAULT_NEW_PASSWORD = 'Baoyi@1341'
PROTECTED_USER_ID = 1  # this user id is never touched by an update

# Redis connection settings.
REDIS_HOST = '192.168.6.168'
REDIS_PORT = 6002
REDIS_PASSWORD = None

# argon2-cffi is mandatory: without it no password can be hashed.
try:
    from argon2 import PasswordHasher
except ImportError:
    print("请安装argon2-cffi库: pip install argon2-cffi")
    sys.exit(1)

# Argon2 parameters.
SALT_LENGTH = 32
HASH_LENGTH = 64
PARALLELISM = 2
MEMORY = 65536  # 64MB
ITERATIONS = 5

# Shared Argon2 hasher instance.
argon2_hasher = PasswordHasher(
    time_cost=ITERATIONS,
    memory_cost=MEMORY,
    parallelism=PARALLELISM,
    hash_len=HASH_LENGTH,
    salt_len=SALT_LENGTH
)


def encode_password(raw_password):
    """Return the Argon2 hash of *raw_password*.

    Each call produces a different string for the same input because the
    hasher generates a fresh salt, so a hash that must match what was stored
    has to be computed once and reused.
    """
    return argon2_hasher.hash(raw_password)


class RedisRefresher:
    """Flushes the Redis databases of the configured server."""

    def __init__(self):
        # Client bound to the default database; created lazily by
        # connect_redis() and only stored once a ping succeeded.
        self.redis_client = None

    @staticmethod
    def _new_client(db_index=0):
        """Create a Redis client bound to database *db_index*.

        Raises ImportError when the ``redis`` package is not installed.
        """
        import redis  # imported lazily so the tool starts without redis
        return redis.Redis(
            host=REDIS_HOST,
            port=REDIS_PORT,
            password=REDIS_PASSWORD,
            db=db_index,
            decode_responses=True
        )

    def connect_redis(self):
        """Connect to the Redis server and verify the connection.

        Returns:
            True when the server answered a ping, False otherwise (the
            reason is printed).
        """
        try:
            client = self._new_client()
            client.ping()
        except ImportError:
            print("错误: 未安装redis库,请使用 'pip install redis' 安装")
            return False
        except Exception as e:
            print(f"连接Redis失败: {str(e)}")
            return False
        # Store the client only after the ping succeeded, so a failed
        # attempt does not leave a dead client that blocks later reconnects.
        self.redis_client = client
        return True

    def refresh_all_redis(self):
        """Flush every non-empty Redis database.

        Returns:
            A ``(success, message)`` tuple. ``success`` is False only when
            the server cannot be reached or an unexpected error occurs;
            failures on individual databases are printed and skipped.
        """
        try:
            if not self.redis_client and not self.connect_redis():
                return False, "无法连接Redis服务器"

            # Ask the server how many databases it has; fall back to the
            # Redis default of 16 when CONFIG GET is unavailable.
            try:
                config = self.redis_client.config_get('databases')
                db_count = int(config.get('databases', 16))
            except Exception:
                db_count = 16

            refreshed_dbs = 0
            total_keys = 0

            for db_index in range(db_count):
                db_client = None
                try:
                    # A dedicated client per database avoids issuing SELECT
                    # on a pooled connection, which would hand a connection
                    # pointing at the wrong database back to the pool.
                    db_client = self._new_client(db_index)
                    key_count = db_client.dbsize()
                    if key_count > 0:
                        db_client.flushdb()
                        refreshed_dbs += 1
                        total_keys += key_count
                        print(f"已刷新数据库 {db_index}: 清除了 {key_count} 个键")
                except Exception as e:
                    print(f"刷新数据库 {db_index} 时出错: {str(e)}")
                finally:
                    if db_client is not None:
                        db_client.connection_pool.disconnect()

            return True, f"成功刷新 {refreshed_dbs} 个Redis数据库,共清除 {total_keys} 个键"
        except Exception as e:
            return False, f"刷新Redis缓存时发生错误: {str(e)}"


class DatabaseUpdater(QThread):
    """Worker thread that updates the selected project databases."""

    # Signals delivered to the GUI.
    log_signal = pyqtSignal(str)
    progress_signal = pyqtSignal(int)
    finished_signal = pyqtSignal()
    error_signal = pyqtSignal(str)
    export_finished_signal = pyqtSignal(str)

    def __init__(self, excel_path, new_password, mode, user_filter=None,
                 selected_projects=None):
        """Store the job description.

        Args:
            excel_path: Path of the Excel file with the connection details.
            new_password: Plain-text password to set.
            mode: ``'all'`` to update every user, ``'specific'`` to update
                only the login names in *user_filter*.
            user_filter: List of login names (used in ``'specific'`` mode).
            selected_projects: Project names to process; a falsy value
                processes every project in the Excel file.
        """
        super().__init__()
        self.excel_path = excel_path
        self.new_password = new_password
        self.mode = mode  # 'all' or 'specific'
        self.user_filter = user_filter
        self.selected_projects = selected_projects
        self.updaters: List["ProjectUpdater"] = []
        self.log_records: List[Dict[str, Any]] = []
        self.is_running = True
        self.redis_refresher = RedisRefresher()

    def run(self):
        """Thread entry point: update the projects, save the log, flush Redis.

        ``finished_signal`` is always emitted at the end, also after an
        error.
        """
        try:
            # In 'specific' mode an empty filter would silently fall back to
            # "all users" further down, so refuse to run instead.
            if self.mode == 'specific' and not self.user_filter:
                self.error_signal.emit("请输入要更新的用户名")
                return

            try:
                df = pd.read_excel(self.excel_path)
            except Exception as e:
                self.error_signal.emit(f"读取Excel文件失败:{e}")
                return

            all_projects = df['项目'].unique().tolist()
            if not all_projects:
                self.error_signal.emit("Excel 文件中未找到任何项目")
                return

            # Restrict to the selected projects. Names are compared as
            # strings because the selection comes from list-widget text.
            if self.selected_projects:
                wanted = [str(name) for name in self.selected_projects]
                df_filtered = df[df['项目'].astype(str).isin(wanted)]
            else:
                df_filtered = df

            if df_filtered.empty:
                self.error_signal.emit("筛选后无有效项目,请检查输入")
                return

            # One ProjectUpdater per row; the first six columns are read
            # positionally as project, host, port, database, user, password.
            self.updaters = []
            for _, row in df_filtered.iterrows():
                project, host, port, database, user, password = row.iloc[:6]
                self.log_signal.emit(
                    f"准备处理项目:{project} | 数据库:{database} | 端口:{port}")
                self.updaters.append(
                    ProjectUpdater(project, host, database, user, password, port))

            name_filter = self.user_filter if self.mode == 'specific' else None
            total_projects = len(self.updaters)
            for i, updater in enumerate(self.updaters):
                if not self.is_running:
                    break

                self.log_signal.emit(f"正在处理项目: {updater.project}")
                success = updater.update_database(
                    user_filter=name_filter,
                    new_password=self.new_password
                )

                if success:
                    self.log_records.extend(updater.log_records)
                    self.log_signal.emit(
                        f"✅ {updater.project} 更新完成,共 {len(updater.updated_users)} 个用户")
                else:
                    reason = f": {updater.last_error}" if updater.last_error else ""
                    self.log_signal.emit(f"❌ {updater.project} 更新失败{reason}")

                self.progress_signal.emit(int((i + 1) * 100 / total_projects))

            # Persist the change log.
            if self.log_records:
                pd.DataFrame(self.log_records).to_csv(
                    DEFAULT_LOG_CSV_PATH, index=False, encoding='utf-8-sig')
                self.log_signal.emit(f"所有更新日志已保存至 {DEFAULT_LOG_CSV_PATH}")
            else:
                self.log_signal.emit("无任何更新记录生成")

            # Flush the Redis databases once the updates are done.
            self.log_signal.emit("开始刷新Redis缓存...")
            success, message = self.redis_refresher.refresh_all_redis()
            if success:
                self.log_signal.emit(f"✅ {message}")
            else:
                self.log_signal.emit(f"❌ {message}")

        except Exception as e:
            self.error_signal.emit(f"运行过程中发生错误: {str(e)}")
        finally:
            self.finished_signal.emit()

    def stop(self):
        """Ask the worker to stop before it starts the next project."""
        self.is_running = False

    def export_user_info(self, project_name):
        """Export the updated users of *project_name* to an Excel file.

        Runs in the caller's thread. The outcome is reported through
        ``export_finished_signal`` or ``error_signal``.
        """
        try:
            updater = next(
                (u for u in self.updaters if u.project == project_name), None)

            if not updater or not updater.updated_users:
                self.error_signal.emit(f"项目 {project_name} 没有可导出的用户信息")
                return

            conn = updater.connect()
            if not conn:
                self.error_signal.emit(f"无法连接到项目 {project_name} 的数据库")
                return

            cursor = None
            try:
                cursor = conn.cursor()
                success = updater.export_user_info(cursor, self.new_password)
            finally:
                # Release the connection even when the export raised.
                if cursor:
                    cursor.close()
                conn.close()

            if success:
                self.export_finished_signal.emit(
                    f"项目 {project_name} 的用户信息导出完成")
            else:
                reason = f": {updater.last_error}" if updater.last_error else ""
                self.error_signal.emit(
                    f"项目 {project_name} 的用户信息导出失败{reason}")
        except Exception as e:
            self.error_signal.emit(f"导出用户信息时发生错误: {str(e)}")


class ProjectUpdater:
    """Updates the user passwords of a single project database."""

    def __init__(self, project, host, database, user, password, port=3306):
        self.project = project
        self.host = host
        self.database = database
        self.db_user = user
        self.db_password = password
        self.port = port
        # Users whose new password has been committed.
        self.updated_users: List[Dict[str, Any]] = []
        # Log rows matching the committed updates.
        self.log_records: List[Dict[str, Any]] = []
        # Description of the most recent failure, None when there was none.
        self.last_error: Optional[str] = None

    def connect(self):
        """Open a MySQL connection.

        Returns:
            The connection, or None on failure (the reason is stored in
            ``last_error``).
        """
        try:
            return mysql.connector.connect(
                host=self.host,
                port=self.port,
                user=self.db_user,
                password=self.db_password,
                database=self.database,
                use_pure=True
            )
        except Error as e:
            self.last_error = f"数据库连接失败: {e}"
            return None

    def check_plain_text_column(self, cursor):
        """Return True when ``sys_user`` has a ``plain_text`` column."""
        try:
            cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'")
            return cursor.fetchone() is not None
        except Error:
            return False

    def get_users_to_update(self, cursor, user_filter=None):
        """Fetch the users whose password should be changed.

        Args:
            cursor: Open database cursor.
            user_filter: Optional non-empty list of login names; any other
                value selects every user. The protected user id is always
                excluded.

        Returns:
            Rows of ``(id, login_name, name, current_role_id)``; an empty
            list when the query fails (the reason is stored in
            ``last_error``).
        """
        try:
            base_sql = """
                SELECT id, login_name, name, current_role_id
                FROM sys_user
                WHERE id != %s
            """
            params = [PROTECTED_USER_ID]

            if isinstance(user_filter, list) and user_filter:
                placeholders = ','.join(['%s'] * len(user_filter))
                base_sql += f" AND login_name IN ({placeholders})"
                params.extend(user_filter)

            cursor.execute(base_sql, params)
            return cursor.fetchall()
        except Error as e:
            self.last_error = f"查询用户失败: {e}"
            return []

    def update_user_password(self, cursor, user_id, login_name, has_plain_text,
                             new_password, hashed=None):
        """Write the new password of one user.

        Args:
            cursor: Open database cursor.
            user_id: Id of the user row to update.
            login_name: Login name of the user (not used by the query).
            has_plain_text: Whether the ``plain_text`` column exists and
                must be updated as well.
            new_password: Plain-text password.
            hashed: Pre-computed Argon2 hash of *new_password*; computed
                here when omitted.

        Returns:
            True on success, False when the statement failed.
        """
        try:
            if hashed is None:
                hashed = encode_password(new_password)

            if has_plain_text:
                update_sql = """
                    UPDATE sys_user
                    SET plain_text = %s, password = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (new_password, hashed, user_id))
            else:
                update_sql = """
                    UPDATE sys_user
                    SET password = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (hashed, user_id))
            return True
        except Error as e:
            self.last_error = f"更新用户 {login_name} 失败: {e}"
            return False

    def get_role_name(self, cursor, role_id):
        """Return the role name for *role_id*, or a placeholder text."""
        try:
            if not role_id:
                return "无角色"
            cursor.execute("SELECT name FROM sys_role WHERE id = %s", (role_id,))
            result = cursor.fetchone()
            return result[0] if result else "角色不存在"
        except Error:
            return "获取失败"

    def export_user_info(self, cursor, new_password):
        """Export the updated users to an Excel file in DEFAULT_EXPORT_DIR.

        Returns:
            True when the file was written, False when there is nothing to
            export or the export failed (the reason is stored in
            ``last_error``).
        """
        try:
            if not self.updated_users:
                return False

            os.makedirs(DEFAULT_EXPORT_DIR, exist_ok=True)

            # Resolve every distinct role id once.
            role_ids = {user['role_id'] for user in self.updated_users
                        if user['role_id']}
            role_name_map = {role_id: self.get_role_name(cursor, role_id)
                             for role_id in role_ids}

            export_data = [
                {
                    '用户名': user['login_name'],
                    '密码': new_password,
                    '姓名': user['name'],
                    '角色名称': role_name_map.get(user['role_id'], '无角色')
                }
                for user in self.updated_users
            ]
            df_export = pd.DataFrame(export_data)

            # File name: <project>测试环境账号密码_<YYYYMMDD>.xlsx, with the
            # characters that are invalid in file names stripped.
            safe_project_name = re.sub(r'[\\/*?:"<>|]', "", str(self.project))
            safe_project_name = safe_project_name.replace(" ", "_")
            date_str = datetime.now().strftime("%Y%m%d")
            filename = f"{safe_project_name}测试环境账号密码_{date_str}.xlsx"
            filepath = os.path.join(DEFAULT_EXPORT_DIR, filename)

            df_export.to_excel(filepath, index=False)
            return True
        except Exception as e:
            self.last_error = str(e)
            return False

    def update_database(self, user_filter=None, new_password=DEFAULT_NEW_PASSWORD):
        """Reset the passwords of the selected users in this project.

        Args:
            user_filter: Optional list of login names; see
                ``get_users_to_update``.
            new_password: Plain-text password to set.

        Returns:
            True when the changes were committed. False when the connection
            failed, no user matched, or a database error occurred; in that
            case ``updated_users`` and ``log_records`` are left unchanged
            and ``last_error`` holds the reason when one is known.
        """
        self.last_error = None
        conn = None
        cursor = None

        try:
            conn = self.connect()
            if not conn:
                return False

            cursor = conn.cursor()
            has_plain_text = self.check_plain_text_column(cursor)

            users = self.get_users_to_update(cursor, user_filter)
            if not users:
                if self.last_error is None:
                    self.last_error = "未找到需要更新的用户"
                return False

            # Collect results locally and publish them only after commit,
            # so a failed commit never leaves uncommitted users on record.
            pending_users = []
            pending_logs = []
            for uid, login_name, name, current_role_id in users:
                # Hash once per user: Argon2 salts randomly, so re-hashing
                # for the log would record a value that is not in the db.
                hashed = encode_password(new_password)
                if not self.update_user_password(
                        cursor, uid, login_name, has_plain_text,
                        new_password, hashed=hashed):
                    continue

                pending_users.append({
                    'id': uid,
                    'login_name': login_name,
                    'name': name,
                    'role_id': current_role_id
                })
                pending_logs.append({
                    '项目': self.project,
                    '数据库': self.database,
                    '用户ID': uid,
                    '用户名': login_name,
                    '明文密码': new_password if has_plain_text else '未更新',
                    '哈希密码': hashed
                })

            conn.commit()
            self.updated_users.extend(pending_users)
            self.log_records.extend(pending_logs)
            return True

        except Error as e:
            self.last_error = f"数据库操作失败: {e}"
            return False
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()


class PasswordUpdaterApp(QMainWindow):
    """Main application window."""

    def __init__(self):
        super().__init__()
        self.updater_thread = None
        self.init_ui()

    def init_ui(self):
        """Build the window: configuration, log and export tabs."""
        self.setWindowTitle('数据库密码批量修改工具')
        self.setGeometry(100, 100, 800, 600)

        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        layout = QVBoxLayout(central_widget)

        tabs = QTabWidget()
        layout.addWidget(tabs)

        tabs.addTab(self._build_config_tab(), "配置")
        tabs.addTab(self._build_log_tab(), "日志")
        tabs.addTab(self._build_export_tab(), "导出")

    def _build_config_tab(self):
        """Create the configuration tab and return its widget."""
        config_tab = QWidget()
        config_layout = QVBoxLayout(config_tab)

        # Excel file selection.
        excel_group = QGroupBox("Excel配置文件")
        excel_layout = QHBoxLayout(excel_group)
        self.excel_path_edit = QLineEdit(DEFAULT_EXCEL_PATH)
        excel_browse_btn = QPushButton("浏览...")
        excel_browse_btn.clicked.connect(self.browse_excel_file)
        excel_layout.addWidget(QLabel("Excel文件路径:"))
        excel_layout.addWidget(self.excel_path_edit)
        excel_layout.addWidget(excel_browse_btn)
        config_layout.addWidget(excel_group)

        # New password.
        password_group = QGroupBox("新密码设置")
        password_layout = QHBoxLayout(password_group)
        self.password_edit = QLineEdit(DEFAULT_NEW_PASSWORD)
        self.password_edit.setEchoMode(QLineEdit.Password)
        password_layout.addWidget(QLabel("新密码:"))
        password_layout.addWidget(self.password_edit)
        config_layout.addWidget(password_group)

        # Project selection.
        projects_group = QGroupBox("项目选择")
        projects_layout = QVBoxLayout(projects_group)
        self.projects_list = QListWidget()
        self.load_projects_btn = QPushButton("加载项目")
        self.load_projects_btn.clicked.connect(self.load_projects)
        projects_layout.addWidget(self.projects_list)
        projects_layout.addWidget(self.load_projects_btn)
        config_layout.addWidget(projects_group)

        # Operation mode.
        mode_group = QGroupBox("操作模式")
        mode_layout = QVBoxLayout(mode_group)
        self.all_users_radio = QRadioButton("批量更新所有用户(排除ID为1的用户)")
        self.specific_users_radio = QRadioButton("指定用户名更新")
        self.all_users_radio.setChecked(True)

        self.usernames_edit = QLineEdit()
        self.usernames_edit.setPlaceholderText("输入用户名,多个用逗号分隔")
        self.usernames_edit.setEnabled(False)

        # The name field is editable exactly while "specific users" is on.
        self.specific_users_radio.toggled.connect(self.usernames_edit.setEnabled)

        mode_layout.addWidget(self.all_users_radio)
        mode_layout.addWidget(self.specific_users_radio)
        mode_layout.addWidget(self.usernames_edit)
        config_layout.addWidget(mode_group)

        # Progress bar and start button.
        self.progress_bar = QProgressBar()
        config_layout.addWidget(self.progress_bar)

        self.start_btn = QPushButton("开始更新")
        self.start_btn.clicked.connect(self.start_update)
        config_layout.addWidget(self.start_btn)

        return config_tab

    def _build_log_tab(self):
        """Create the log tab and return its widget."""
        log_tab = QWidget()
        log_layout = QVBoxLayout(log_tab)
        self.log_text = QTextEdit()
        self.log_text.setReadOnly(True)
        log_layout.addWidget(QLabel("操作日志:"))
        log_layout.addWidget(self.log_text)
        return log_tab

    def _build_export_tab(self):
        """Create the export tab and return its widget."""
        export_tab = QWidget()
        export_layout = QVBoxLayout(export_tab)
        self.export_list = QListWidget()
        self.export_btn = QPushButton("导出选中项目用户信息")
        self.export_btn.clicked.connect(self.export_user_info)
        export_layout.addWidget(QLabel("已更新项目列表:"))
        export_layout.addWidget(self.export_list)
        export_layout.addWidget(self.export_btn)
        return export_tab

    def browse_excel_file(self):
        """Let the user pick the Excel file."""
        file_path, _ = QFileDialog.getOpenFileName(
            self, "选择Excel文件", "", "Excel Files (*.xlsx *.xls)"
        )
        if file_path:
            self.excel_path_edit.setText(file_path)

    def load_projects(self):
        """Fill the project list from the Excel file."""
        excel_path = self.excel_path_edit.text()
        if not os.path.exists(excel_path):
            QMessageBox.warning(self, "错误", "Excel文件不存在")
            return

        try:
            df = pd.read_excel(excel_path)
            # Skip empty cells and show every name as text; the list item
            # constructor only accepts strings.
            projects = [str(p) for p in df['项目'].dropna().unique().tolist()]

            self.projects_list.clear()
            for project in projects:
                item = QListWidgetItem(project)
                item.setCheckState(Qt.Unchecked)
                self.projects_list.addItem(item)

            self.log_text.append(f"已加载 {len(projects)} 个项目")
        except Exception as e:
            QMessageBox.warning(self, "错误", f"读取Excel文件失败: {str(e)}")

    def start_update(self):
        """Validate the form and start the update thread."""
        # Never replace a worker that is still running.
        if self.updater_thread and self.updater_thread.isRunning():
            return

        selected_projects = []
        for i in range(self.projects_list.count()):
            item = self.projects_list.item(i)
            if item.checkState() == Qt.Checked:
                selected_projects.append(item.text())

        if not selected_projects:
            QMessageBox.warning(self, "错误", "请至少选择一个项目")
            return

        if self.all_users_radio.isChecked():
            mode = 'all'
            user_filter = None
        else:
            mode = 'specific'
            usernames = self.usernames_edit.text().strip()
            # Accept both ASCII and full-width commas and drop blank names.
            parts = (name.strip() for name in re.split(r'[,\uff0c]', usernames))
            user_filter = [name for name in parts if name]
            if not user_filter:
                QMessageBox.warning(self, "错误", "请输入要更新的用户名")
                return

        new_password = self.password_edit.text()
        if not new_password:
            QMessageBox.warning(self, "错误", "请输入新密码")
            return

        self.updater_thread = DatabaseUpdater(
            self.excel_path_edit.text(),
            new_password,
            mode,
            user_filter,
            selected_projects
        )

        self.updater_thread.log_signal.connect(self.log_text.append)
        self.updater_thread.progress_signal.connect(self.progress_bar.setValue)
        self.updater_thread.finished_signal.connect(self.on_update_finished)
        self.updater_thread.error_signal.connect(self.show_error)
        self.updater_thread.export_finished_signal.connect(self.log_text.append)

        self.start_btn.setEnabled(False)
        self.progress_bar.setValue(0)
        self.updater_thread.start()

    def on_update_finished(self):
        """Re-enable the form and list the projects that can be exported."""
        self.start_btn.setEnabled(True)
        self.progress_bar.setValue(100)

        if self.updater_thread and self.updater_thread.updaters:
            self.export_list.clear()
            for updater in self.updater_thread.updaters:
                if updater.updated_users:
                    item = QListWidgetItem(
                        f"{updater.project} ({len(updater.updated_users)} 用户)")
                    item.setData(Qt.UserRole, updater.project)
                    self.export_list.addItem(item)

    def export_user_info(self):
        """Export the user information of the selected projects."""
        selected_items = self.export_list.selectedItems()
        if not selected_items:
            QMessageBox.warning(self, "错误", "请选择要导出的项目")
            return

        if not self.updater_thread:
            QMessageBox.warning(self, "错误", "没有可导出的数据")
            return

        for item in selected_items:
            project_name = item.data(Qt.UserRole)
            self.log_text.append(f"开始导出项目 {project_name} 的用户信息...")
            self.updater_thread.export_user_info(project_name)

    def show_error(self, error_msg):
        """Show an error dialog."""
        QMessageBox.critical(self, "错误", error_msg)
        # Keep the start button disabled while a worker is still running;
        # on_update_finished re-enables it when the worker ends.
        if not (self.updater_thread and self.updater_thread.isRunning()):
            self.start_btn.setEnabled(True)

    def closeEvent(self, event):
        """Ask for confirmation when closing during an update."""
        if self.updater_thread and self.updater_thread.isRunning():
            reply = QMessageBox.question(
                self, '确认退出',
                '更新操作正在进行中,确定要退出吗?',
                QMessageBox.Yes | QMessageBox.No,
                QMessageBox.No
            )

            if reply == QMessageBox.Yes:
                self.updater_thread.stop()
                self.updater_thread.wait()
                event.accept()
            else:
                event.ignore()
        else:
            event.accept()


def main():
    """Application entry point."""
    app = QApplication(sys.argv)
    window = PasswordUpdaterApp()
    window.show()
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()