新增客户端gui界面
新增argon2id'加密方式,适配部分项目的加密方式调整
31 files added
4 files modified
| | |
| | | <?xml version="1.0" encoding="UTF-8"?> |
| | | <project version="4"> |
| | | <component name="Black"> |
| | | <option name="sdkName" value="G:\Anaconda3" /> |
| | | </component> |
| | | <component name="ProjectRootManager" version="2" project-jdk-name="G:\Anaconda3" project-jdk-type="Python SDK" /> |
| | | </project> |
| New file |
| | |
| | | import sys |
| | | import os |
| | | import pandas as pd |
| | | import bcrypt |
| | | import mysql.connector |
| | | from mysql.connector import Error |
| | | import asyncio |
| | | import threading |
| | | from datetime import datetime |
| | | from typing import List, Dict, Any, Optional |
| | | import re |
| | | from PyQt5 import QtWidgets, QtCore, QtGui |
| | | from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, |
| | | QLabel, QLineEdit, QPushButton, QTextEdit, QListWidget, |
| | | QListWidgetItem, QCheckBox, QFileDialog, QMessageBox, QProgressBar, |
| | | QGroupBox, QRadioButton, QButtonGroup, QTabWidget) |
| | | from PyQt5.QtCore import Qt, pyqtSignal, QThread |
| | | |
| | | # 判断是否是打包后的可执行文件 |
| | | if getattr(sys, 'frozen', False): |
| | | # 如果是可执行文件,使用可执行文件所在目录作为基础路径 |
| | | BASE_DIR = os.path.dirname(sys.executable) |
| | | else: |
| | | # 如果是脚本,使用脚本所在目录 |
| | | BASE_DIR = os.path.dirname(os.path.abspath(__file__)) |
| | | |
| | | # 修改默认路径配置 |
| | | DEFAULT_EXCEL_PATH = os.path.join(BASE_DIR, 'dbExcel', '数据库信息.xlsx') |
| | | DEFAULT_LOG_CSV_PATH = os.path.join(BASE_DIR, '更新日志.csv') |
| | | DEFAULT_EXPORT_DIR = os.path.join(BASE_DIR, '用户信息导出') |
| | | DEFAULT_NEW_PASSWORD = 'Baoyi@1341' |
| | | PROTECTED_USER_ID = 1 |
| | | |
| | | |
| | | class DatabaseUpdater(QThread): |
| | | """数据库更新器类,运行在单独的线程中""" |
| | | |
| | | # 定义信号 |
| | | log_signal = pyqtSignal(str) |
| | | progress_signal = pyqtSignal(int) |
| | | finished_signal = pyqtSignal() |
| | | error_signal = pyqtSignal(str) |
| | | export_finished_signal = pyqtSignal(str) |
| | | |
| | | def __init__(self, excel_path, new_password, mode, user_filter=None, selected_projects=None): |
| | | super().__init__() |
| | | self.excel_path = excel_path |
| | | self.new_password = new_password |
| | | self.mode = mode # 'all' 或 'specific' |
| | | self.user_filter = user_filter |
| | | self.selected_projects = selected_projects |
| | | self.updaters = [] |
| | | self.log_records = [] |
| | | self.is_running = True |
| | | |
| | | def run(self): |
| | | """主运行方法""" |
| | | try: |
| | | # 读取 Excel 数据 |
| | | try: |
| | | df = pd.read_excel(self.excel_path) |
| | | except Exception as e: |
| | | self.error_signal.emit(f"读取Excel文件失败:{e}") |
| | | return |
| | | |
| | | # 获取所有唯一项目列表 |
| | | all_projects = df['项目'].unique().tolist() |
| | | if not all_projects: |
| | | self.error_signal.emit("Excel 文件中未找到任何项目") |
| | | return |
| | | |
| | | # 过滤 DataFrame |
| | | if self.selected_projects: |
| | | df_filtered = df[df['项目'].isin(self.selected_projects)] |
| | | else: |
| | | df_filtered = df |
| | | |
| | | if df_filtered.empty: |
| | | self.error_signal.emit("筛选后无有效项目,请检查输入") |
| | | return |
| | | |
| | | # 创建数据库更新器实例 |
| | | self.updaters = [] |
| | | for _, row in df_filtered.iterrows(): |
| | | project, host, database, user, password = row[:5] |
| | | self.log_signal.emit(f"准备处理项目:{project} | 数据库:{database}") |
| | | updater = ProjectUpdater(project, host, database, user, password) |
| | | self.updaters.append(updater) |
| | | |
| | | # 执行更新任务 |
| | | total_projects = len(self.updaters) |
| | | for i, updater in enumerate(self.updaters): |
| | | if not self.is_running: |
| | | break |
| | | |
| | | self.log_signal.emit(f"正在处理项目: {updater.project}") |
| | | |
| | | # 运行更新 |
| | | success = updater.update_database( |
| | | user_filter=self.user_filter if self.mode == 'specific' else None, |
| | | new_password=self.new_password |
| | | ) |
| | | |
| | | if success: |
| | | self.log_records.extend(updater.log_records) |
| | | self.log_signal.emit(f"✅ {updater.project} 更新完成,共 {len(updater.updated_users)} 个用户") |
| | | else: |
| | | self.log_signal.emit(f"❌ {updater.project} 更新失败") |
| | | |
| | | # 更新进度 |
| | | self.progress_signal.emit(int((i + 1) * 100 / total_projects)) |
| | | |
| | | # 保存日志 |
| | | if self.log_records: |
| | | pd.DataFrame(self.log_records).to_csv(DEFAULT_LOG_CSV_PATH, index=False, encoding='utf-8-sig') |
| | | self.log_signal.emit(f"所有更新日志已保存至 {DEFAULT_LOG_CSV_PATH}") |
| | | else: |
| | | self.log_signal.emit("无任何更新记录生成") |
| | | |
| | | except Exception as e: |
| | | self.error_signal.emit(f"运行过程中发生错误: {str(e)}") |
| | | finally: |
| | | self.finished_signal.emit() |
| | | |
| | | def stop(self): |
| | | """停止运行""" |
| | | self.is_running = False |
| | | |
| | | def export_user_info(self, project_name): |
| | | """导出用户信息""" |
| | | try: |
| | | # 找到对应的项目更新器 |
| | | updater = None |
| | | for u in self.updaters: |
| | | if u.project == project_name: |
| | | updater = u |
| | | break |
| | | |
| | | if not updater or not updater.updated_users: |
| | | self.error_signal.emit(f"项目 {project_name} 没有可导出的用户信息") |
| | | return |
| | | |
| | | # 连接到数据库并导出 |
| | | conn = updater.connect() |
| | | if conn: |
| | | cursor = conn.cursor() |
| | | success = updater.export_user_info(cursor, self.new_password) |
| | | cursor.close() |
| | | conn.close() |
| | | |
| | | if success: |
| | | self.export_finished_signal.emit(f"项目 {project_name} 的用户信息导出完成") |
| | | else: |
| | | self.error_signal.emit(f"项目 {project_name} 的用户信息导出失败") |
| | | else: |
| | | self.error_signal.emit(f"无法连接到项目 {project_name} 的数据库") |
| | | |
| | | except Exception as e: |
| | | self.error_signal.emit(f"导出用户信息时发生错误: {str(e)}") |
| | | |
| | | |
| | | class ProjectUpdater: |
| | | """单个项目更新器""" |
| | | |
| | | def __init__(self, project, host, database, user, password): |
| | | self.project = project |
| | | self.host = host |
| | | self.database = database |
| | | self.db_user = user |
| | | self.db_password = password |
| | | self.updated_users = [] |
| | | self.log_records = [] |
| | | |
| | | def connect(self): |
| | | """建立数据库连接""" |
| | | try: |
| | | conn = mysql.connector.connect( |
| | | host=self.host, |
| | | port=3306, |
| | | user=self.db_user, |
| | | password=self.db_password, |
| | | database=self.database, |
| | | use_pure=True |
| | | ) |
| | | return conn |
| | | except Error as e: |
| | | return None |
| | | |
| | | def check_plain_text_column(self, cursor): |
| | | """检查是否存在 plain_text 字段""" |
| | | try: |
| | | cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'") |
| | | return cursor.fetchone() is not None |
| | | except Error: |
| | | return False |
| | | |
| | | def get_users_to_update(self, cursor, user_filter=None): |
| | | """获取需要更新的用户列表""" |
| | | try: |
| | | # 构造基础查询语句 |
| | | base_sql = """ |
| | | SELECT id, login_name, name, current_role_id |
| | | FROM sys_user |
| | | WHERE id != %s |
| | | """ # 默认排除保护用户 |
| | | |
| | | params = [PROTECTED_USER_ID] |
| | | |
| | | # 添加用户筛选条件 |
| | | if isinstance(user_filter, list) and user_filter: # 用户名列表模式 |
| | | placeholders = ','.join(['%s'] * len(user_filter)) |
| | | base_sql += f" AND login_name IN ({placeholders})" |
| | | params.extend(user_filter) |
| | | |
| | | cursor.execute(base_sql, params) |
| | | return cursor.fetchall() |
| | | except Error: |
| | | return [] |
| | | |
| | | def update_user_password(self, cursor, user_id, login_name, has_plain_text, new_password): |
| | | """更新用户密码""" |
| | | try: |
| | | salt = bcrypt.gensalt(rounds=10) |
| | | hashed = bcrypt.hashpw(new_password.encode('utf-8'), salt).decode('utf-8') |
| | | |
| | | # 构造更新 SQL |
| | | if has_plain_text: |
| | | update_sql = """ |
| | | UPDATE sys_user |
| | | SET plain_text = %s, password = %s |
| | | WHERE id = %s |
| | | """ |
| | | cursor.execute(update_sql, (new_password, hashed, user_id)) |
| | | else: |
| | | update_sql = """ |
| | | UPDATE sys_user |
| | | SET password = %s |
| | | WHERE id = %s |
| | | """ |
| | | cursor.execute(update_sql, (hashed, user_id)) |
| | | |
| | | return True |
| | | except Error: |
| | | return False |
| | | |
| | | def get_role_name(self, cursor, role_id): |
| | | """根据角色ID获取角色名称""" |
| | | try: |
| | | if not role_id: |
| | | return "无角色" |
| | | |
| | | cursor.execute("SELECT name FROM sys_role WHERE id = %s", (role_id,)) |
| | | result = cursor.fetchone() |
| | | return result[0] if result else "角色不存在" |
| | | except Error: |
| | | return "获取失败" |
| | | |
| | | def export_user_info(self, cursor, new_password): |
| | | """导出用户信息到Excel""" |
| | | try: |
| | | if not self.updated_users: |
| | | return False |
| | | |
| | | # 确保导出目录存在 |
| | | os.makedirs(DEFAULT_EXPORT_DIR, exist_ok=True) |
| | | |
| | | # 获取所有角色ID |
| | | role_ids = list(set(user['role_id'] for user in self.updated_users if user['role_id'])) |
| | | |
| | | # 获取角色名称映射 |
| | | role_name_map = {} |
| | | for role_id in role_ids: |
| | | role_name = self.get_role_name(cursor, role_id) |
| | | role_name_map[role_id] = role_name |
| | | |
| | | # 准备导出数据 |
| | | export_data = [] |
| | | for user in self.updated_users: |
| | | export_data.append({ |
| | | '用户名': user['login_name'], |
| | | '密码': new_password, |
| | | '姓名': user['name'], |
| | | '角色名称': role_name_map.get(user['role_id'], '无角色') |
| | | }) |
| | | |
| | | # 创建DataFrame并导出 |
| | | df_export = pd.DataFrame(export_data) |
| | | |
| | | # 按照要求格式化文件名:项目名称测试环境账号密码_年月日 |
| | | safe_project_name = re.sub(r'[\\/*?:"<>|]', "", self.project) |
| | | safe_project_name = safe_project_name.replace(" ", "_") |
| | | date_str = datetime.now().strftime("%Y%m%d") |
| | | filename = f"{safe_project_name}测试环境账号密码_{date_str}.xlsx" |
| | | filepath = os.path.join(DEFAULT_EXPORT_DIR, filename) |
| | | |
| | | df_export.to_excel(filepath, index=False) |
| | | return True |
| | | |
| | | except Exception: |
| | | return False |
| | | |
| | | def update_database(self, user_filter=None, new_password=DEFAULT_NEW_PASSWORD): |
| | | """更新数据库中的用户密码""" |
| | | conn = None |
| | | cursor = None |
| | | try: |
| | | conn = self.connect() |
| | | if not conn: |
| | | return False |
| | | |
| | | # 使用同步方式创建游标 |
| | | cursor = conn.cursor() |
| | | |
| | | # 检查是否有 plain_text 字段 |
| | | has_plain_text = self.check_plain_text_column(cursor) |
| | | |
| | | # 获取需要更新的用户 |
| | | users = self.get_users_to_update(cursor, user_filter) |
| | | |
| | | if not users: |
| | | return False |
| | | |
| | | # 更新每个用户的密码 |
| | | for uid, login_name, name, current_role_id in users: |
| | | success = self.update_user_password(cursor, uid, login_name, has_plain_text, new_password) |
| | | |
| | | if success: |
| | | # 记录更新成功的用户信息 |
| | | self.updated_users.append({ |
| | | 'id': uid, |
| | | 'login_name': login_name, |
| | | 'name': name, |
| | | 'role_id': current_role_id |
| | | }) |
| | | |
| | | # 写入日志 |
| | | self.log_records.append({ |
| | | '项目': self.project, |
| | | '数据库': self.database, |
| | | '用户ID': uid, |
| | | '用户名': login_name, |
| | | '明文密码': new_password if has_plain_text else '未更新', |
| | | '哈希密码': bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt(rounds=10)).decode( |
| | | 'utf-8') |
| | | }) |
| | | |
| | | # 提交事务 |
| | | conn.commit() |
| | | return True |
| | | |
| | | except Error: |
| | | return False |
| | | finally: |
| | | if cursor: |
| | | cursor.close() |
| | | if conn: |
| | | conn.close() |
| | | |
| | | |
| | | class PasswordUpdaterApp(QMainWindow): |
| | | """主应用程序窗口""" |
| | | |
| | | def __init__(self): |
| | | super().__init__() |
| | | self.updater_thread = None |
| | | self.init_ui() |
| | | |
| | | def init_ui(self): |
| | | """初始化用户界面""" |
| | | self.setWindowTitle('数据库密码批量修改工具') |
| | | self.setGeometry(100, 100, 800, 600) |
| | | |
| | | # 创建中央部件和布局 |
| | | central_widget = QWidget() |
| | | self.setCentralWidget(central_widget) |
| | | layout = QVBoxLayout(central_widget) |
| | | |
| | | # 创建选项卡 |
| | | tabs = QTabWidget() |
| | | layout.addWidget(tabs) |
| | | |
| | | # 配置选项卡 |
| | | config_tab = QWidget() |
| | | config_layout = QVBoxLayout(config_tab) |
| | | |
| | | # Excel文件选择 |
| | | excel_group = QGroupBox("Excel配置文件") |
| | | excel_layout = QHBoxLayout(excel_group) |
| | | self.excel_path_edit = QLineEdit(DEFAULT_EXCEL_PATH) |
| | | excel_browse_btn = QPushButton("浏览...") |
| | | excel_browse_btn.clicked.connect(self.browse_excel_file) |
| | | excel_layout.addWidget(QLabel("Excel文件路径:")) |
| | | excel_layout.addWidget(self.excel_path_edit) |
| | | excel_layout.addWidget(excel_browse_btn) |
| | | config_layout.addWidget(excel_group) |
| | | |
| | | # 新密码设置 |
| | | password_group = QGroupBox("新密码设置") |
| | | password_layout = QHBoxLayout(password_group) |
| | | self.password_edit = QLineEdit(DEFAULT_NEW_PASSWORD) |
| | | self.password_edit.setEchoMode(QLineEdit.Password) |
| | | password_layout.addWidget(QLabel("新密码:")) |
| | | password_layout.addWidget(self.password_edit) |
| | | config_layout.addWidget(password_group) |
| | | |
| | | # 项目选择 |
| | | projects_group = QGroupBox("项目选择") |
| | | projects_layout = QVBoxLayout(projects_group) |
| | | self.projects_list = QListWidget() |
| | | self.load_projects_btn = QPushButton("加载项目") |
| | | self.load_projects_btn.clicked.connect(self.load_projects) |
| | | projects_layout.addWidget(self.projects_list) |
| | | projects_layout.addWidget(self.load_projects_btn) |
| | | config_layout.addWidget(projects_group) |
| | | |
| | | # 操作模式选择 |
| | | mode_group = QGroupBox("操作模式") |
| | | mode_layout = QVBoxLayout(mode_group) |
| | | self.all_users_radio = QRadioButton("批量更新所有用户(排除ID为1的用户)") |
| | | self.specific_users_radio = QRadioButton("指定用户名更新") |
| | | self.all_users_radio.setChecked(True) |
| | | |
| | | # 用户名输入 |
| | | self.usernames_edit = QLineEdit() |
| | | self.usernames_edit.setPlaceholderText("输入用户名,多个用逗号分隔") |
| | | self.usernames_edit.setEnabled(False) |
| | | |
| | | # 连接信号 |
| | | self.all_users_radio.toggled.connect( |
| | | lambda: self.usernames_edit.setEnabled(not self.all_users_radio.isChecked())) |
| | | self.specific_users_radio.toggled.connect( |
| | | lambda: self.usernames_edit.setEnabled(self.specific_users_radio.isChecked())) |
| | | |
| | | mode_layout.addWidget(self.all_users_radio) |
| | | mode_layout.addWidget(self.specific_users_radio) |
| | | mode_layout.addWidget(self.usernames_edit) |
| | | config_layout.addWidget(mode_group) |
| | | |
| | | # 进度条 |
| | | self.progress_bar = QProgressBar() |
| | | config_layout.addWidget(self.progress_bar) |
| | | |
| | | # 开始按钮 |
| | | self.start_btn = QPushButton("开始更新") |
| | | self.start_btn.clicked.connect(self.start_update) |
| | | config_layout.addWidget(self.start_btn) |
| | | |
| | | # 日志选项卡 |
| | | log_tab = QWidget() |
| | | log_layout = QVBoxLayout(log_tab) |
| | | self.log_text = QTextEdit() |
| | | self.log_text.setReadOnly(True) |
| | | log_layout.addWidget(QLabel("操作日志:")) |
| | | log_layout.addWidget(self.log_text) |
| | | |
| | | # 导出选项卡 |
| | | export_tab = QWidget() |
| | | export_layout = QVBoxLayout(export_tab) |
| | | self.export_list = QListWidget() |
| | | self.export_btn = QPushButton("导出选中项目用户信息") |
| | | self.export_btn.clicked.connect(self.export_user_info) |
| | | export_layout.addWidget(QLabel("已更新项目列表:")) |
| | | export_layout.addWidget(self.export_list) |
| | | export_layout.addWidget(self.export_btn) |
| | | |
| | | # 添加选项卡 |
| | | tabs.addTab(config_tab, "配置") |
| | | tabs.addTab(log_tab, "日志") |
| | | tabs.addTab(export_tab, "导出") |
| | | |
| | | def browse_excel_file(self): |
| | | """浏览Excel文件""" |
| | | file_path, _ = QFileDialog.getOpenFileName( |
| | | self, "选择Excel文件", "", "Excel Files (*.xlsx *.xls)" |
| | | ) |
| | | if file_path: |
| | | self.excel_path_edit.setText(file_path) |
| | | |
| | | def load_projects(self): |
| | | """加载项目列表""" |
| | | excel_path = self.excel_path_edit.text() |
| | | if not os.path.exists(excel_path): |
| | | QMessageBox.warning(self, "错误", "Excel文件不存在") |
| | | return |
| | | |
| | | try: |
| | | df = pd.read_excel(excel_path) |
| | | projects = df['项目'].unique().tolist() |
| | | |
| | | self.projects_list.clear() |
| | | for project in projects: |
| | | item = QListWidgetItem(project) |
| | | item.setCheckState(Qt.Unchecked) |
| | | self.projects_list.addItem(item) |
| | | |
| | | self.log_text.append(f"已加载 {len(projects)} 个项目") |
| | | except Exception as e: |
| | | QMessageBox.warning(self, "错误", f"读取Excel文件失败: {str(e)}") |
| | | |
| | | def start_update(self): |
| | | """开始更新""" |
| | | # 获取选中的项目 |
| | | selected_projects = [] |
| | | for i in range(self.projects_list.count()): |
| | | item = self.projects_list.item(i) |
| | | if item.checkState() == Qt.Checked: |
| | | selected_projects.append(item.text()) |
| | | |
| | | if not selected_projects: |
| | | QMessageBox.warning(self, "错误", "请至少选择一个项目") |
| | | return |
| | | |
| | | # 获取操作模式 |
| | | if self.all_users_radio.isChecked(): |
| | | mode = 'all' |
| | | user_filter = None |
| | | else: |
| | | mode = 'specific' |
| | | usernames = self.usernames_edit.text().strip() |
| | | if not usernames: |
| | | QMessageBox.warning(self, "错误", "请输入要更新的用户名") |
| | | return |
| | | user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')] |
| | | |
| | | # 获取新密码 |
| | | new_password = self.password_edit.text() |
| | | if not new_password: |
| | | QMessageBox.warning(self, "错误", "请输入新密码") |
| | | return |
| | | |
| | | # 创建并启动更新线程 |
| | | self.updater_thread = DatabaseUpdater( |
| | | self.excel_path_edit.text(), |
| | | new_password, |
| | | mode, |
| | | user_filter, |
| | | selected_projects |
| | | ) |
| | | |
| | | # 连接信号 |
| | | self.updater_thread.log_signal.connect(self.log_text.append) |
| | | self.updater_thread.progress_signal.connect(self.progress_bar.setValue) |
| | | self.updater_thread.finished_signal.connect(self.on_update_finished) |
| | | self.updater_thread.error_signal.connect(self.show_error) |
| | | self.updater_thread.export_finished_signal.connect(self.log_text.append) |
| | | |
| | | # 禁用开始按钮 |
| | | self.start_btn.setEnabled(False) |
| | | |
| | | # 启动线程 |
| | | self.updater_thread.start() |
| | | |
| | | def on_update_finished(self): |
| | | """更新完成后的处理""" |
| | | self.start_btn.setEnabled(True) |
| | | self.progress_bar.setValue(100) |
| | | |
| | | # 更新导出列表 |
| | | if self.updater_thread and self.updater_thread.updaters: |
| | | self.export_list.clear() |
| | | for updater in self.updater_thread.updaters: |
| | | if updater.updated_users: |
| | | item = QListWidgetItem(f"{updater.project} ({len(updater.updated_users)} 用户)") |
| | | item.setData(Qt.UserRole, updater.project) |
| | | self.export_list.addItem(item) |
| | | |
| | | def export_user_info(self): |
| | | """导出用户信息""" |
| | | selected_items = self.export_list.selectedItems() |
| | | if not selected_items: |
| | | QMessageBox.warning(self, "错误", "请选择要导出的项目") |
| | | return |
| | | |
| | | if not self.updater_thread: |
| | | QMessageBox.warning(self, "错误", "没有可导出的数据") |
| | | return |
| | | |
| | | for item in selected_items: |
| | | project_name = item.data(Qt.UserRole) |
| | | self.log_text.append(f"开始导出项目 {project_name} 的用户信息...") |
| | | self.updater_thread.export_user_info(project_name) |
| | | |
| | | def show_error(self, error_msg): |
| | | """显示错误信息""" |
| | | QMessageBox.critical(self, "错误", error_msg) |
| | | self.start_btn.setEnabled(True) |
| | | |
| | | def closeEvent(self, event): |
| | | """关闭应用程序事件""" |
| | | if self.updater_thread and self.updater_thread.isRunning(): |
| | | reply = QMessageBox.question( |
| | | self, '确认退出', |
| | | '更新操作正在进行中,确定要退出吗?', |
| | | QMessageBox.Yes | QMessageBox.No, |
| | | QMessageBox.No |
| | | ) |
| | | |
| | | if reply == QMessageBox.Yes: |
| | | self.updater_thread.stop() |
| | | self.updater_thread.wait() |
| | | event.accept() |
| | | else: |
| | | event.ignore() |
| | | else: |
| | | event.accept() |
| | | |
| | | |
| | | def main(): |
| | | """主函数""" |
| | | app = QApplication(sys.argv) |
| | | window = PasswordUpdaterApp() |
| | | window.show() |
| | | sys.exit(app.exec_()) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | main() |
| New file |
| | |
| | | import sys |
| | | import os |
| | | import pandas as pd |
| | | import bcrypt |
| | | import mysql.connector |
| | | from mysql.connector import Error |
| | | import asyncio |
| | | import threading |
| | | from datetime import datetime |
| | | from typing import List, Dict, Any, Optional |
| | | import re |
| | | from PyQt5 import QtWidgets, QtCore, QtGui |
| | | from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, |
| | | QLabel, QLineEdit, QPushButton, QTextEdit, QListWidget, |
| | | QListWidgetItem, QCheckBox, QFileDialog, QMessageBox, QProgressBar, |
| | | QGroupBox, QRadioButton, QButtonGroup, QTabWidget) |
| | | from PyQt5.QtCore import Qt, pyqtSignal, QThread |
| | | |
| | | # 判断是否是打包后的可执行文件 |
| | | if getattr(sys, 'frozen', False): |
| | | # 如果是可执行文件,使用可执行文件所在目录作为基础路径 |
| | | BASE_DIR = os.path.dirname(sys.executable) |
| | | else: |
| | | # 如果是脚本,使用脚本所在目录 |
| | | BASE_DIR = os.path.dirname(os.path.abspath(__file__)) |
| | | |
| | | # 修改默认路径配置 |
| | | DEFAULT_EXCEL_PATH = os.path.join(BASE_DIR, 'dbExcel', '数据库信息.xlsx') |
| | | DEFAULT_LOG_CSV_PATH = os.path.join(BASE_DIR, '更新日志.csv') |
| | | DEFAULT_EXPORT_DIR = os.path.join(BASE_DIR, '用户信息导出') |
| | | DEFAULT_NEW_PASSWORD = 'Baoyi@1341' |
| | | PROTECTED_USER_ID = 1 |
| | | |
| | | |
| | | class DatabaseUpdater(QThread): |
| | | """数据库更新器类,运行在单独的线程中""" |
| | | |
| | | # 定义信号 |
| | | log_signal = pyqtSignal(str) |
| | | progress_signal = pyqtSignal(int) |
| | | finished_signal = pyqtSignal() |
| | | error_signal = pyqtSignal(str) |
| | | export_finished_signal = pyqtSignal(str) |
| | | |
| | | def __init__(self, excel_path, new_password, mode, user_filter=None, selected_projects=None): |
| | | super().__init__() |
| | | self.excel_path = excel_path |
| | | self.new_password = new_password |
| | | self.mode = mode # 'all' 或 'specific' |
| | | self.user_filter = user_filter |
| | | self.selected_projects = selected_projects |
| | | self.updaters = [] |
| | | self.log_records = [] |
| | | self.is_running = True |
| | | |
| | | def run(self): |
| | | """主运行方法""" |
| | | try: |
| | | # 读取 Excel 数据 |
| | | try: |
| | | df = pd.read_excel(self.excel_path) |
| | | except Exception as e: |
| | | self.error_signal.emit(f"读取Excel文件失败:{e}") |
| | | return |
| | | |
| | | # 获取所有唯一项目列表 |
| | | all_projects = df['项目'].unique().tolist() |
| | | if not all_projects: |
| | | self.error_signal.emit("Excel 文件中未找到任何项目") |
| | | return |
| | | |
| | | # 过滤 DataFrame |
| | | if self.selected_projects: |
| | | df_filtered = df[df['项目'].isin(self.selected_projects)] |
| | | else: |
| | | df_filtered = df |
| | | |
| | | if df_filtered.empty: |
| | | self.error_signal.emit("筛选后无有效项目,请检查输入") |
| | | return |
| | | |
| | | # 创建数据库更新器实例 |
| | | self.updaters = [] |
| | | for _, row in df_filtered.iterrows(): |
| | | project, host, database, user, password = row[:5] |
| | | self.log_signal.emit(f"准备处理项目:{project} | 数据库:{database}") |
| | | updater = ProjectUpdater(project, host, database, user, password) |
| | | self.updaters.append(updater) |
| | | |
| | | # 执行更新任务 |
| | | total_projects = len(self.updaters) |
| | | for i, updater in enumerate(self.updaters): |
| | | if not self.is_running: |
| | | break |
| | | |
| | | self.log_signal.emit(f"正在处理项目: {updater.project}") |
| | | |
| | | # 运行更新 |
| | | success = updater.update_database( |
| | | user_filter=self.user_filter if self.mode == 'specific' else None, |
| | | new_password=self.new_password |
| | | ) |
| | | |
| | | if success: |
| | | self.log_records.extend(updater.log_records) |
| | | self.log_signal.emit(f"✅ {updater.project} 更新完成,共 {len(updater.updated_users)} 个用户") |
| | | else: |
| | | self.log_signal.emit(f"❌ {updater.project} 更新失败") |
| | | |
| | | # 更新进度 |
| | | self.progress_signal.emit(int((i + 1) * 100 / total_projects)) |
| | | |
| | | # 保存日志 |
| | | if self.log_records: |
| | | pd.DataFrame(self.log_records).to_csv(DEFAULT_LOG_CSV_PATH, index=False, encoding='utf-8-sig') |
| | | self.log_signal.emit(f"所有更新日志已保存至 {DEFAULT_LOG_CSV_PATH}") |
| | | else: |
| | | self.log_signal.emit("无任何更新记录生成") |
| | | |
| | | except Exception as e: |
| | | self.error_signal.emit(f"运行过程中发生错误: {str(e)}") |
| | | finally: |
| | | self.finished_signal.emit() |
| | | |
| | | def stop(self): |
| | | """停止运行""" |
| | | self.is_running = False |
| | | |
| | | def export_user_info(self, project_name): |
| | | """导出用户信息""" |
| | | try: |
| | | # 找到对应的项目更新器 |
| | | updater = None |
| | | for u in self.updaters: |
| | | if u.project == project_name: |
| | | updater = u |
| | | break |
| | | |
| | | if not updater or not updater.updated_users: |
| | | self.error_signal.emit(f"项目 {project_name} 没有可导出的用户信息") |
| | | return |
| | | |
| | | # 连接到数据库并导出 |
| | | conn = updater.connect() |
| | | if conn: |
| | | cursor = conn.cursor() |
| | | success = updater.export_user_info(cursor, self.new_password) |
| | | cursor.close() |
| | | conn.close() |
| | | |
| | | if success: |
| | | self.export_finished_signal.emit(f"项目 {project_name} 的用户信息导出完成") |
| | | else: |
| | | self.error_signal.emit(f"项目 {project_name} 的用户信息导出失败") |
| | | else: |
| | | self.error_signal.emit(f"无法连接到项目 {project_name} 的数据库") |
| | | |
| | | except Exception as e: |
| | | self.error_signal.emit(f"导出用户信息时发生错误: {str(e)}") |
| | | |
| | | |
| | | class ProjectUpdater: |
| | | """单个项目更新器""" |
| | | |
| | | def __init__(self, project, host, database, user, password): |
| | | self.project = project |
| | | self.host = host |
| | | self.database = database |
| | | self.db_user = user |
| | | self.db_password = password |
| | | self.updated_users = [] |
| | | self.log_records = [] |
| | | |
| | | def connect(self): |
| | | """建立数据库连接""" |
| | | try: |
| | | conn = mysql.connector.connect( |
| | | host=self.host, |
| | | port=3306, |
| | | user=self.db_user, |
| | | password=self.db_password, |
| | | database=self.database, |
| | | use_pure=True |
| | | ) |
| | | return conn |
| | | except Error as e: |
| | | return None |
| | | |
| | | def check_plain_text_column(self, cursor): |
| | | """检查是否存在 plain_text 字段""" |
| | | try: |
| | | cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'") |
| | | return cursor.fetchone() is not None |
| | | except Error: |
| | | return False |
| | | |
| | | def get_users_to_update(self, cursor, user_filter=None): |
| | | """获取需要更新的用户列表""" |
| | | try: |
| | | # 构造基础查询语句 |
| | | base_sql = """ |
| | | SELECT id, login_name, name, current_role_id |
| | | FROM sys_user |
| | | WHERE id != %s |
| | | """ # 默认排除保护用户 |
| | | |
| | | params = [PROTECTED_USER_ID] |
| | | |
| | | # 添加用户筛选条件 |
| | | if isinstance(user_filter, list) and user_filter: # 用户名列表模式 |
| | | placeholders = ','.join(['%s'] * len(user_filter)) |
| | | base_sql += f" AND login_name IN ({placeholders})" |
| | | params.extend(user_filter) |
| | | |
| | | cursor.execute(base_sql, params) |
| | | return cursor.fetchall() |
| | | except Error: |
| | | return [] |
| | | |
| | | def update_user_password(self, cursor, user_id, login_name, has_plain_text, new_password): |
| | | """更新用户密码""" |
| | | try: |
| | | salt = bcrypt.gensalt(rounds=10) |
| | | hashed = bcrypt.hashpw(new_password.encode('utf-8'), salt).decode('utf-8') |
| | | |
| | | # 构造更新 SQL |
| | | if has_plain_text: |
| | | update_sql = """ |
| | | UPDATE sys_user |
| | | SET plain_text = %s, password = %s |
| | | WHERE id = %s |
| | | """ |
| | | cursor.execute(update_sql, (new_password, hashed, user_id)) |
| | | else: |
| | | update_sql = """ |
| | | UPDATE sys_user |
| | | SET password = %s |
| | | WHERE id = %s |
| | | """ |
| | | cursor.execute(update_sql, (hashed, user_id)) |
| | | |
| | | return True |
| | | except Error: |
| | | return False |
| | | |
| | | def get_role_name(self, cursor, role_id): |
| | | """根据角色ID获取角色名称""" |
| | | try: |
| | | if not role_id: |
| | | return "无角色" |
| | | |
| | | cursor.execute("SELECT name FROM sys_role WHERE id = %s", (role_id,)) |
| | | result = cursor.fetchone() |
| | | return result[0] if result else "角色不存在" |
| | | except Error: |
| | | return "获取失败" |
| | | |
| | | def export_user_info(self, cursor, new_password): |
| | | """导出用户信息到Excel""" |
| | | try: |
| | | if not self.updated_users: |
| | | return False |
| | | |
| | | # 确保导出目录存在 |
| | | os.makedirs(DEFAULT_EXPORT_DIR, exist_ok=True) |
| | | |
| | | # 获取所有角色ID |
| | | role_ids = list(set(user['role_id'] for user in self.updated_users if user['role_id'])) |
| | | |
| | | # 获取角色名称映射 |
| | | role_name_map = {} |
| | | for role_id in role_ids: |
| | | role_name = self.get_role_name(cursor, role_id) |
| | | role_name_map[role_id] = role_name |
| | | |
| | | # 准备导出数据 |
| | | export_data = [] |
| | | for user in self.updated_users: |
| | | export_data.append({ |
| | | '用户名': user['login_name'], |
| | | '密码': new_password, |
| | | '姓名': user['name'], |
| | | '角色名称': role_name_map.get(user['role_id'], '无角色') |
| | | }) |
| | | |
| | | # 创建DataFrame并导出 |
| | | df_export = pd.DataFrame(export_data) |
| | | |
| | | # 按照要求格式化文件名:项目名称测试环境账号密码_年月日 |
| | | safe_project_name = re.sub(r'[\\/*?:"<>|]', "", self.project) |
| | | safe_project_name = safe_project_name.replace(" ", "_") |
| | | date_str = datetime.now().strftime("%Y%m%d") |
| | | filename = f"{safe_project_name}测试环境账号密码_{date_str}.xlsx" |
| | | filepath = os.path.join(DEFAULT_EXPORT_DIR, filename) |
| | | |
| | | df_export.to_excel(filepath, index=False) |
| | | return True |
| | | |
| | | except Exception: |
| | | return False |
| | | |
| | | def update_database(self, user_filter=None, new_password=DEFAULT_NEW_PASSWORD): |
| | | """更新数据库中的用户密码""" |
| | | conn = None |
| | | cursor = None |
| | | try: |
| | | conn = self.connect() |
| | | if not conn: |
| | | return False |
| | | |
| | | # 使用同步方式创建游标 |
| | | cursor = conn.cursor() |
| | | |
| | | # 检查是否有 plain_text 字段 |
| | | has_plain_text = self.check_plain_text_column(cursor) |
| | | |
| | | # 获取需要更新的用户 |
| | | users = self.get_users_to_update(cursor, user_filter) |
| | | |
| | | if not users: |
| | | return False |
| | | |
| | | # 更新每个用户的密码 |
| | | for uid, login_name, name, current_role_id in users: |
| | | success = self.update_user_password(cursor, uid, login_name, has_plain_text, new_password) |
| | | |
| | | if success: |
| | | # 记录更新成功的用户信息 |
| | | self.updated_users.append({ |
| | | 'id': uid, |
| | | 'login_name': login_name, |
| | | 'name': name, |
| | | 'role_id': current_role_id |
| | | }) |
| | | |
| | | # 写入日志 |
| | | self.log_records.append({ |
| | | '项目': self.project, |
| | | '数据库': self.database, |
| | | '用户ID': uid, |
| | | '用户名': login_name, |
| | | '明文密码': new_password if has_plain_text else '未更新', |
| | | '哈希密码': bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt(rounds=10)).decode( |
| | | 'utf-8') |
| | | }) |
| | | |
| | | # 提交事务 |
| | | conn.commit() |
| | | return True |
| | | |
| | | except Error: |
| | | return False |
| | | finally: |
| | | if cursor: |
| | | cursor.close() |
| | | if conn: |
| | | conn.close() |
| | | |
| | | |
| | | class PasswordUpdaterApp(QMainWindow): |
| | | """主应用程序窗口""" |
| | | |
| | | def __init__(self): |
| | | super().__init__() |
| | | self.updater_thread = None |
| | | self.init_ui() |
| | | |
| | | def init_ui(self): |
| | | """初始化用户界面""" |
| | | self.setWindowTitle('数据库密码批量修改工具') |
| | | self.setGeometry(100, 100, 800, 600) |
| | | |
| | | # 创建中央部件和布局 |
| | | central_widget = QWidget() |
| | | self.setCentralWidget(central_widget) |
| | | layout = QVBoxLayout(central_widget) |
| | | |
| | | # 创建选项卡 |
| | | tabs = QTabWidget() |
| | | layout.addWidget(tabs) |
| | | |
| | | # 配置选项卡 |
| | | config_tab = QWidget() |
| | | config_layout = QVBoxLayout(config_tab) |
| | | |
| | | # Excel文件选择 |
| | | excel_group = QGroupBox("Excel配置文件") |
| | | excel_layout = QHBoxLayout(excel_group) |
| | | self.excel_path_edit = QLineEdit(DEFAULT_EXCEL_PATH) |
| | | excel_browse_btn = QPushButton("浏览...") |
| | | excel_browse_btn.clicked.connect(self.browse_excel_file) |
| | | excel_layout.addWidget(QLabel("Excel文件路径:")) |
| | | excel_layout.addWidget(self.excel_path_edit) |
| | | excel_layout.addWidget(excel_browse_btn) |
| | | config_layout.addWidget(excel_group) |
| | | |
| | | # 新密码设置 |
| | | password_group = QGroupBox("新密码设置") |
| | | password_layout = QHBoxLayout(password_group) |
| | | self.password_edit = QLineEdit(DEFAULT_NEW_PASSWORD) |
| | | self.password_edit.setEchoMode(QLineEdit.Password) |
| | | password_layout.addWidget(QLabel("新密码:")) |
| | | password_layout.addWidget(self.password_edit) |
| | | config_layout.addWidget(password_group) |
| | | |
| | | # 项目选择 |
| | | projects_group = QGroupBox("项目选择") |
| | | projects_layout = QVBoxLayout(projects_group) |
| | | self.projects_list = QListWidget() |
| | | self.load_projects_btn = QPushButton("加载项目") |
| | | self.load_projects_btn.clicked.connect(self.load_projects) |
| | | projects_layout.addWidget(self.projects_list) |
| | | projects_layout.addWidget(self.load_projects_btn) |
| | | config_layout.addWidget(projects_group) |
| | | |
| | | # 操作模式选择 |
| | | mode_group = QGroupBox("操作模式") |
| | | mode_layout = QVBoxLayout(mode_group) |
| | | self.all_users_radio = QRadioButton("批量更新所有用户(排除ID为1的用户)") |
| | | self.specific_users_radio = QRadioButton("指定用户名更新") |
| | | self.all_users_radio.setChecked(True) |
| | | |
| | | # 用户名输入 |
| | | self.usernames_edit = QLineEdit() |
| | | self.usernames_edit.setPlaceholderText("输入用户名,多个用逗号分隔") |
| | | self.usernames_edit.setEnabled(False) |
| | | |
| | | # 连接信号 |
| | | self.all_users_radio.toggled.connect( |
| | | lambda: self.usernames_edit.setEnabled(not self.all_users_radio.isChecked())) |
| | | self.specific_users_radio.toggled.connect( |
| | | lambda: self.usernames_edit.setEnabled(self.specific_users_radio.isChecked())) |
| | | |
| | | mode_layout.addWidget(self.all_users_radio) |
| | | mode_layout.addWidget(self.specific_users_radio) |
| | | mode_layout.addWidget(self.usernames_edit) |
| | | config_layout.addWidget(mode_group) |
| | | |
| | | # 进度条 |
| | | self.progress_bar = QProgressBar() |
| | | config_layout.addWidget(self.progress_bar) |
| | | |
| | | # 开始按钮 |
| | | self.start_btn = QPushButton("开始更新") |
| | | self.start_btn.clicked.connect(self.start_update) |
| | | config_layout.addWidget(self.start_btn) |
| | | |
| | | # 日志选项卡 |
| | | log_tab = QWidget() |
| | | log_layout = QVBoxLayout(log_tab) |
| | | self.log_text = QTextEdit() |
| | | self.log_text.setReadOnly(True) |
| | | log_layout.addWidget(QLabel("操作日志:")) |
| | | log_layout.addWidget(self.log_text) |
| | | |
| | | # 导出选项卡 |
| | | export_tab = QWidget() |
| | | export_layout = QVBoxLayout(export_tab) |
| | | self.export_list = QListWidget() |
| | | self.export_btn = QPushButton("导出选中项目用户信息") |
| | | self.export_btn.clicked.connect(self.export_user_info) |
| | | export_layout.addWidget(QLabel("已更新项目列表:")) |
| | | export_layout.addWidget(self.export_list) |
| | | export_layout.addWidget(self.export_btn) |
| | | |
| | | # 添加选项卡 |
| | | tabs.addTab(config_tab, "配置") |
| | | tabs.addTab(log_tab, "日志") |
| | | tabs.addTab(export_tab, "导出") |
| | | |
| | | def browse_excel_file(self): |
| | | """浏览Excel文件""" |
| | | file_path, _ = QFileDialog.getOpenFileName( |
| | | self, "选择Excel文件", "", "Excel Files (*.xlsx *.xls)" |
| | | ) |
| | | if file_path: |
| | | self.excel_path_edit.setText(file_path) |
| | | |
| | | def load_projects(self): |
| | | """加载项目列表""" |
| | | excel_path = self.excel_path_edit.text() |
| | | if not os.path.exists(excel_path): |
| | | QMessageBox.warning(self, "错误", "Excel文件不存在") |
| | | return |
| | | |
| | | try: |
| | | df = pd.read_excel(excel_path) |
| | | projects = df['项目'].unique().tolist() |
| | | |
| | | self.projects_list.clear() |
| | | for project in projects: |
| | | item = QListWidgetItem(project) |
| | | item.setCheckState(Qt.Unchecked) |
| | | self.projects_list.addItem(item) |
| | | |
| | | self.log_text.append(f"已加载 {len(projects)} 个项目") |
| | | except Exception as e: |
| | | QMessageBox.warning(self, "错误", f"读取Excel文件失败: {str(e)}") |
| | | |
| | | def start_update(self): |
| | | """开始更新""" |
| | | # 获取选中的项目 |
| | | selected_projects = [] |
| | | for i in range(self.projects_list.count()): |
| | | item = self.projects_list.item(i) |
| | | if item.checkState() == Qt.Checked: |
| | | selected_projects.append(item.text()) |
| | | |
| | | if not selected_projects: |
| | | QMessageBox.warning(self, "错误", "请至少选择一个项目") |
| | | return |
| | | |
| | | # 获取操作模式 |
| | | if self.all_users_radio.isChecked(): |
| | | mode = 'all' |
| | | user_filter = None |
| | | else: |
| | | mode = 'specific' |
| | | usernames = self.usernames_edit.text().strip() |
| | | if not usernames: |
| | | QMessageBox.warning(self, "错误", "请输入要更新的用户名") |
| | | return |
| | | user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')] |
| | | |
| | | # 获取新密码 |
| | | new_password = self.password_edit.text() |
| | | if not new_password: |
| | | QMessageBox.warning(self, "错误", "请输入新密码") |
| | | return |
| | | |
| | | # 创建并启动更新线程 |
| | | self.updater_thread = DatabaseUpdater( |
| | | self.excel_path_edit.text(), |
| | | new_password, |
| | | mode, |
| | | user_filter, |
| | | selected_projects |
| | | ) |
| | | |
| | | # 连接信号 |
| | | self.updater_thread.log_signal.connect(self.log_text.append) |
| | | self.updater_thread.progress_signal.connect(self.progress_bar.setValue) |
| | | self.updater_thread.finished_signal.connect(self.on_update_finished) |
| | | self.updater_thread.error_signal.connect(self.show_error) |
| | | self.updater_thread.export_finished_signal.connect(self.log_text.append) |
| | | |
| | | # 禁用开始按钮 |
| | | self.start_btn.setEnabled(False) |
| | | |
| | | # 启动线程 |
| | | self.updater_thread.start() |
| | | |
| | | def on_update_finished(self): |
| | | """更新完成后的处理""" |
| | | self.start_btn.setEnabled(True) |
| | | self.progress_bar.setValue(100) |
| | | |
| | | # 更新导出列表 |
| | | if self.updater_thread and self.updater_thread.updaters: |
| | | self.export_list.clear() |
| | | for updater in self.updater_thread.updaters: |
| | | if updater.updated_users: |
| | | item = QListWidgetItem(f"{updater.project} ({len(updater.updated_users)} 用户)") |
| | | item.setData(Qt.UserRole, updater.project) |
| | | self.export_list.addItem(item) |
| | | |
| | | def export_user_info(self): |
| | | """导出用户信息""" |
| | | selected_items = self.export_list.selectedItems() |
| | | if not selected_items: |
| | | QMessageBox.warning(self, "错误", "请选择要导出的项目") |
| | | return |
| | | |
| | | if not self.updater_thread: |
| | | QMessageBox.warning(self, "错误", "没有可导出的数据") |
| | | return |
| | | |
| | | for item in selected_items: |
| | | project_name = item.data(Qt.UserRole) |
| | | self.log_text.append(f"开始导出项目 {project_name} 的用户信息...") |
| | | self.updater_thread.export_user_info(project_name) |
| | | |
| | | def show_error(self, error_msg): |
| | | """显示错误信息""" |
| | | QMessageBox.critical(self, "错误", error_msg) |
| | | self.start_btn.setEnabled(True) |
| | | |
| | | def closeEvent(self, event): |
| | | """关闭应用程序事件""" |
| | | if self.updater_thread and self.updater_thread.isRunning(): |
| | | reply = QMessageBox.question( |
| | | self, '确认退出', |
| | | '更新操作正在进行中,确定要退出吗?', |
| | | QMessageBox.Yes | QMessageBox.No, |
| | | QMessageBox.No |
| | | ) |
| | | |
| | | if reply == QMessageBox.Yes: |
| | | self.updater_thread.stop() |
| | | self.updater_thread.wait() |
| | | event.accept() |
| | | else: |
| | | event.ignore() |
| | | else: |
| | | event.accept() |
| | | |
| | | |
| | | def main(): |
| | | """主函数""" |
| | | app = QApplication(sys.argv) |
| | | window = PasswordUpdaterApp() |
| | | window.show() |
| | | sys.exit(app.exec_()) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | main() |
| New file |
| | |
| | | import sys |
| | | import os |
| | | import pandas as pd |
| | | import mysql.connector |
| | | from mysql.connector import Error |
| | | from datetime import datetime |
| | | from typing import List, Dict, Any, Optional |
| | | import re |
| | | from PyQt5 import QtWidgets, QtCore, QtGui |
| | | from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout, |
| | | QLabel, QLineEdit, QPushButton, QTextEdit, QListWidget, |
| | | QListWidgetItem, QCheckBox, QFileDialog, QMessageBox, QProgressBar, |
| | | QGroupBox, QRadioButton, QButtonGroup, QTabWidget) |
| | | from PyQt5.QtCore import Qt, pyqtSignal, QThread |
| | | |
| | | # 判断是否是打包后的可执行文件 |
| | | if getattr(sys, 'frozen', False): |
| | | # 如果是可执行文件,使用可执行文件所在目录作为基础路径 |
| | | BASE_DIR = os.path.dirname(sys.executable) |
| | | else: |
| | | # 如果是脚本,使用脚本所在目录 |
| | | BASE_DIR = os.path.dirname(os.path.abspath(__file__)) |
| | | |
| | | # 修改默认路径配置 |
| | | DEFAULT_EXCEL_PATH = os.path.join(BASE_DIR, 'dbExcel', '数据库信息.xlsx') |
| | | DEFAULT_LOG_CSV_PATH = os.path.join(BASE_DIR, '更新日志.csv') |
| | | DEFAULT_EXPORT_DIR = os.path.join(BASE_DIR, '用户信息导出') |
| | | DEFAULT_NEW_PASSWORD = 'Baoyi@1341' |
| | | PROTECTED_USER_ID = 1 |
| | | |
| | | # 导入Argon2PasswordEncoder相关库 |
| | | try: |
| | | from argon2 import PasswordHasher |
| | | except ImportError: |
| | | print("请安装argon2-cffi库: pip install argon2-cffi") |
| | | sys.exit(1) |
| | | |
| | | # 配置Argon2参数 |
| | | SALT_LENGTH = 32 |
| | | HASH_LENGTH = 64 |
| | | PARALLELISM = 2 |
| | | MEMORY = 65536 # 64MB |
| | | ITERATIONS = 5 |
| | | |
| | | # 创建Argon2密码编码器实例 |
| | | argon2_hasher = PasswordHasher( |
| | | time_cost=ITERATIONS, |
| | | memory_cost=MEMORY, |
| | | parallelism=PARALLELISM, |
| | | hash_len=HASH_LENGTH, |
| | | salt_len=SALT_LENGTH |
| | | ) |
| | | |
| | | def encode_password(raw_password): |
| | | """使用Argon2编码密码""" |
| | | return argon2_hasher.hash(raw_password) |
| | | |
| | | class DatabaseUpdater(QThread): |
| | | """数据库更新器类,运行在单独的线程中""" |
| | | |
| | | # 定义信号 |
| | | log_signal = pyqtSignal(str) |
| | | progress_signal = pyqtSignal(int) |
| | | finished_signal = pyqtSignal() |
| | | error_signal = pyqtSignal(str) |
| | | export_finished_signal = pyqtSignal(str) |
| | | |
| | | def __init__(self, excel_path, new_password, mode, user_filter=None, selected_projects=None): |
| | | super().__init__() |
| | | self.excel_path = excel_path |
| | | self.new_password = new_password |
| | | self.mode = mode # 'all' 或 'specific' |
| | | self.user_filter = user_filter |
| | | self.selected_projects = selected_projects |
| | | self.updaters = [] |
| | | self.log_records = [] |
| | | self.is_running = True |
| | | |
| | | def run(self): |
| | | """主运行方法""" |
| | | try: |
| | | # 读取 Excel 数据 |
| | | try: |
| | | df = pd.read_excel(self.excel_path) |
| | | except Exception as e: |
| | | self.error_signal.emit(f"读取Excel文件失败:{e}") |
| | | return |
| | | |
| | | # 获取所有唯一项目列表 |
| | | all_projects = df['项目'].unique().tolist() |
| | | if not all_projects: |
| | | self.error_signal.emit("Excel 文件中未找到任何项目") |
| | | return |
| | | |
| | | # 过滤 DataFrame |
| | | if self.selected_projects: |
| | | df_filtered = df[df['项目'].isin(self.selected_projects)] |
| | | else: |
| | | df_filtered = df |
| | | |
| | | if df_filtered.empty: |
| | | self.error_signal.emit("筛选后无有效项目,请检查输入") |
| | | return |
| | | |
| | | # 创建数据库更新器实例 |
| | | self.updaters = [] |
| | | for _, row in df_filtered.iterrows(): |
| | | project, host, database, user, password = row[:5] |
| | | self.log_signal.emit(f"准备处理项目:{project} | 数据库:{database}") |
| | | updater = ProjectUpdater(project, host, database, user, password) |
| | | self.updaters.append(updater) |
| | | |
| | | # 执行更新任务 |
| | | total_projects = len(self.updaters) |
| | | for i, updater in enumerate(self.updaters): |
| | | if not self.is_running: |
| | | break |
| | | |
| | | self.log_signal.emit(f"正在处理项目: {updater.project}") |
| | | |
| | | # 运行更新 |
| | | success = updater.update_database( |
| | | user_filter=self.user_filter if self.mode == 'specific' else None, |
| | | new_password=self.new_password |
| | | ) |
| | | |
| | | if success: |
| | | self.log_records.extend(updater.log_records) |
| | | self.log_signal.emit(f"✅ {updater.project} 更新完成,共 {len(updater.updated_users)} 个用户") |
| | | else: |
| | | self.log_signal.emit(f"❌ {updater.project} 更新失败") |
| | | |
| | | # 更新进度 |
| | | self.progress_signal.emit(int((i + 1) * 100 / total_projects)) |
| | | |
| | | # 保存日志 |
| | | if self.log_records: |
| | | pd.DataFrame(self.log_records).to_csv(DEFAULT_LOG_CSV_PATH, index=False, encoding='utf-8-sig') |
| | | self.log_signal.emit(f"所有更新日志已保存至 {DEFAULT_LOG_CSV_PATH}") |
| | | else: |
| | | self.log_signal.emit("无任何更新记录生成") |
| | | |
| | | except Exception as e: |
| | | self.error_signal.emit(f"运行过程中发生错误: {str(e)}") |
| | | finally: |
| | | self.finished_signal.emit() |
| | | |
| | | def stop(self): |
| | | """停止运行""" |
| | | self.is_running = False |
| | | |
| | | def export_user_info(self, project_name): |
| | | """导出用户信息""" |
| | | try: |
| | | # 找到对应的项目更新器 |
| | | updater = None |
| | | for u in self.updaters: |
| | | if u.project == project_name: |
| | | updater = u |
| | | break |
| | | |
| | | if not updater or not updater.updated_users: |
| | | self.error_signal.emit(f"项目 {project_name} 没有可导出的用户信息") |
| | | return |
| | | |
| | | # 连接到数据库并导出 |
| | | conn = updater.connect() |
| | | if conn: |
| | | cursor = conn.cursor() |
| | | success = updater.export_user_info(cursor, self.new_password) |
| | | cursor.close() |
| | | conn.close() |
| | | |
| | | if success: |
| | | self.export_finished_signal.emit(f"项目 {project_name} 的用户信息导出完成") |
| | | else: |
| | | self.error_signal.emit(f"项目 {project_name} 的用户信息导出失败") |
| | | else: |
| | | self.error_signal.emit(f"无法连接到项目 {project_name} 的数据库") |
| | | |
| | | except Exception as e: |
| | | self.error_signal.emit(f"导出用户信息时发生错误: {str(e)}") |
| | | |
| | | |
| | | class ProjectUpdater: |
| | | """单个项目更新器""" |
| | | |
| | | def __init__(self, project, host, database, user, password): |
| | | self.project = project |
| | | self.host = host |
| | | self.database = database |
| | | self.db_user = user |
| | | self.db_password = password |
| | | self.updated_users = [] |
| | | self.log_records = [] |
| | | |
| | | def connect(self): |
| | | """建立数据库连接""" |
| | | try: |
| | | conn = mysql.connector.connect( |
| | | host=self.host, |
| | | port=3306, |
| | | user=self.db_user, |
| | | password=self.db_password, |
| | | database=self.database, |
| | | use_pure=True |
| | | ) |
| | | return conn |
| | | except Error as e: |
| | | return None |
| | | |
| | | def check_plain_text_column(self, cursor): |
| | | """检查是否存在 plain_text 字段""" |
| | | try: |
| | | cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'") |
| | | return cursor.fetchone() is not None |
| | | except Error: |
| | | return False |
| | | |
| | | def get_users_to_update(self, cursor, user_filter=None): |
| | | """获取需要更新的用户列表""" |
| | | try: |
| | | # 构造基础查询语句 |
| | | base_sql = """ |
| | | SELECT id, login_name, name, current_role_id |
| | | FROM sys_user |
| | | WHERE id != %s |
| | | """ # 默认排除保护用户 |
| | | |
| | | params = [PROTECTED_USER_ID] |
| | | |
| | | # 添加用户筛选条件 |
| | | if isinstance(user_filter, list) and user_filter: # 用户名列表模式 |
| | | placeholders = ','.join(['%s'] * len(user_filter)) |
| | | base_sql += f" AND login_name IN ({placeholders})" |
| | | params.extend(user_filter) |
| | | |
| | | cursor.execute(base_sql, params) |
| | | return cursor.fetchall() |
| | | except Error: |
| | | return [] |
| | | |
| | | def update_user_password(self, cursor, user_id, login_name, has_plain_text, new_password): |
| | | """更新用户密码""" |
| | | try: |
| | | # 使用Argon2生成哈希密码 |
| | | hashed = encode_password(new_password) |
| | | |
| | | # 构造更新 SQL |
| | | if has_plain_text: |
| | | update_sql = """ |
| | | UPDATE sys_user |
| | | SET plain_text = %s, password = %s |
| | | WHERE id = %s |
| | | """ |
| | | cursor.execute(update_sql, (new_password, hashed, user_id)) |
| | | else: |
| | | update_sql = """ |
| | | UPDATE sys_user |
| | | SET password = %s |
| | | WHERE id = %s |
| | | """ |
| | | cursor.execute(update_sql, (hashed, user_id)) |
| | | |
| | | return True |
| | | except Error: |
| | | return False |
| | | |
| | | def get_role_name(self, cursor, role_id): |
| | | """根据角色ID获取角色名称""" |
| | | try: |
| | | if not role_id: |
| | | return "无角色" |
| | | |
| | | cursor.execute("SELECT name FROM sys_role WHERE id = %s", (role_id,)) |
| | | result = cursor.fetchone() |
| | | return result[0] if result else "角色不存在" |
| | | except Error: |
| | | return "获取失败" |
| | | |
| | | def export_user_info(self, cursor, new_password): |
| | | """导出用户信息到Excel""" |
| | | try: |
| | | if not self.updated_users: |
| | | return False |
| | | |
| | | # 确保导出目录存在 |
| | | os.makedirs(DEFAULT_EXPORT_DIR, exist_ok=True) |
| | | |
| | | # 获取所有角色ID |
| | | role_ids = list(set(user['role_id'] for user in self.updated_users if user['role_id'])) |
| | | |
| | | # 获取角色名称映射 |
| | | role_name_map = {} |
| | | for role_id in role_ids: |
| | | role_name = self.get_role_name(cursor, role_id) |
| | | role_name_map[role_id] = role_name |
| | | |
| | | # 准备导出数据 |
| | | export_data = [] |
| | | for user in self.updated_users: |
| | | export_data.append({ |
| | | '用户名': user['login_name'], |
| | | '密码': new_password, |
| | | '姓名': user['name'], |
| | | '角色名称': role_name_map.get(user['role_id'], '无角色') |
| | | }) |
| | | |
| | | # 创建DataFrame并导出 |
| | | df_export = pd.DataFrame(export_data) |
| | | |
| | | # 按照要求格式化文件名:项目名称测试环境账号密码_年月日 |
| | | safe_project_name = re.sub(r'[\\/*?:"<>|]', "", self.project) |
| | | safe_project_name = safe_project_name.replace(" ", "_") |
| | | date_str = datetime.now().strftime("%Y%m%d") |
| | | filename = f"{safe_project_name}测试环境账号密码_{date_str}.xlsx" |
| | | filepath = os.path.join(DEFAULT_EXPORT_DIR, filename) |
| | | |
| | | df_export.to_excel(filepath, index=False) |
| | | return True |
| | | |
| | | except Exception: |
| | | return False |
| | | |
| | | def update_database(self, user_filter=None, new_password=DEFAULT_NEW_PASSWORD): |
| | | """更新数据库中的用户密码""" |
| | | conn = None |
| | | cursor = None |
| | | try: |
| | | conn = self.connect() |
| | | if not conn: |
| | | return False |
| | | |
| | | # 使用同步方式创建游标 |
| | | cursor = conn.cursor() |
| | | |
| | | # 检查是否有 plain_text 字段 |
| | | has_plain_text = self.check_plain_text_column(cursor) |
| | | |
| | | # 获取需要更新的用户 |
| | | users = self.get_users_to_update(cursor, user_filter) |
| | | |
| | | if not users: |
| | | return False |
| | | |
| | | # 更新每个用户的密码 |
| | | for uid, login_name, name, current_role_id in users: |
| | | success = self.update_user_password(cursor, uid, login_name, has_plain_text, new_password) |
| | | |
| | | if success: |
| | | # 记录更新成功的用户信息 |
| | | self.updated_users.append({ |
| | | 'id': uid, |
| | | 'login_name': login_name, |
| | | 'name': name, |
| | | 'role_id': current_role_id |
| | | }) |
| | | |
| | | # 写入日志 |
| | | self.log_records.append({ |
| | | '项目': self.project, |
| | | '数据库': self.database, |
| | | '用户ID': uid, |
| | | '用户名': login_name, |
| | | '明文密码': new_password if has_plain_text else '未更新', |
| | | '哈希密码': encode_password(new_password) |
| | | }) |
| | | |
| | | # 提交事务 |
| | | conn.commit() |
| | | return True |
| | | |
| | | except Error: |
| | | return False |
| | | finally: |
| | | if cursor: |
| | | cursor.close() |
| | | if conn: |
| | | conn.close() |
| | | |
| | | |
| | | class PasswordUpdaterApp(QMainWindow): |
| | | """主应用程序窗口""" |
| | | |
| | | def __init__(self): |
| | | super().__init__() |
| | | self.updater_thread = None |
| | | self.init_ui() |
| | | |
| | | def init_ui(self): |
| | | """初始化用户界面""" |
| | | self.setWindowTitle('数据库密码批量修改工具') |
| | | self.setGeometry(100, 100, 800, 600) |
| | | |
| | | # 创建中央部件和布局 |
| | | central_widget = QWidget() |
| | | self.setCentralWidget(central_widget) |
| | | layout = QVBoxLayout(central_widget) |
| | | |
| | | # 创建选项卡 |
| | | tabs = QTabWidget() |
| | | layout.addWidget(tabs) |
| | | |
| | | # 配置选项卡 |
| | | config_tab = QWidget() |
| | | config_layout = QVBoxLayout(config_tab) |
| | | |
| | | # Excel文件选择 |
| | | excel_group = QGroupBox("Excel配置文件") |
| | | excel_layout = QHBoxLayout(excel_group) |
| | | self.excel_path_edit = QLineEdit(DEFAULT_EXCEL_PATH) |
| | | excel_browse_btn = QPushButton("浏览...") |
| | | excel_browse_btn.clicked.connect(self.browse_excel_file) |
| | | excel_layout.addWidget(QLabel("Excel文件路径:")) |
| | | excel_layout.addWidget(self.excel_path_edit) |
| | | excel_layout.addWidget(excel_browse_btn) |
| | | config_layout.addWidget(excel_group) |
| | | |
| | | # 新密码设置 |
| | | password_group = QGroupBox("新密码设置") |
| | | password_layout = QHBoxLayout(password_group) |
| | | self.password_edit = QLineEdit(DEFAULT_NEW_PASSWORD) |
| | | self.password_edit.setEchoMode(QLineEdit.Password) |
| | | password_layout.addWidget(QLabel("新密码:")) |
| | | password_layout.addWidget(self.password_edit) |
| | | config_layout.addWidget(password_group) |
| | | |
| | | # 项目选择 |
| | | projects_group = QGroupBox("项目选择") |
| | | projects_layout = QVBoxLayout(projects_group) |
| | | self.projects_list = QListWidget() |
| | | self.load_projects_btn = QPushButton("加载项目") |
| | | self.load_projects_btn.clicked.connect(self.load_projects) |
| | | projects_layout.addWidget(self.projects_list) |
| | | projects_layout.addWidget(self.load_projects_btn) |
| | | config_layout.addWidget(projects_group) |
| | | |
| | | # 操作模式选择 |
| | | mode_group = QGroupBox("操作模式") |
| | | mode_layout = QVBoxLayout(mode_group) |
| | | self.all_users_radio = QRadioButton("批量更新所有用户(排除ID为1的用户)") |
| | | self.specific_users_radio = QRadioButton("指定用户名更新") |
| | | self.all_users_radio.setChecked(True) |
| | | |
| | | # 用户名输入 |
| | | self.usernames_edit = QLineEdit() |
| | | self.usernames_edit.setPlaceholderText("输入用户名,多个用逗号分隔") |
| | | self.usernames_edit.setEnabled(False) |
| | | |
| | | # 连接信号 |
| | | self.all_users_radio.toggled.connect( |
| | | lambda: self.usernames_edit.setEnabled(not self.all_users_radio.isChecked())) |
| | | self.specific_users_radio.toggled.connect( |
| | | lambda: self.usernames_edit.setEnabled(self.specific_users_radio.isChecked())) |
| | | |
| | | mode_layout.addWidget(self.all_users_radio) |
| | | mode_layout.addWidget(self.specific_users_radio) |
| | | mode_layout.addWidget(self.usernames_edit) |
| | | config_layout.addWidget(mode_group) |
| | | |
| | | # 进度条 |
| | | self.progress_bar = QProgressBar() |
| | | config_layout.addWidget(self.progress_bar) |
| | | |
| | | # 开始按钮 |
| | | self.start_btn = QPushButton("开始更新") |
| | | self.start_btn.clicked.connect(self.start_update) |
| | | config_layout.addWidget(self.start_btn) |
| | | |
| | | # 日志选项卡 |
| | | log_tab = QWidget() |
| | | log_layout = QVBoxLayout(log_tab) |
| | | self.log_text = QTextEdit() |
| | | self.log_text.setReadOnly(True) |
| | | log_layout.addWidget(QLabel("操作日志:")) |
| | | log_layout.addWidget(self.log_text) |
| | | |
| | | # 导出选项卡 |
| | | export_tab = QWidget() |
| | | export_layout = QVBoxLayout(export_tab) |
| | | self.export_list = QListWidget() |
| | | self.export_btn = QPushButton("导出选中项目用户信息") |
| | | self.export_btn.clicked.connect(self.export_user_info) |
| | | export_layout.addWidget(QLabel("已更新项目列表:")) |
| | | export_layout.addWidget(self.export_list) |
| | | export_layout.addWidget(self.export_btn) |
| | | |
| | | # 添加选项卡 |
| | | tabs.addTab(config_tab, "配置") |
| | | tabs.addTab(log_tab, "日志") |
| | | tabs.addTab(export_tab, "导出") |
| | | |
| | | def browse_excel_file(self): |
| | | """浏览Excel文件""" |
| | | file_path, _ = QFileDialog.getOpenFileName( |
| | | self, "选择Excel文件", "", "Excel Files (*.xlsx *.xls)" |
| | | ) |
| | | if file_path: |
| | | self.excel_path_edit.setText(file_path) |
| | | |
| | | def load_projects(self): |
| | | """加载项目列表""" |
| | | excel_path = self.excel_path_edit.text() |
| | | if not os.path.exists(excel_path): |
| | | QMessageBox.warning(self, "错误", "Excel文件不存在") |
| | | return |
| | | |
| | | try: |
| | | df = pd.read_excel(excel_path) |
| | | projects = df['项目'].unique().tolist() |
| | | |
| | | self.projects_list.clear() |
| | | for project in projects: |
| | | item = QListWidgetItem(project) |
| | | item.setCheckState(Qt.Unchecked) |
| | | self.projects_list.addItem(item) |
| | | |
| | | self.log_text.append(f"已加载 {len(projects)} 个项目") |
| | | except Exception as e: |
| | | QMessageBox.warning(self, "错误", f"读取Excel文件失败: {str(e)}") |
| | | |
| | | def start_update(self): |
| | | """开始更新""" |
| | | # 获取选中的项目 |
| | | selected_projects = [] |
| | | for i in range(self.projects_list.count()): |
| | | item = self.projects_list.item(i) |
| | | if item.checkState() == Qt.Checked: |
| | | selected_projects.append(item.text()) |
| | | |
| | | if not selected_projects: |
| | | QMessageBox.warning(self, "错误", "请至少选择一个项目") |
| | | return |
| | | |
| | | # 获取操作模式 |
| | | if self.all_users_radio.isChecked(): |
| | | mode = 'all' |
| | | user_filter = None |
| | | else: |
| | | mode = 'specific' |
| | | usernames = self.usernames_edit.text().strip() |
| | | if not usernames: |
| | | QMessageBox.warning(self, "错误", "请输入要更新的用户名") |
| | | return |
| | | user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')] |
| | | |
| | | # 获取新密码 |
| | | new_password = self.password_edit.text() |
| | | if not new_password: |
| | | QMessageBox.warning(self, "错误", "请输入新密码") |
| | | return |
| | | |
| | | # 创建并启动更新线程 |
| | | self.updater_thread = DatabaseUpdater( |
| | | self.excel_path_edit.text(), |
| | | new_password, |
| | | mode, |
| | | user_filter, |
| | | selected_projects |
| | | ) |
| | | |
| | | # 连接信号 |
| | | self.updater_thread.log_signal.connect(self.log_text.append) |
| | | self.updater_thread.progress_signal.connect(self.progress_bar.setValue) |
| | | self.updater_thread.finished_signal.connect(self.on_update_finished) |
| | | self.updater_thread.error_signal.connect(self.show_error) |
| | | self.updater_thread.export_finished_signal.connect(self.log_text.append) |
| | | |
| | | # 禁用开始按钮 |
| | | self.start_btn.setEnabled(False) |
| | | |
| | | # 启动线程 |
| | | self.updater_thread.start() |
| | | |
| | | def on_update_finished(self): |
| | | """更新完成后的处理""" |
| | | self.start_btn.setEnabled(True) |
| | | self.progress_bar.setValue(100) |
| | | |
| | | # 更新导出列表 |
| | | if self.updater_thread and self.updater_thread.updaters: |
| | | self.export_list.clear() |
| | | for updater in self.updater_thread.updaters: |
| | | if updater.updated_users: |
| | | item = QListWidgetItem(f"{updater.project} ({len(updater.updated_users)} 用户)") |
| | | item.setData(Qt.UserRole, updater.project) |
| | | self.export_list.addItem(item) |
| | | |
| | | def export_user_info(self): |
| | | """导出用户信息""" |
| | | selected_items = self.export_list.selectedItems() |
| | | if not selected_items: |
| | | QMessageBox.warning(self, "错误", "请选择要导出的项目") |
| | | return |
| | | |
| | | if not self.updater_thread: |
| | | QMessageBox.warning(self, "错误", "没有可导出的数据") |
| | | return |
| | | |
| | | for item in selected_items: |
| | | project_name = item.data(Qt.UserRole) |
| | | self.log_text.append(f"开始导出项目 {project_name} 的用户信息...") |
| | | self.updater_thread.export_user_info(project_name) |
| | | |
| | | def show_error(self, error_msg): |
| | | """显示错误信息""" |
| | | QMessageBox.critical(self, "错误", error_msg) |
| | | self.start_btn.setEnabled(True) |
| | | |
| | | def closeEvent(self, event): |
| | | """关闭应用程序事件""" |
| | | if self.updater_thread and self.updater_thread.isRunning(): |
| | | reply = QMessageBox.question( |
| | | self, '确认退出', |
| | | '更新操作正在进行中,确定要退出吗?', |
| | | QMessageBox.Yes | QMessageBox.No, |
| | | QMessageBox.No |
| | | ) |
| | | |
| | | if reply == QMessageBox.Yes: |
| | | self.updater_thread.stop() |
| | | self.updater_thread.wait() |
| | | event.accept() |
| | | else: |
| | | event.ignore() |
| | | else: |
| | | event.accept() |
| | | |
| | | |
| | | def main(): |
| | | """主函数""" |
| | | app = QApplication(sys.argv) |
| | | window = PasswordUpdaterApp() |
| | | window.show() |
| | | sys.exit(app.exec_()) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | main() |
| New file |
| | |
| | | # -*- mode: python ; coding: utf-8 -*- |
| | | |
| | | |
| | | a = Analysis( |
| | | ['修改数据库的哈希密码和原始密码做桌面客户端.py'], |
| | | pathex=[], |
| | | binaries=[], |
| | | datas=[('dbExcel', 'dbExcel')], |
| | | hiddenimports=[], |
| | | hookspath=[], |
| | | hooksconfig={}, |
| | | runtime_hooks=[], |
| | | excludes=[], |
| | | noarchive=False, |
| | | optimize=0, |
| | | ) |
| | | pyz = PYZ(a.pure) |
| | | |
| | | exe = EXE( |
| | | pyz, |
| | | a.scripts, |
| | | a.binaries, |
| | | a.datas, |
| | | [], |
| | | name='密码更新工具', |
| | | debug=False, |
| | | bootloader_ignore_signals=False, |
| | | strip=False, |
| | | upx=True, |
| | | upx_exclude=[], |
| | | runtime_tmpdir=None, |
| | | console=False, |
| | | disable_windowed_traceback=False, |
| | | argv_emulation=False, |
| | | target_arch=None, |
| | | codesign_identity=None, |
| | | entitlements_file=None, |
| | | ) |