新增客户端gui界面
新增argon2id'加密方式,适配部分项目的加密方式调整
31 files added
4 files modified
6877 ■■■■■ changed files
测试组/脚本/Change_password/.idea/misc.xml 3 ●●●●● patch | view | raw | blame | history
测试组/脚本/Change_password/dist/dbExcel/数据库信息.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/dist/密码更新工具.exe patch | view | raw | blame | history
测试组/脚本/Change_password/修改数据库的哈希密码和原始密码做桌面客户端.py 617 ●●●●● patch | view | raw | blame | history
测试组/脚本/Change_password/修改数据库的哈希密码和原始密码做桌面客户端源代码.py 617 ●●●●● patch | view | raw | blame | history
测试组/脚本/Change_password/修改数据库的哈希密码和原始密码做桌面客户端源代码(argon2id).py 639 ●●●●● patch | view | raw | blame | history
测试组/脚本/Change_password/密码更新工具.spec 38 ●●●●● patch | view | raw | blame | history
测试组/脚本/Change_password/批量更新数据库用户密码脚本使用手册.docx patch | view | raw | blame | history
测试组/脚本/Change_password/更新日志.csv 4963 ●●●●● patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/上海中医药_shutcm测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/上海交通大学_sjtu测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/上海市一测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/上海脑所_bsbii测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/上海药明测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/中国科学院脑科学与智能技术卓越创新中心测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/中洪博元测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/云南中医药大学_ynucm测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/云南大学_ynu测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/仪器测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/北京大学人民医院_pkuph测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/北京大学口腔医院_bjmu测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/北京脑所测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/华西医院_wchscu测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/华西海圻测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/南京鼓楼医院_njglyy测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/成都中医药_cdutcm测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/昆明理工测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/江南大学_jiangnan测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/河北大学测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/爱尔眼科测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/福建省妇幼保健院_fjsfy测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/西南大学测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/重庆三峡医药高专_sxyyc测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/陕西师范大学_snnu测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/用户信息导出/齐鲁医院_qilu测试环境账号密码_20250821.xlsx patch | view | raw | blame | history
测试组/脚本/Change_password/.idea/misc.xml
@@ -1,4 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="Black">
    <option name="sdkName" value="G:\Anaconda3" />
  </component>
  <component name="ProjectRootManager" version="2" project-jdk-name="G:\Anaconda3" project-jdk-type="Python SDK" />
</project>
测试组/脚本/Change_password/dist/dbExcel/数据库信息.xlsx
Binary files differ
测试组/脚本/Change_password/dist/密码更新工具.exe
Binary files differ
测试组/脚本/Change_password/修改数据库的哈希密码和原始密码做桌面客户端.py
New file
@@ -0,0 +1,617 @@
import sys
import os
import pandas as pd
import bcrypt
import mysql.connector
from mysql.connector import Error
import asyncio
import threading
from datetime import datetime
from typing import List, Dict, Any, Optional
import re
from PyQt5 import QtWidgets, QtCore, QtGui
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
                             QLabel, QLineEdit, QPushButton, QTextEdit, QListWidget,
                             QListWidgetItem, QCheckBox, QFileDialog, QMessageBox, QProgressBar,
                             QGroupBox, QRadioButton, QButtonGroup, QTabWidget)
from PyQt5.QtCore import Qt, pyqtSignal, QThread
# 判断是否是打包后的可执行文件
if getattr(sys, 'frozen', False):
    # 如果是可执行文件,使用可执行文件所在目录作为基础路径
    BASE_DIR = os.path.dirname(sys.executable)
else:
    # 如果是脚本,使用脚本所在目录
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 修改默认路径配置
DEFAULT_EXCEL_PATH = os.path.join(BASE_DIR, 'dbExcel', '数据库信息.xlsx')
DEFAULT_LOG_CSV_PATH = os.path.join(BASE_DIR, '更新日志.csv')
DEFAULT_EXPORT_DIR = os.path.join(BASE_DIR, '用户信息导出')
DEFAULT_NEW_PASSWORD = 'Baoyi@1341'
PROTECTED_USER_ID = 1
class DatabaseUpdater(QThread):
    """数据库更新器类,运行在单独的线程中"""
    # 定义信号
    log_signal = pyqtSignal(str)
    progress_signal = pyqtSignal(int)
    finished_signal = pyqtSignal()
    error_signal = pyqtSignal(str)
    export_finished_signal = pyqtSignal(str)
    def __init__(self, excel_path, new_password, mode, user_filter=None, selected_projects=None):
        super().__init__()
        self.excel_path = excel_path
        self.new_password = new_password
        self.mode = mode  # 'all' 或 'specific'
        self.user_filter = user_filter
        self.selected_projects = selected_projects
        self.updaters = []
        self.log_records = []
        self.is_running = True
    def run(self):
        """主运行方法"""
        try:
            # 读取 Excel 数据
            try:
                df = pd.read_excel(self.excel_path)
            except Exception as e:
                self.error_signal.emit(f"读取Excel文件失败:{e}")
                return
            # 获取所有唯一项目列表
            all_projects = df['项目'].unique().tolist()
            if not all_projects:
                self.error_signal.emit("Excel 文件中未找到任何项目")
                return
            # 过滤 DataFrame
            if self.selected_projects:
                df_filtered = df[df['项目'].isin(self.selected_projects)]
            else:
                df_filtered = df
            if df_filtered.empty:
                self.error_signal.emit("筛选后无有效项目,请检查输入")
                return
            # 创建数据库更新器实例
            self.updaters = []
            for _, row in df_filtered.iterrows():
                project, host, database, user, password = row[:5]
                self.log_signal.emit(f"准备处理项目:{project} | 数据库:{database}")
                updater = ProjectUpdater(project, host, database, user, password)
                self.updaters.append(updater)
            # 执行更新任务
            total_projects = len(self.updaters)
            for i, updater in enumerate(self.updaters):
                if not self.is_running:
                    break
                self.log_signal.emit(f"正在处理项目: {updater.project}")
                # 运行更新
                success = updater.update_database(
                    user_filter=self.user_filter if self.mode == 'specific' else None,
                    new_password=self.new_password
                )
                if success:
                    self.log_records.extend(updater.log_records)
                    self.log_signal.emit(f"✅ {updater.project} 更新完成,共 {len(updater.updated_users)} 个用户")
                else:
                    self.log_signal.emit(f"❌ {updater.project} 更新失败")
                # 更新进度
                self.progress_signal.emit(int((i + 1) * 100 / total_projects))
            # 保存日志
            if self.log_records:
                pd.DataFrame(self.log_records).to_csv(DEFAULT_LOG_CSV_PATH, index=False, encoding='utf-8-sig')
                self.log_signal.emit(f"所有更新日志已保存至 {DEFAULT_LOG_CSV_PATH}")
            else:
                self.log_signal.emit("无任何更新记录生成")
        except Exception as e:
            self.error_signal.emit(f"运行过程中发生错误: {str(e)}")
        finally:
            self.finished_signal.emit()
    def stop(self):
        """停止运行"""
        self.is_running = False
    def export_user_info(self, project_name):
        """导出用户信息"""
        try:
            # 找到对应的项目更新器
            updater = None
            for u in self.updaters:
                if u.project == project_name:
                    updater = u
                    break
            if not updater or not updater.updated_users:
                self.error_signal.emit(f"项目 {project_name} 没有可导出的用户信息")
                return
            # 连接到数据库并导出
            conn = updater.connect()
            if conn:
                cursor = conn.cursor()
                success = updater.export_user_info(cursor, self.new_password)
                cursor.close()
                conn.close()
                if success:
                    self.export_finished_signal.emit(f"项目 {project_name} 的用户信息导出完成")
                else:
                    self.error_signal.emit(f"项目 {project_name} 的用户信息导出失败")
            else:
                self.error_signal.emit(f"无法连接到项目 {project_name} 的数据库")
        except Exception as e:
            self.error_signal.emit(f"导出用户信息时发生错误: {str(e)}")
class ProjectUpdater:
    """单个项目更新器"""
    def __init__(self, project, host, database, user, password):
        self.project = project
        self.host = host
        self.database = database
        self.db_user = user
        self.db_password = password
        self.updated_users = []
        self.log_records = []
    def connect(self):
        """建立数据库连接"""
        try:
            conn = mysql.connector.connect(
                host=self.host,
                port=3306,
                user=self.db_user,
                password=self.db_password,
                database=self.database,
                use_pure=True
            )
            return conn
        except Error as e:
            return None
    def check_plain_text_column(self, cursor):
        """检查是否存在 plain_text 字段"""
        try:
            cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'")
            return cursor.fetchone() is not None
        except Error:
            return False
    def get_users_to_update(self, cursor, user_filter=None):
        """获取需要更新的用户列表"""
        try:
            # 构造基础查询语句
            base_sql = """
                SELECT id, login_name, name, current_role_id
                FROM sys_user
                WHERE id != %s
            """  # 默认排除保护用户
            params = [PROTECTED_USER_ID]
            # 添加用户筛选条件
            if isinstance(user_filter, list) and user_filter:  # 用户名列表模式
                placeholders = ','.join(['%s'] * len(user_filter))
                base_sql += f" AND login_name IN ({placeholders})"
                params.extend(user_filter)
            cursor.execute(base_sql, params)
            return cursor.fetchall()
        except Error:
            return []
    def update_user_password(self, cursor, user_id, login_name, has_plain_text, new_password):
        """更新用户密码"""
        try:
            salt = bcrypt.gensalt(rounds=10)
            hashed = bcrypt.hashpw(new_password.encode('utf-8'), salt).decode('utf-8')
            # 构造更新 SQL
            if has_plain_text:
                update_sql = """
                    UPDATE sys_user
                    SET plain_text = %s, password = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (new_password, hashed, user_id))
            else:
                update_sql = """
                    UPDATE sys_user
                    SET password = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (hashed, user_id))
            return True
        except Error:
            return False
    def get_role_name(self, cursor, role_id):
        """根据角色ID获取角色名称"""
        try:
            if not role_id:
                return "无角色"
            cursor.execute("SELECT name FROM sys_role WHERE id = %s", (role_id,))
            result = cursor.fetchone()
            return result[0] if result else "角色不存在"
        except Error:
            return "获取失败"
    def export_user_info(self, cursor, new_password):
        """导出用户信息到Excel"""
        try:
            if not self.updated_users:
                return False
            # 确保导出目录存在
            os.makedirs(DEFAULT_EXPORT_DIR, exist_ok=True)
            # 获取所有角色ID
            role_ids = list(set(user['role_id'] for user in self.updated_users if user['role_id']))
            # 获取角色名称映射
            role_name_map = {}
            for role_id in role_ids:
                role_name = self.get_role_name(cursor, role_id)
                role_name_map[role_id] = role_name
            # 准备导出数据
            export_data = []
            for user in self.updated_users:
                export_data.append({
                    '用户名': user['login_name'],
                    '密码': new_password,
                    '姓名': user['name'],
                    '角色名称': role_name_map.get(user['role_id'], '无角色')
                })
            # 创建DataFrame并导出
            df_export = pd.DataFrame(export_data)
            # 按照要求格式化文件名:项目名称测试环境账号密码_年月日
            safe_project_name = re.sub(r'[\\/*?:"<>|]', "", self.project)
            safe_project_name = safe_project_name.replace(" ", "_")
            date_str = datetime.now().strftime("%Y%m%d")
            filename = f"{safe_project_name}测试环境账号密码_{date_str}.xlsx"
            filepath = os.path.join(DEFAULT_EXPORT_DIR, filename)
            df_export.to_excel(filepath, index=False)
            return True
        except Exception:
            return False
    def update_database(self, user_filter=None, new_password=DEFAULT_NEW_PASSWORD):
        """更新数据库中的用户密码"""
        conn = None
        cursor = None
        try:
            conn = self.connect()
            if not conn:
                return False
            # 使用同步方式创建游标
            cursor = conn.cursor()
            # 检查是否有 plain_text 字段
            has_plain_text = self.check_plain_text_column(cursor)
            # 获取需要更新的用户
            users = self.get_users_to_update(cursor, user_filter)
            if not users:
                return False
            # 更新每个用户的密码
            for uid, login_name, name, current_role_id in users:
                success = self.update_user_password(cursor, uid, login_name, has_plain_text, new_password)
                if success:
                    # 记录更新成功的用户信息
                    self.updated_users.append({
                        'id': uid,
                        'login_name': login_name,
                        'name': name,
                        'role_id': current_role_id
                    })
                    # 写入日志
                    self.log_records.append({
                        '项目': self.project,
                        '数据库': self.database,
                        '用户ID': uid,
                        '用户名': login_name,
                        '明文密码': new_password if has_plain_text else '未更新',
                        '哈希密码': bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt(rounds=10)).decode(
                            'utf-8')
                    })
            # 提交事务
            conn.commit()
            return True
        except Error:
            return False
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()
class PasswordUpdaterApp(QMainWindow):
    """主应用程序窗口"""
    def __init__(self):
        super().__init__()
        self.updater_thread = None
        self.init_ui()
    def init_ui(self):
        """初始化用户界面"""
        self.setWindowTitle('数据库密码批量修改工具')
        self.setGeometry(100, 100, 800, 600)
        # 创建中央部件和布局
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        layout = QVBoxLayout(central_widget)
        # 创建选项卡
        tabs = QTabWidget()
        layout.addWidget(tabs)
        # 配置选项卡
        config_tab = QWidget()
        config_layout = QVBoxLayout(config_tab)
        # Excel文件选择
        excel_group = QGroupBox("Excel配置文件")
        excel_layout = QHBoxLayout(excel_group)
        self.excel_path_edit = QLineEdit(DEFAULT_EXCEL_PATH)
        excel_browse_btn = QPushButton("浏览...")
        excel_browse_btn.clicked.connect(self.browse_excel_file)
        excel_layout.addWidget(QLabel("Excel文件路径:"))
        excel_layout.addWidget(self.excel_path_edit)
        excel_layout.addWidget(excel_browse_btn)
        config_layout.addWidget(excel_group)
        # 新密码设置
        password_group = QGroupBox("新密码设置")
        password_layout = QHBoxLayout(password_group)
        self.password_edit = QLineEdit(DEFAULT_NEW_PASSWORD)
        self.password_edit.setEchoMode(QLineEdit.Password)
        password_layout.addWidget(QLabel("新密码:"))
        password_layout.addWidget(self.password_edit)
        config_layout.addWidget(password_group)
        # 项目选择
        projects_group = QGroupBox("项目选择")
        projects_layout = QVBoxLayout(projects_group)
        self.projects_list = QListWidget()
        self.load_projects_btn = QPushButton("加载项目")
        self.load_projects_btn.clicked.connect(self.load_projects)
        projects_layout.addWidget(self.projects_list)
        projects_layout.addWidget(self.load_projects_btn)
        config_layout.addWidget(projects_group)
        # 操作模式选择
        mode_group = QGroupBox("操作模式")
        mode_layout = QVBoxLayout(mode_group)
        self.all_users_radio = QRadioButton("批量更新所有用户(排除ID为1的用户)")
        self.specific_users_radio = QRadioButton("指定用户名更新")
        self.all_users_radio.setChecked(True)
        # 用户名输入
        self.usernames_edit = QLineEdit()
        self.usernames_edit.setPlaceholderText("输入用户名,多个用逗号分隔")
        self.usernames_edit.setEnabled(False)
        # 连接信号
        self.all_users_radio.toggled.connect(
            lambda: self.usernames_edit.setEnabled(not self.all_users_radio.isChecked()))
        self.specific_users_radio.toggled.connect(
            lambda: self.usernames_edit.setEnabled(self.specific_users_radio.isChecked()))
        mode_layout.addWidget(self.all_users_radio)
        mode_layout.addWidget(self.specific_users_radio)
        mode_layout.addWidget(self.usernames_edit)
        config_layout.addWidget(mode_group)
        # 进度条
        self.progress_bar = QProgressBar()
        config_layout.addWidget(self.progress_bar)
        # 开始按钮
        self.start_btn = QPushButton("开始更新")
        self.start_btn.clicked.connect(self.start_update)
        config_layout.addWidget(self.start_btn)
        # 日志选项卡
        log_tab = QWidget()
        log_layout = QVBoxLayout(log_tab)
        self.log_text = QTextEdit()
        self.log_text.setReadOnly(True)
        log_layout.addWidget(QLabel("操作日志:"))
        log_layout.addWidget(self.log_text)
        # 导出选项卡
        export_tab = QWidget()
        export_layout = QVBoxLayout(export_tab)
        self.export_list = QListWidget()
        self.export_btn = QPushButton("导出选中项目用户信息")
        self.export_btn.clicked.connect(self.export_user_info)
        export_layout.addWidget(QLabel("已更新项目列表:"))
        export_layout.addWidget(self.export_list)
        export_layout.addWidget(self.export_btn)
        # 添加选项卡
        tabs.addTab(config_tab, "配置")
        tabs.addTab(log_tab, "日志")
        tabs.addTab(export_tab, "导出")
    def browse_excel_file(self):
        """浏览Excel文件"""
        file_path, _ = QFileDialog.getOpenFileName(
            self, "选择Excel文件", "", "Excel Files (*.xlsx *.xls)"
        )
        if file_path:
            self.excel_path_edit.setText(file_path)
    def load_projects(self):
        """加载项目列表"""
        excel_path = self.excel_path_edit.text()
        if not os.path.exists(excel_path):
            QMessageBox.warning(self, "错误", "Excel文件不存在")
            return
        try:
            df = pd.read_excel(excel_path)
            projects = df['项目'].unique().tolist()
            self.projects_list.clear()
            for project in projects:
                item = QListWidgetItem(project)
                item.setCheckState(Qt.Unchecked)
                self.projects_list.addItem(item)
            self.log_text.append(f"已加载 {len(projects)} 个项目")
        except Exception as e:
            QMessageBox.warning(self, "错误", f"读取Excel文件失败: {str(e)}")
    def start_update(self):
        """开始更新"""
        # 获取选中的项目
        selected_projects = []
        for i in range(self.projects_list.count()):
            item = self.projects_list.item(i)
            if item.checkState() == Qt.Checked:
                selected_projects.append(item.text())
        if not selected_projects:
            QMessageBox.warning(self, "错误", "请至少选择一个项目")
            return
        # 获取操作模式
        if self.all_users_radio.isChecked():
            mode = 'all'
            user_filter = None
        else:
            mode = 'specific'
            usernames = self.usernames_edit.text().strip()
            if not usernames:
                QMessageBox.warning(self, "错误", "请输入要更新的用户名")
                return
            user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')]
        # 获取新密码
        new_password = self.password_edit.text()
        if not new_password:
            QMessageBox.warning(self, "错误", "请输入新密码")
            return
        # 创建并启动更新线程
        self.updater_thread = DatabaseUpdater(
            self.excel_path_edit.text(),
            new_password,
            mode,
            user_filter,
            selected_projects
        )
        # 连接信号
        self.updater_thread.log_signal.connect(self.log_text.append)
        self.updater_thread.progress_signal.connect(self.progress_bar.setValue)
        self.updater_thread.finished_signal.connect(self.on_update_finished)
        self.updater_thread.error_signal.connect(self.show_error)
        self.updater_thread.export_finished_signal.connect(self.log_text.append)
        # 禁用开始按钮
        self.start_btn.setEnabled(False)
        # 启动线程
        self.updater_thread.start()
    def on_update_finished(self):
        """更新完成后的处理"""
        self.start_btn.setEnabled(True)
        self.progress_bar.setValue(100)
        # 更新导出列表
        if self.updater_thread and self.updater_thread.updaters:
            self.export_list.clear()
            for updater in self.updater_thread.updaters:
                if updater.updated_users:
                    item = QListWidgetItem(f"{updater.project} ({len(updater.updated_users)} 用户)")
                    item.setData(Qt.UserRole, updater.project)
                    self.export_list.addItem(item)
    def export_user_info(self):
        """导出用户信息"""
        selected_items = self.export_list.selectedItems()
        if not selected_items:
            QMessageBox.warning(self, "错误", "请选择要导出的项目")
            return
        if not self.updater_thread:
            QMessageBox.warning(self, "错误", "没有可导出的数据")
            return
        for item in selected_items:
            project_name = item.data(Qt.UserRole)
            self.log_text.append(f"开始导出项目 {project_name} 的用户信息...")
            self.updater_thread.export_user_info(project_name)
    def show_error(self, error_msg):
        """显示错误信息"""
        QMessageBox.critical(self, "错误", error_msg)
        self.start_btn.setEnabled(True)
    def closeEvent(self, event):
        """关闭应用程序事件"""
        if self.updater_thread and self.updater_thread.isRunning():
            reply = QMessageBox.question(
                self, '确认退出',
                '更新操作正在进行中,确定要退出吗?',
                QMessageBox.Yes | QMessageBox.No,
                QMessageBox.No
            )
            if reply == QMessageBox.Yes:
                self.updater_thread.stop()
                self.updater_thread.wait()
                event.accept()
            else:
                event.ignore()
        else:
            event.accept()
def main():
    """主函数"""
    app = QApplication(sys.argv)
    window = PasswordUpdaterApp()
    window.show()
    sys.exit(app.exec_())
if __name__ == '__main__':
    main()
测试组/脚本/Change_password/修改数据库的哈希密码和原始密码做桌面客户端源代码.py
New file
@@ -0,0 +1,617 @@
import sys
import os
import pandas as pd
import bcrypt
import mysql.connector
from mysql.connector import Error
import asyncio
import threading
from datetime import datetime
from typing import List, Dict, Any, Optional
import re
from PyQt5 import QtWidgets, QtCore, QtGui
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
                             QLabel, QLineEdit, QPushButton, QTextEdit, QListWidget,
                             QListWidgetItem, QCheckBox, QFileDialog, QMessageBox, QProgressBar,
                             QGroupBox, QRadioButton, QButtonGroup, QTabWidget)
from PyQt5.QtCore import Qt, pyqtSignal, QThread
# 判断是否是打包后的可执行文件
if getattr(sys, 'frozen', False):
    # 如果是可执行文件,使用可执行文件所在目录作为基础路径
    BASE_DIR = os.path.dirname(sys.executable)
else:
    # 如果是脚本,使用脚本所在目录
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 修改默认路径配置
DEFAULT_EXCEL_PATH = os.path.join(BASE_DIR, 'dbExcel', '数据库信息.xlsx')
DEFAULT_LOG_CSV_PATH = os.path.join(BASE_DIR, '更新日志.csv')
DEFAULT_EXPORT_DIR = os.path.join(BASE_DIR, '用户信息导出')
DEFAULT_NEW_PASSWORD = 'Baoyi@1341'
PROTECTED_USER_ID = 1
class DatabaseUpdater(QThread):
    """数据库更新器类,运行在单独的线程中"""
    # 定义信号
    log_signal = pyqtSignal(str)
    progress_signal = pyqtSignal(int)
    finished_signal = pyqtSignal()
    error_signal = pyqtSignal(str)
    export_finished_signal = pyqtSignal(str)
    def __init__(self, excel_path, new_password, mode, user_filter=None, selected_projects=None):
        super().__init__()
        self.excel_path = excel_path
        self.new_password = new_password
        self.mode = mode  # 'all' 或 'specific'
        self.user_filter = user_filter
        self.selected_projects = selected_projects
        self.updaters = []
        self.log_records = []
        self.is_running = True
    def run(self):
        """主运行方法"""
        try:
            # 读取 Excel 数据
            try:
                df = pd.read_excel(self.excel_path)
            except Exception as e:
                self.error_signal.emit(f"读取Excel文件失败:{e}")
                return
            # 获取所有唯一项目列表
            all_projects = df['项目'].unique().tolist()
            if not all_projects:
                self.error_signal.emit("Excel 文件中未找到任何项目")
                return
            # 过滤 DataFrame
            if self.selected_projects:
                df_filtered = df[df['项目'].isin(self.selected_projects)]
            else:
                df_filtered = df
            if df_filtered.empty:
                self.error_signal.emit("筛选后无有效项目,请检查输入")
                return
            # 创建数据库更新器实例
            self.updaters = []
            for _, row in df_filtered.iterrows():
                project, host, database, user, password = row[:5]
                self.log_signal.emit(f"准备处理项目:{project} | 数据库:{database}")
                updater = ProjectUpdater(project, host, database, user, password)
                self.updaters.append(updater)
            # 执行更新任务
            total_projects = len(self.updaters)
            for i, updater in enumerate(self.updaters):
                if not self.is_running:
                    break
                self.log_signal.emit(f"正在处理项目: {updater.project}")
                # 运行更新
                success = updater.update_database(
                    user_filter=self.user_filter if self.mode == 'specific' else None,
                    new_password=self.new_password
                )
                if success:
                    self.log_records.extend(updater.log_records)
                    self.log_signal.emit(f"✅ {updater.project} 更新完成,共 {len(updater.updated_users)} 个用户")
                else:
                    self.log_signal.emit(f"❌ {updater.project} 更新失败")
                # 更新进度
                self.progress_signal.emit(int((i + 1) * 100 / total_projects))
            # 保存日志
            if self.log_records:
                pd.DataFrame(self.log_records).to_csv(DEFAULT_LOG_CSV_PATH, index=False, encoding='utf-8-sig')
                self.log_signal.emit(f"所有更新日志已保存至 {DEFAULT_LOG_CSV_PATH}")
            else:
                self.log_signal.emit("无任何更新记录生成")
        except Exception as e:
            self.error_signal.emit(f"运行过程中发生错误: {str(e)}")
        finally:
            self.finished_signal.emit()
    def stop(self):
        """停止运行"""
        self.is_running = False
    def export_user_info(self, project_name):
        """导出用户信息"""
        try:
            # 找到对应的项目更新器
            updater = None
            for u in self.updaters:
                if u.project == project_name:
                    updater = u
                    break
            if not updater or not updater.updated_users:
                self.error_signal.emit(f"项目 {project_name} 没有可导出的用户信息")
                return
            # 连接到数据库并导出
            conn = updater.connect()
            if conn:
                cursor = conn.cursor()
                success = updater.export_user_info(cursor, self.new_password)
                cursor.close()
                conn.close()
                if success:
                    self.export_finished_signal.emit(f"项目 {project_name} 的用户信息导出完成")
                else:
                    self.error_signal.emit(f"项目 {project_name} 的用户信息导出失败")
            else:
                self.error_signal.emit(f"无法连接到项目 {project_name} 的数据库")
        except Exception as e:
            self.error_signal.emit(f"导出用户信息时发生错误: {str(e)}")
class ProjectUpdater:
    """单个项目更新器"""
    def __init__(self, project, host, database, user, password):
        self.project = project
        self.host = host
        self.database = database
        self.db_user = user
        self.db_password = password
        self.updated_users = []
        self.log_records = []
    def connect(self):
        """建立数据库连接"""
        try:
            conn = mysql.connector.connect(
                host=self.host,
                port=3306,
                user=self.db_user,
                password=self.db_password,
                database=self.database,
                use_pure=True
            )
            return conn
        except Error as e:
            return None
    def check_plain_text_column(self, cursor):
        """检查是否存在 plain_text 字段"""
        try:
            cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'")
            return cursor.fetchone() is not None
        except Error:
            return False
    def get_users_to_update(self, cursor, user_filter=None):
        """获取需要更新的用户列表"""
        try:
            # 构造基础查询语句
            base_sql = """
                SELECT id, login_name, name, current_role_id
                FROM sys_user
                WHERE id != %s
            """  # 默认排除保护用户
            params = [PROTECTED_USER_ID]
            # 添加用户筛选条件
            if isinstance(user_filter, list) and user_filter:  # 用户名列表模式
                placeholders = ','.join(['%s'] * len(user_filter))
                base_sql += f" AND login_name IN ({placeholders})"
                params.extend(user_filter)
            cursor.execute(base_sql, params)
            return cursor.fetchall()
        except Error:
            return []
    def update_user_password(self, cursor, user_id, login_name, has_plain_text, new_password):
        """更新用户密码"""
        try:
            salt = bcrypt.gensalt(rounds=10)
            hashed = bcrypt.hashpw(new_password.encode('utf-8'), salt).decode('utf-8')
            # 构造更新 SQL
            if has_plain_text:
                update_sql = """
                    UPDATE sys_user
                    SET plain_text = %s, password = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (new_password, hashed, user_id))
            else:
                update_sql = """
                    UPDATE sys_user
                    SET password = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (hashed, user_id))
            return True
        except Error:
            return False
    def get_role_name(self, cursor, role_id):
        """根据角色ID获取角色名称"""
        try:
            if not role_id:
                return "无角色"
            cursor.execute("SELECT name FROM sys_role WHERE id = %s", (role_id,))
            result = cursor.fetchone()
            return result[0] if result else "角色不存在"
        except Error:
            return "获取失败"
    def export_user_info(self, cursor, new_password):
        """导出用户信息到Excel"""
        try:
            if not self.updated_users:
                return False
            # 确保导出目录存在
            os.makedirs(DEFAULT_EXPORT_DIR, exist_ok=True)
            # 获取所有角色ID
            role_ids = list(set(user['role_id'] for user in self.updated_users if user['role_id']))
            # 获取角色名称映射
            role_name_map = {}
            for role_id in role_ids:
                role_name = self.get_role_name(cursor, role_id)
                role_name_map[role_id] = role_name
            # 准备导出数据
            export_data = []
            for user in self.updated_users:
                export_data.append({
                    '用户名': user['login_name'],
                    '密码': new_password,
                    '姓名': user['name'],
                    '角色名称': role_name_map.get(user['role_id'], '无角色')
                })
            # 创建DataFrame并导出
            df_export = pd.DataFrame(export_data)
            # 按照要求格式化文件名:项目名称测试环境账号密码_年月日
            safe_project_name = re.sub(r'[\\/*?:"<>|]', "", self.project)
            safe_project_name = safe_project_name.replace(" ", "_")
            date_str = datetime.now().strftime("%Y%m%d")
            filename = f"{safe_project_name}测试环境账号密码_{date_str}.xlsx"
            filepath = os.path.join(DEFAULT_EXPORT_DIR, filename)
            df_export.to_excel(filepath, index=False)
            return True
        except Exception:
            return False
    def update_database(self, user_filter=None, new_password=DEFAULT_NEW_PASSWORD):
        """更新数据库中的用户密码"""
        conn = None
        cursor = None
        try:
            conn = self.connect()
            if not conn:
                return False
            # 使用同步方式创建游标
            cursor = conn.cursor()
            # 检查是否有 plain_text 字段
            has_plain_text = self.check_plain_text_column(cursor)
            # 获取需要更新的用户
            users = self.get_users_to_update(cursor, user_filter)
            if not users:
                return False
            # 更新每个用户的密码
            for uid, login_name, name, current_role_id in users:
                success = self.update_user_password(cursor, uid, login_name, has_plain_text, new_password)
                if success:
                    # 记录更新成功的用户信息
                    self.updated_users.append({
                        'id': uid,
                        'login_name': login_name,
                        'name': name,
                        'role_id': current_role_id
                    })
                    # 写入日志
                    self.log_records.append({
                        '项目': self.project,
                        '数据库': self.database,
                        '用户ID': uid,
                        '用户名': login_name,
                        '明文密码': new_password if has_plain_text else '未更新',
                        '哈希密码': bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt(rounds=10)).decode(
                            'utf-8')
                    })
            # 提交事务
            conn.commit()
            return True
        except Error:
            return False
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()
class PasswordUpdaterApp(QMainWindow):
    """主应用程序窗口"""
    def __init__(self):
        super().__init__()
        self.updater_thread = None
        self.init_ui()
    def init_ui(self):
        """初始化用户界面"""
        self.setWindowTitle('数据库密码批量修改工具')
        self.setGeometry(100, 100, 800, 600)
        # 创建中央部件和布局
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        layout = QVBoxLayout(central_widget)
        # 创建选项卡
        tabs = QTabWidget()
        layout.addWidget(tabs)
        # 配置选项卡
        config_tab = QWidget()
        config_layout = QVBoxLayout(config_tab)
        # Excel文件选择
        excel_group = QGroupBox("Excel配置文件")
        excel_layout = QHBoxLayout(excel_group)
        self.excel_path_edit = QLineEdit(DEFAULT_EXCEL_PATH)
        excel_browse_btn = QPushButton("浏览...")
        excel_browse_btn.clicked.connect(self.browse_excel_file)
        excel_layout.addWidget(QLabel("Excel文件路径:"))
        excel_layout.addWidget(self.excel_path_edit)
        excel_layout.addWidget(excel_browse_btn)
        config_layout.addWidget(excel_group)
        # 新密码设置
        password_group = QGroupBox("新密码设置")
        password_layout = QHBoxLayout(password_group)
        self.password_edit = QLineEdit(DEFAULT_NEW_PASSWORD)
        self.password_edit.setEchoMode(QLineEdit.Password)
        password_layout.addWidget(QLabel("新密码:"))
        password_layout.addWidget(self.password_edit)
        config_layout.addWidget(password_group)
        # 项目选择
        projects_group = QGroupBox("项目选择")
        projects_layout = QVBoxLayout(projects_group)
        self.projects_list = QListWidget()
        self.load_projects_btn = QPushButton("加载项目")
        self.load_projects_btn.clicked.connect(self.load_projects)
        projects_layout.addWidget(self.projects_list)
        projects_layout.addWidget(self.load_projects_btn)
        config_layout.addWidget(projects_group)
        # 操作模式选择
        mode_group = QGroupBox("操作模式")
        mode_layout = QVBoxLayout(mode_group)
        self.all_users_radio = QRadioButton("批量更新所有用户(排除ID为1的用户)")
        self.specific_users_radio = QRadioButton("指定用户名更新")
        self.all_users_radio.setChecked(True)
        # 用户名输入
        self.usernames_edit = QLineEdit()
        self.usernames_edit.setPlaceholderText("输入用户名,多个用逗号分隔")
        self.usernames_edit.setEnabled(False)
        # 连接信号
        self.all_users_radio.toggled.connect(
            lambda: self.usernames_edit.setEnabled(not self.all_users_radio.isChecked()))
        self.specific_users_radio.toggled.connect(
            lambda: self.usernames_edit.setEnabled(self.specific_users_radio.isChecked()))
        mode_layout.addWidget(self.all_users_radio)
        mode_layout.addWidget(self.specific_users_radio)
        mode_layout.addWidget(self.usernames_edit)
        config_layout.addWidget(mode_group)
        # 进度条
        self.progress_bar = QProgressBar()
        config_layout.addWidget(self.progress_bar)
        # 开始按钮
        self.start_btn = QPushButton("开始更新")
        self.start_btn.clicked.connect(self.start_update)
        config_layout.addWidget(self.start_btn)
        # 日志选项卡
        log_tab = QWidget()
        log_layout = QVBoxLayout(log_tab)
        self.log_text = QTextEdit()
        self.log_text.setReadOnly(True)
        log_layout.addWidget(QLabel("操作日志:"))
        log_layout.addWidget(self.log_text)
        # 导出选项卡
        export_tab = QWidget()
        export_layout = QVBoxLayout(export_tab)
        self.export_list = QListWidget()
        self.export_btn = QPushButton("导出选中项目用户信息")
        self.export_btn.clicked.connect(self.export_user_info)
        export_layout.addWidget(QLabel("已更新项目列表:"))
        export_layout.addWidget(self.export_list)
        export_layout.addWidget(self.export_btn)
        # 添加选项卡
        tabs.addTab(config_tab, "配置")
        tabs.addTab(log_tab, "日志")
        tabs.addTab(export_tab, "导出")
    def browse_excel_file(self):
        """浏览Excel文件"""
        file_path, _ = QFileDialog.getOpenFileName(
            self, "选择Excel文件", "", "Excel Files (*.xlsx *.xls)"
        )
        if file_path:
            self.excel_path_edit.setText(file_path)
    def load_projects(self):
        """加载项目列表"""
        excel_path = self.excel_path_edit.text()
        if not os.path.exists(excel_path):
            QMessageBox.warning(self, "错误", "Excel文件不存在")
            return
        try:
            df = pd.read_excel(excel_path)
            projects = df['项目'].unique().tolist()
            self.projects_list.clear()
            for project in projects:
                item = QListWidgetItem(project)
                item.setCheckState(Qt.Unchecked)
                self.projects_list.addItem(item)
            self.log_text.append(f"已加载 {len(projects)} 个项目")
        except Exception as e:
            QMessageBox.warning(self, "错误", f"读取Excel文件失败: {str(e)}")
    def start_update(self):
        """开始更新"""
        # 获取选中的项目
        selected_projects = []
        for i in range(self.projects_list.count()):
            item = self.projects_list.item(i)
            if item.checkState() == Qt.Checked:
                selected_projects.append(item.text())
        if not selected_projects:
            QMessageBox.warning(self, "错误", "请至少选择一个项目")
            return
        # 获取操作模式
        if self.all_users_radio.isChecked():
            mode = 'all'
            user_filter = None
        else:
            mode = 'specific'
            usernames = self.usernames_edit.text().strip()
            if not usernames:
                QMessageBox.warning(self, "错误", "请输入要更新的用户名")
                return
            user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')]
        # 获取新密码
        new_password = self.password_edit.text()
        if not new_password:
            QMessageBox.warning(self, "错误", "请输入新密码")
            return
        # 创建并启动更新线程
        self.updater_thread = DatabaseUpdater(
            self.excel_path_edit.text(),
            new_password,
            mode,
            user_filter,
            selected_projects
        )
        # 连接信号
        self.updater_thread.log_signal.connect(self.log_text.append)
        self.updater_thread.progress_signal.connect(self.progress_bar.setValue)
        self.updater_thread.finished_signal.connect(self.on_update_finished)
        self.updater_thread.error_signal.connect(self.show_error)
        self.updater_thread.export_finished_signal.connect(self.log_text.append)
        # 禁用开始按钮
        self.start_btn.setEnabled(False)
        # 启动线程
        self.updater_thread.start()
    def on_update_finished(self):
        """更新完成后的处理"""
        self.start_btn.setEnabled(True)
        self.progress_bar.setValue(100)
        # 更新导出列表
        if self.updater_thread and self.updater_thread.updaters:
            self.export_list.clear()
            for updater in self.updater_thread.updaters:
                if updater.updated_users:
                    item = QListWidgetItem(f"{updater.project} ({len(updater.updated_users)} 用户)")
                    item.setData(Qt.UserRole, updater.project)
                    self.export_list.addItem(item)
    def export_user_info(self):
        """导出用户信息"""
        selected_items = self.export_list.selectedItems()
        if not selected_items:
            QMessageBox.warning(self, "错误", "请选择要导出的项目")
            return
        if not self.updater_thread:
            QMessageBox.warning(self, "错误", "没有可导出的数据")
            return
        for item in selected_items:
            project_name = item.data(Qt.UserRole)
            self.log_text.append(f"开始导出项目 {project_name} 的用户信息...")
            self.updater_thread.export_user_info(project_name)
    def show_error(self, error_msg):
        """显示错误信息"""
        QMessageBox.critical(self, "错误", error_msg)
        self.start_btn.setEnabled(True)
    def closeEvent(self, event):
        """关闭应用程序事件"""
        if self.updater_thread and self.updater_thread.isRunning():
            reply = QMessageBox.question(
                self, '确认退出',
                '更新操作正在进行中,确定要退出吗?',
                QMessageBox.Yes | QMessageBox.No,
                QMessageBox.No
            )
            if reply == QMessageBox.Yes:
                self.updater_thread.stop()
                self.updater_thread.wait()
                event.accept()
            else:
                event.ignore()
        else:
            event.accept()
def main():
    """主函数"""
    app = QApplication(sys.argv)
    window = PasswordUpdaterApp()
    window.show()
    sys.exit(app.exec_())
if __name__ == '__main__':
    main()
测试组/脚本/Change_password/修改数据库的哈希密码和原始密码做桌面客户端源代码(argon2id).py
New file
@@ -0,0 +1,639 @@
import sys
import os
import pandas as pd
import mysql.connector
from mysql.connector import Error
from datetime import datetime
from typing import List, Dict, Any, Optional
import re
from PyQt5 import QtWidgets, QtCore, QtGui
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
                             QLabel, QLineEdit, QPushButton, QTextEdit, QListWidget,
                             QListWidgetItem, QCheckBox, QFileDialog, QMessageBox, QProgressBar,
                             QGroupBox, QRadioButton, QButtonGroup, QTabWidget)
from PyQt5.QtCore import Qt, pyqtSignal, QThread
# 判断是否是打包后的可执行文件
if getattr(sys, 'frozen', False):
    # 如果是可执行文件,使用可执行文件所在目录作为基础路径
    BASE_DIR = os.path.dirname(sys.executable)
else:
    # 如果是脚本,使用脚本所在目录
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 修改默认路径配置
DEFAULT_EXCEL_PATH = os.path.join(BASE_DIR, 'dbExcel', '数据库信息.xlsx')
DEFAULT_LOG_CSV_PATH = os.path.join(BASE_DIR, '更新日志.csv')
DEFAULT_EXPORT_DIR = os.path.join(BASE_DIR, '用户信息导出')
DEFAULT_NEW_PASSWORD = 'Baoyi@1341'
PROTECTED_USER_ID = 1
# 导入Argon2PasswordEncoder相关库
try:
    from argon2 import PasswordHasher
except ImportError:
    print("请安装argon2-cffi库: pip install argon2-cffi")
    sys.exit(1)
# 配置Argon2参数
SALT_LENGTH = 32
HASH_LENGTH = 64
PARALLELISM = 2
MEMORY = 65536  # 64MB
ITERATIONS = 5
# 创建Argon2密码编码器实例
argon2_hasher = PasswordHasher(
    time_cost=ITERATIONS,
    memory_cost=MEMORY,
    parallelism=PARALLELISM,
    hash_len=HASH_LENGTH,
    salt_len=SALT_LENGTH
)
def encode_password(raw_password):
    """使用Argon2编码密码"""
    return argon2_hasher.hash(raw_password)
class DatabaseUpdater(QThread):
    """数据库更新器类,运行在单独的线程中"""
    # 定义信号
    log_signal = pyqtSignal(str)
    progress_signal = pyqtSignal(int)
    finished_signal = pyqtSignal()
    error_signal = pyqtSignal(str)
    export_finished_signal = pyqtSignal(str)
    def __init__(self, excel_path, new_password, mode, user_filter=None, selected_projects=None):
        super().__init__()
        self.excel_path = excel_path
        self.new_password = new_password
        self.mode = mode  # 'all' 或 'specific'
        self.user_filter = user_filter
        self.selected_projects = selected_projects
        self.updaters = []
        self.log_records = []
        self.is_running = True
    def run(self):
        """主运行方法"""
        try:
            # 读取 Excel 数据
            try:
                df = pd.read_excel(self.excel_path)
            except Exception as e:
                self.error_signal.emit(f"读取Excel文件失败:{e}")
                return
            # 获取所有唯一项目列表
            all_projects = df['项目'].unique().tolist()
            if not all_projects:
                self.error_signal.emit("Excel 文件中未找到任何项目")
                return
            # 过滤 DataFrame
            if self.selected_projects:
                df_filtered = df[df['项目'].isin(self.selected_projects)]
            else:
                df_filtered = df
            if df_filtered.empty:
                self.error_signal.emit("筛选后无有效项目,请检查输入")
                return
            # 创建数据库更新器实例
            self.updaters = []
            for _, row in df_filtered.iterrows():
                project, host, database, user, password = row[:5]
                self.log_signal.emit(f"准备处理项目:{project} | 数据库:{database}")
                updater = ProjectUpdater(project, host, database, user, password)
                self.updaters.append(updater)
            # 执行更新任务
            total_projects = len(self.updaters)
            for i, updater in enumerate(self.updaters):
                if not self.is_running:
                    break
                self.log_signal.emit(f"正在处理项目: {updater.project}")
                # 运行更新
                success = updater.update_database(
                    user_filter=self.user_filter if self.mode == 'specific' else None,
                    new_password=self.new_password
                )
                if success:
                    self.log_records.extend(updater.log_records)
                    self.log_signal.emit(f"✅ {updater.project} 更新完成,共 {len(updater.updated_users)} 个用户")
                else:
                    self.log_signal.emit(f"❌ {updater.project} 更新失败")
                # 更新进度
                self.progress_signal.emit(int((i + 1) * 100 / total_projects))
            # 保存日志
            if self.log_records:
                pd.DataFrame(self.log_records).to_csv(DEFAULT_LOG_CSV_PATH, index=False, encoding='utf-8-sig')
                self.log_signal.emit(f"所有更新日志已保存至 {DEFAULT_LOG_CSV_PATH}")
            else:
                self.log_signal.emit("无任何更新记录生成")
        except Exception as e:
            self.error_signal.emit(f"运行过程中发生错误: {str(e)}")
        finally:
            self.finished_signal.emit()
    def stop(self):
        """停止运行"""
        self.is_running = False
    def export_user_info(self, project_name):
        """导出用户信息"""
        try:
            # 找到对应的项目更新器
            updater = None
            for u in self.updaters:
                if u.project == project_name:
                    updater = u
                    break
            if not updater or not updater.updated_users:
                self.error_signal.emit(f"项目 {project_name} 没有可导出的用户信息")
                return
            # 连接到数据库并导出
            conn = updater.connect()
            if conn:
                cursor = conn.cursor()
                success = updater.export_user_info(cursor, self.new_password)
                cursor.close()
                conn.close()
                if success:
                    self.export_finished_signal.emit(f"项目 {project_name} 的用户信息导出完成")
                else:
                    self.error_signal.emit(f"项目 {project_name} 的用户信息导出失败")
            else:
                self.error_signal.emit(f"无法连接到项目 {project_name} 的数据库")
        except Exception as e:
            self.error_signal.emit(f"导出用户信息时发生错误: {str(e)}")
class ProjectUpdater:
    """单个项目更新器"""
    def __init__(self, project, host, database, user, password):
        self.project = project
        self.host = host
        self.database = database
        self.db_user = user
        self.db_password = password
        self.updated_users = []
        self.log_records = []
    def connect(self):
        """建立数据库连接"""
        try:
            conn = mysql.connector.connect(
                host=self.host,
                port=3306,
                user=self.db_user,
                password=self.db_password,
                database=self.database,
                use_pure=True
            )
            return conn
        except Error as e:
            return None
    def check_plain_text_column(self, cursor):
        """检查是否存在 plain_text 字段"""
        try:
            cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'")
            return cursor.fetchone() is not None
        except Error:
            return False
    def get_users_to_update(self, cursor, user_filter=None):
        """获取需要更新的用户列表"""
        try:
            # 构造基础查询语句
            base_sql = """
                SELECT id, login_name, name, current_role_id
                FROM sys_user
                WHERE id != %s
            """  # 默认排除保护用户
            params = [PROTECTED_USER_ID]
            # 添加用户筛选条件
            if isinstance(user_filter, list) and user_filter:  # 用户名列表模式
                placeholders = ','.join(['%s'] * len(user_filter))
                base_sql += f" AND login_name IN ({placeholders})"
                params.extend(user_filter)
            cursor.execute(base_sql, params)
            return cursor.fetchall()
        except Error:
            return []
    def update_user_password(self, cursor, user_id, login_name, has_plain_text, new_password):
        """更新用户密码"""
        try:
            # 使用Argon2生成哈希密码
            hashed = encode_password(new_password)
            # 构造更新 SQL
            if has_plain_text:
                update_sql = """
                    UPDATE sys_user
                    SET plain_text = %s, password = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (new_password, hashed, user_id))
            else:
                update_sql = """
                    UPDATE sys_user
                    SET password = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (hashed, user_id))
            return True
        except Error:
            return False
    def get_role_name(self, cursor, role_id):
        """根据角色ID获取角色名称"""
        try:
            if not role_id:
                return "无角色"
            cursor.execute("SELECT name FROM sys_role WHERE id = %s", (role_id,))
            result = cursor.fetchone()
            return result[0] if result else "角色不存在"
        except Error:
            return "获取失败"
    def export_user_info(self, cursor, new_password):
        """导出用户信息到Excel"""
        try:
            if not self.updated_users:
                return False
            # 确保导出目录存在
            os.makedirs(DEFAULT_EXPORT_DIR, exist_ok=True)
            # 获取所有角色ID
            role_ids = list(set(user['role_id'] for user in self.updated_users if user['role_id']))
            # 获取角色名称映射
            role_name_map = {}
            for role_id in role_ids:
                role_name = self.get_role_name(cursor, role_id)
                role_name_map[role_id] = role_name
            # 准备导出数据
            export_data = []
            for user in self.updated_users:
                export_data.append({
                    '用户名': user['login_name'],
                    '密码': new_password,
                    '姓名': user['name'],
                    '角色名称': role_name_map.get(user['role_id'], '无角色')
                })
            # 创建DataFrame并导出
            df_export = pd.DataFrame(export_data)
            # 按照要求格式化文件名:项目名称测试环境账号密码_年月日
            safe_project_name = re.sub(r'[\\/*?:"<>|]', "", self.project)
            safe_project_name = safe_project_name.replace(" ", "_")
            date_str = datetime.now().strftime("%Y%m%d")
            filename = f"{safe_project_name}测试环境账号密码_{date_str}.xlsx"
            filepath = os.path.join(DEFAULT_EXPORT_DIR, filename)
            df_export.to_excel(filepath, index=False)
            return True
        except Exception:
            return False
    def update_database(self, user_filter=None, new_password=DEFAULT_NEW_PASSWORD):
        """更新数据库中的用户密码"""
        conn = None
        cursor = None
        try:
            conn = self.connect()
            if not conn:
                return False
            # 使用同步方式创建游标
            cursor = conn.cursor()
            # 检查是否有 plain_text 字段
            has_plain_text = self.check_plain_text_column(cursor)
            # 获取需要更新的用户
            users = self.get_users_to_update(cursor, user_filter)
            if not users:
                return False
            # 更新每个用户的密码
            for uid, login_name, name, current_role_id in users:
                success = self.update_user_password(cursor, uid, login_name, has_plain_text, new_password)
                if success:
                    # 记录更新成功的用户信息
                    self.updated_users.append({
                        'id': uid,
                        'login_name': login_name,
                        'name': name,
                        'role_id': current_role_id
                    })
                    # 写入日志
                    self.log_records.append({
                        '项目': self.project,
                        '数据库': self.database,
                        '用户ID': uid,
                        '用户名': login_name,
                        '明文密码': new_password if has_plain_text else '未更新',
                        '哈希密码': encode_password(new_password)
                    })
            # 提交事务
            conn.commit()
            return True
        except Error:
            return False
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()
class PasswordUpdaterApp(QMainWindow):
    """主应用程序窗口"""
    def __init__(self):
        super().__init__()
        self.updater_thread = None
        self.init_ui()
    def init_ui(self):
        """初始化用户界面"""
        self.setWindowTitle('数据库密码批量修改工具')
        self.setGeometry(100, 100, 800, 600)
        # 创建中央部件和布局
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        layout = QVBoxLayout(central_widget)
        # 创建选项卡
        tabs = QTabWidget()
        layout.addWidget(tabs)
        # 配置选项卡
        config_tab = QWidget()
        config_layout = QVBoxLayout(config_tab)
        # Excel文件选择
        excel_group = QGroupBox("Excel配置文件")
        excel_layout = QHBoxLayout(excel_group)
        self.excel_path_edit = QLineEdit(DEFAULT_EXCEL_PATH)
        excel_browse_btn = QPushButton("浏览...")
        excel_browse_btn.clicked.connect(self.browse_excel_file)
        excel_layout.addWidget(QLabel("Excel文件路径:"))
        excel_layout.addWidget(self.excel_path_edit)
        excel_layout.addWidget(excel_browse_btn)
        config_layout.addWidget(excel_group)
        # 新密码设置
        password_group = QGroupBox("新密码设置")
        password_layout = QHBoxLayout(password_group)
        self.password_edit = QLineEdit(DEFAULT_NEW_PASSWORD)
        self.password_edit.setEchoMode(QLineEdit.Password)
        password_layout.addWidget(QLabel("新密码:"))
        password_layout.addWidget(self.password_edit)
        config_layout.addWidget(password_group)
        # 项目选择
        projects_group = QGroupBox("项目选择")
        projects_layout = QVBoxLayout(projects_group)
        self.projects_list = QListWidget()
        self.load_projects_btn = QPushButton("加载项目")
        self.load_projects_btn.clicked.connect(self.load_projects)
        projects_layout.addWidget(self.projects_list)
        projects_layout.addWidget(self.load_projects_btn)
        config_layout.addWidget(projects_group)
        # 操作模式选择
        mode_group = QGroupBox("操作模式")
        mode_layout = QVBoxLayout(mode_group)
        self.all_users_radio = QRadioButton("批量更新所有用户(排除ID为1的用户)")
        self.specific_users_radio = QRadioButton("指定用户名更新")
        self.all_users_radio.setChecked(True)
        # 用户名输入
        self.usernames_edit = QLineEdit()
        self.usernames_edit.setPlaceholderText("输入用户名,多个用逗号分隔")
        self.usernames_edit.setEnabled(False)
        # 连接信号
        self.all_users_radio.toggled.connect(
            lambda: self.usernames_edit.setEnabled(not self.all_users_radio.isChecked()))
        self.specific_users_radio.toggled.connect(
            lambda: self.usernames_edit.setEnabled(self.specific_users_radio.isChecked()))
        mode_layout.addWidget(self.all_users_radio)
        mode_layout.addWidget(self.specific_users_radio)
        mode_layout.addWidget(self.usernames_edit)
        config_layout.addWidget(mode_group)
        # 进度条
        self.progress_bar = QProgressBar()
        config_layout.addWidget(self.progress_bar)
        # 开始按钮
        self.start_btn = QPushButton("开始更新")
        self.start_btn.clicked.connect(self.start_update)
        config_layout.addWidget(self.start_btn)
        # 日志选项卡
        log_tab = QWidget()
        log_layout = QVBoxLayout(log_tab)
        self.log_text = QTextEdit()
        self.log_text.setReadOnly(True)
        log_layout.addWidget(QLabel("操作日志:"))
        log_layout.addWidget(self.log_text)
        # 导出选项卡
        export_tab = QWidget()
        export_layout = QVBoxLayout(export_tab)
        self.export_list = QListWidget()
        self.export_btn = QPushButton("导出选中项目用户信息")
        self.export_btn.clicked.connect(self.export_user_info)
        export_layout.addWidget(QLabel("已更新项目列表:"))
        export_layout.addWidget(self.export_list)
        export_layout.addWidget(self.export_btn)
        # 添加选项卡
        tabs.addTab(config_tab, "配置")
        tabs.addTab(log_tab, "日志")
        tabs.addTab(export_tab, "导出")
    def browse_excel_file(self):
        """浏览Excel文件"""
        file_path, _ = QFileDialog.getOpenFileName(
            self, "选择Excel文件", "", "Excel Files (*.xlsx *.xls)"
        )
        if file_path:
            self.excel_path_edit.setText(file_path)
    def load_projects(self):
        """加载项目列表"""
        excel_path = self.excel_path_edit.text()
        if not os.path.exists(excel_path):
            QMessageBox.warning(self, "错误", "Excel文件不存在")
            return
        try:
            df = pd.read_excel(excel_path)
            projects = df['项目'].unique().tolist()
            self.projects_list.clear()
            for project in projects:
                item = QListWidgetItem(project)
                item.setCheckState(Qt.Unchecked)
                self.projects_list.addItem(item)
            self.log_text.append(f"已加载 {len(projects)} 个项目")
        except Exception as e:
            QMessageBox.warning(self, "错误", f"读取Excel文件失败: {str(e)}")
    def start_update(self):
        """开始更新"""
        # 获取选中的项目
        selected_projects = []
        for i in range(self.projects_list.count()):
            item = self.projects_list.item(i)
            if item.checkState() == Qt.Checked:
                selected_projects.append(item.text())
        if not selected_projects:
            QMessageBox.warning(self, "错误", "请至少选择一个项目")
            return
        # 获取操作模式
        if self.all_users_radio.isChecked():
            mode = 'all'
            user_filter = None
        else:
            mode = 'specific'
            usernames = self.usernames_edit.text().strip()
            if not usernames:
                QMessageBox.warning(self, "错误", "请输入要更新的用户名")
                return
            user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')]
        # 获取新密码
        new_password = self.password_edit.text()
        if not new_password:
            QMessageBox.warning(self, "错误", "请输入新密码")
            return
        # 创建并启动更新线程
        self.updater_thread = DatabaseUpdater(
            self.excel_path_edit.text(),
            new_password,
            mode,
            user_filter,
            selected_projects
        )
        # 连接信号
        self.updater_thread.log_signal.connect(self.log_text.append)
        self.updater_thread.progress_signal.connect(self.progress_bar.setValue)
        self.updater_thread.finished_signal.connect(self.on_update_finished)
        self.updater_thread.error_signal.connect(self.show_error)
        self.updater_thread.export_finished_signal.connect(self.log_text.append)
        # 禁用开始按钮
        self.start_btn.setEnabled(False)
        # 启动线程
        self.updater_thread.start()
    def on_update_finished(self):
        """更新完成后的处理"""
        self.start_btn.setEnabled(True)
        self.progress_bar.setValue(100)
        # 更新导出列表
        if self.updater_thread and self.updater_thread.updaters:
            self.export_list.clear()
            for updater in self.updater_thread.updaters:
                if updater.updated_users:
                    item = QListWidgetItem(f"{updater.project} ({len(updater.updated_users)} 用户)")
                    item.setData(Qt.UserRole, updater.project)
                    self.export_list.addItem(item)
    def export_user_info(self):
        """导出用户信息"""
        selected_items = self.export_list.selectedItems()
        if not selected_items:
            QMessageBox.warning(self, "错误", "请选择要导出的项目")
            return
        if not self.updater_thread:
            QMessageBox.warning(self, "错误", "没有可导出的数据")
            return
        for item in selected_items:
            project_name = item.data(Qt.UserRole)
            self.log_text.append(f"开始导出项目 {project_name} 的用户信息...")
            self.updater_thread.export_user_info(project_name)
    def show_error(self, error_msg):
        """显示错误信息"""
        QMessageBox.critical(self, "错误", error_msg)
        self.start_btn.setEnabled(True)
    def closeEvent(self, event):
        """关闭应用程序事件"""
        if self.updater_thread and self.updater_thread.isRunning():
            reply = QMessageBox.question(
                self, '确认退出',
                '更新操作正在进行中,确定要退出吗?',
                QMessageBox.Yes | QMessageBox.No,
                QMessageBox.No
            )
            if reply == QMessageBox.Yes:
                self.updater_thread.stop()
                self.updater_thread.wait()
                event.accept()
            else:
                event.ignore()
        else:
            event.accept()
def main():
    """主函数"""
    app = QApplication(sys.argv)
    window = PasswordUpdaterApp()
    window.show()
    sys.exit(app.exec_())
if __name__ == '__main__':
    main()
测试组/脚本/Change_password/密码更新工具.spec
New file
@@ -0,0 +1,38 @@
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
    ['修改数据库的哈希密码和原始密码做桌面客户端.py'],
    pathex=[],
    binaries=[],
    datas=[('dbExcel', 'dbExcel')],
    hiddenimports=[],
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    noarchive=False,
    optimize=0,
)
pyz = PYZ(a.pure)
exe = EXE(
    pyz,
    a.scripts,
    a.binaries,
    a.datas,
    [],
    name='密码更新工具',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    upx_exclude=[],
    runtime_tmpdir=None,
    console=False,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
)
测试组/脚本/Change_password/批量更新数据库用户密码脚本使用手册.docx
Binary files differ
测试组/脚本/Change_password/更新日志.csv
Diff too large
测试组/脚本/Change_password/用户信息导出/上海中医药_shutcm测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/上海交通大学_sjtu测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/上海市一测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/上海脑所_bsbii测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/上海药明测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/中国科学院脑科学与智能技术卓越创新中心测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/中洪博元测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/云南中医药大学_ynucm测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/云南大学_ynu测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/仪器测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/北京大学人民医院_pkuph测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/北京大学口腔医院_bjmu测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/北京脑所测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/华西医院_wchscu测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/华西海圻测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/南京鼓楼医院_njglyy测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/成都中医药_cdutcm测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/昆明理工测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/江南大学_jiangnan测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/河北大学测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/爱尔眼科测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/福建省妇幼保健院_fjsfy测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/西南大学测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/重庆三峡医药高专_sxyyc测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/陕西师范大学_snnu测试环境账号密码_20250821.xlsx
Binary files differ
测试组/脚本/Change_password/用户信息导出/齐鲁医院_qilu测试环境账号密码_20250821.xlsx
Binary files differ