import sys
|
import os
|
import pandas as pd
|
import bcrypt
|
import mysql.connector
|
from mysql.connector import Error
|
import asyncio
|
import threading
|
from datetime import datetime
|
from typing import List, Dict, Any, Optional
|
import re
|
import locale
|
from PyQt5 import QtWidgets, QtCore, QtGui
|
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
|
QLabel, QLineEdit, QPushButton, QTextEdit, QListWidget,
|
QListWidgetItem, QCheckBox, QFileDialog, QMessageBox, QProgressBar,
|
QGroupBox, QRadioButton, QButtonGroup, QTabWidget)
|
from PyQt5.QtCore import Qt, pyqtSignal, QThread
|
|
# 【重要】在程序最开始就设置环境变量,避免任何pandas操作之前出现本地化问题
|
# 强制设置为C语言环境,这是最安全的通用设置
|
os.environ['LANG'] = 'C'
|
os.environ['LC_ALL'] = 'C'
|
os.environ['LC_CTYPE'] = 'C'
|
os.environ['LC_NUMERIC'] = 'C'
|
os.environ['LC_TIME'] = 'C'
|
os.environ['LC_COLLATE'] = 'C'
|
os.environ['LC_MONETARY'] = 'C'
|
os.environ['LC_MESSAGES'] = 'C'
|
os.environ['LC_PAPER'] = 'C'
|
os.environ['LC_NAME'] = 'C'
|
os.environ['LC_ADDRESS'] = 'C'
|
os.environ['LC_TELEPHONE'] = 'C'
|
os.environ['LC_MEASUREMENT'] = 'C'
|
os.environ['LC_IDENTIFICATION'] = 'C'
|
os.environ['PANDAS_USAGE_STATS'] = 'False'
|
|
# 尝试设置locale,但如果失败也不影响,因为已经设置了环境变量
|
try:
|
locale.setlocale(locale.LC_ALL, 'C')
|
except Exception:
|
pass
|
|
# 不再设置pandas选项,避免因版本差异导致的错误
|
# 只保留必要的环境变量设置,确保本地化不会出现问题
|
|
# 判断是否是打包后的可执行文件
|
if getattr(sys, 'frozen', False):
|
# 如果是可执行文件,使用可执行文件所在目录作为基础路径
|
BASE_DIR = os.path.dirname(sys.executable)
|
else:
|
# 如果是脚本,使用脚本所在目录
|
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
# 修改默认路径配置
|
DEFAULT_EXCEL_PATH = os.path.join(BASE_DIR, 'dbExcel', '数据库信息.xlsx')
|
DEFAULT_LOG_CSV_PATH = os.path.join(BASE_DIR, '更新日志.csv')
|
DEFAULT_EXPORT_DIR = os.path.join(BASE_DIR, '用户信息导出')
|
DEFAULT_NEW_PASSWORD = 'Baoyi@1341'
|
PROTECTED_USER_ID = 1
|
|
# Redis配置
|
REDIS_HOST = '192.168.6.168'
|
REDIS_PORT = 6002
|
REDIS_PASSWORD = None
|
|
|
class RedisRefresher:
|
"""Redis缓存刷新器"""
|
|
def __init__(self):
|
self.redis_client = None
|
|
def connect_redis(self):
|
"""连接Redis服务器"""
|
try:
|
# 尝试导入redis库
|
import redis
|
self.redis_client = redis.Redis(
|
host=REDIS_HOST,
|
port=REDIS_PORT,
|
password=REDIS_PASSWORD,
|
decode_responses=True
|
)
|
# 测试连接
|
self.redis_client.ping()
|
return True
|
except ImportError:
|
print("错误: 未安装redis库,请使用 'pip install redis' 安装")
|
return False
|
except Exception as e:
|
print(f"连接Redis失败: {str(e)}")
|
return False
|
|
def refresh_all_redis(self):
|
"""刷新所有Redis数据库"""
|
try:
|
if not self.redis_client:
|
if not self.connect_redis():
|
return False, "无法连接Redis服务器"
|
|
# 获取所有数据库数量
|
try:
|
# 尝试获取配置信息
|
config = self.redis_client.config_get('databases')
|
db_count = int(config.get('databases', 16))
|
except:
|
# 如果无法获取配置,默认使用16个数据库
|
db_count = 16
|
|
refreshed_dbs = 0
|
total_keys = 0
|
|
# 遍历所有数据库并刷新
|
for db_index in range(db_count):
|
try:
|
# 创建一个临时Redis客户端来操作指定数据库
|
import redis
|
temp_client = redis.Redis(
|
host=REDIS_HOST,
|
port=REDIS_PORT,
|
password=REDIS_PASSWORD,
|
db=db_index,
|
decode_responses=True
|
)
|
|
# 获取当前数据库的key数量
|
key_count = temp_client.dbsize()
|
|
if key_count > 0:
|
# 清空当前数据库
|
temp_client.flushdb()
|
refreshed_dbs += 1
|
total_keys += key_count
|
print(f"已刷新数据库 {db_index}: 清除了 {key_count} 个键")
|
|
except Exception as e:
|
print(f"刷新数据库 {db_index} 时出错: {str(e)}")
|
continue
|
|
return True, f"成功刷新 {refreshed_dbs} 个Redis数据库,共清除 {total_keys} 个键"
|
|
except Exception as e:
|
return False, f"刷新Redis缓存时发生错误: {str(e)}"
|
|
|
class DatabaseUpdater(QThread):
|
"""数据库更新器类,运行在单独的线程中"""
|
|
# 定义信号
|
log_signal = pyqtSignal(str)
|
progress_signal = pyqtSignal(int)
|
finished_signal = pyqtSignal()
|
error_signal = pyqtSignal(str)
|
export_finished_signal = pyqtSignal(str)
|
|
def __init__(self, excel_path, new_password, mode, user_filter=None, selected_projects=None):
|
super().__init__()
|
self.excel_path = excel_path
|
self.new_password = new_password
|
self.mode = mode # 'all' 或 'specific'
|
self.user_filter = user_filter
|
self.selected_projects = selected_projects
|
self.updaters = []
|
self.log_records = []
|
self.is_running = True
|
self.redis_refresher = RedisRefresher()
|
|
def run(self):
|
"""主运行方法"""
|
try:
|
# 保存日志时可能会出现本地化问题,这里临时设置环境变量
|
original_lang = os.environ.get('LANG')
|
original_lc_all = os.environ.get('LC_ALL')
|
|
# 强制设置为C语言环境,避免pandas本地化问题
|
os.environ['LANG'] = 'C'
|
os.environ['LC_ALL'] = 'C'
|
|
# 读取 Excel 数据
|
try:
|
df = pd.read_excel(self.excel_path)
|
except Exception as e:
|
self.error_signal.emit(f"读取Excel文件失败:{e}")
|
return
|
|
# 获取所有唯一项目列表
|
all_projects = df['项目'].unique().tolist()
|
if not all_projects:
|
self.error_signal.emit("Excel 文件中未找到任何项目")
|
return
|
|
# 过滤 DataFrame
|
if self.selected_projects:
|
df_filtered = df[df['项目'].isin(self.selected_projects)]
|
else:
|
df_filtered = df
|
|
if df_filtered.empty:
|
self.error_signal.emit("筛选后无有效项目,请检查输入")
|
return
|
|
# 创建数据库更新器实例
|
self.updaters = []
|
for _, row in df_filtered.iterrows():
|
project, host, port, database, user, password = row[:6]
|
self.log_signal.emit(f"准备处理项目:{project} | 数据库:{database} | 端口:{port}")
|
updater = ProjectUpdater(project, host, database, user, password, port)
|
self.updaters.append(updater)
|
|
# 执行更新任务
|
total_projects = len(self.updaters)
|
for i, updater in enumerate(self.updaters):
|
if not self.is_running:
|
break
|
|
self.log_signal.emit(f"正在处理项目: {updater.project}")
|
|
# 运行更新
|
success = updater.update_database(
|
user_filter=self.user_filter if self.mode == 'specific' else None,
|
new_password=self.new_password
|
)
|
|
if success:
|
self.log_records.extend(updater.log_records)
|
self.log_signal.emit(f"✅ {updater.project} 更新完成,共 {len(updater.updated_users)} 个用户")
|
else:
|
self.log_signal.emit(f"❌ {updater.project} 更新失败")
|
|
# 更新进度
|
self.progress_signal.emit(int((i + 1) * 100 / total_projects))
|
|
# 保存日志
|
if self.log_records:
|
try:
|
pd.DataFrame(self.log_records).to_csv(DEFAULT_LOG_CSV_PATH, index=False, encoding='utf-8-sig')
|
self.log_signal.emit(f"所有更新日志已保存至 {DEFAULT_LOG_CSV_PATH}")
|
except Exception as e:
|
self.log_signal.emit(f"保存日志失败:{str(e)}")
|
else:
|
self.log_signal.emit("无任何更新记录生成")
|
|
# 恢复原始环境变量
|
if original_lang:
|
os.environ['LANG'] = original_lang
|
else:
|
os.environ.pop('LANG', None)
|
|
if original_lc_all:
|
os.environ['LC_ALL'] = original_lc_all
|
else:
|
os.environ.pop('LC_ALL', None)
|
|
# 数据库更新完成后,自动刷新Redis缓存
|
self.log_signal.emit("开始刷新Redis缓存...")
|
success, message = self.redis_refresher.refresh_all_redis()
|
if success:
|
self.log_signal.emit(f"✅ {message}")
|
else:
|
self.log_signal.emit(f"❌ {message}")
|
|
except Exception as e:
|
self.error_signal.emit(f"运行过程中发生错误: {str(e)}")
|
finally:
|
self.finished_signal.emit()
|
|
def stop(self):
|
"""停止运行"""
|
self.is_running = False
|
|
def export_user_info(self, project_name):
|
"""导出用户信息"""
|
try:
|
# 找到对应的项目更新器
|
updater = None
|
for u in self.updaters:
|
if u.project == project_name:
|
updater = u
|
break
|
|
if not updater or not updater.updated_users:
|
self.error_signal.emit(f"项目 {project_name} 没有可导出的用户信息")
|
return
|
|
# 连接到数据库并导出
|
conn = updater.connect()
|
if conn:
|
cursor = conn.cursor()
|
success = updater.export_user_info(cursor, self.new_password)
|
cursor.close()
|
conn.close()
|
|
if success:
|
self.export_finished_signal.emit(f"项目 {project_name} 的用户信息导出完成")
|
else:
|
self.error_signal.emit(f"项目 {project_name} 的用户信息导出失败")
|
else:
|
self.error_signal.emit(f"无法连接到项目 {project_name} 的数据库")
|
|
except Exception as e:
|
self.error_signal.emit(f"导出用户信息时发生错误: {str(e)}")
|
|
|
class ProjectUpdater:
|
"""单个项目更新器"""
|
|
def __init__(self, project, host, database, user, password, port=3306):
|
self.project = project
|
self.host = host
|
self.database = database
|
self.db_user = user
|
self.db_password = password
|
self.port = port
|
self.updated_users = []
|
self.log_records = []
|
|
def connect(self):
|
"""建立数据库连接"""
|
try:
|
conn = mysql.connector.connect(
|
host=self.host,
|
port=self.port,
|
user=self.db_user,
|
password=self.db_password,
|
database=self.database,
|
use_pure=True
|
)
|
return conn
|
except Error as e:
|
return None
|
|
def check_plain_text_column(self, cursor):
|
"""检查是否存在 plain_text 字段"""
|
try:
|
cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'")
|
return cursor.fetchone() is not None
|
except Error:
|
return False
|
|
def get_users_to_update(self, cursor, user_filter=None):
|
"""获取需要更新的用户列表"""
|
try:
|
# 构造基础查询语句
|
base_sql = """
|
SELECT id, login_name, name, current_role_id
|
FROM sys_user
|
WHERE id != %s
|
""" # 默认排除保护用户
|
|
params = [PROTECTED_USER_ID]
|
|
# 添加用户筛选条件
|
if isinstance(user_filter, list) and user_filter: # 用户名列表模式
|
placeholders = ','.join(['%s'] * len(user_filter))
|
base_sql += f" AND login_name IN ({placeholders})"
|
params.extend(user_filter)
|
|
cursor.execute(base_sql, params)
|
return cursor.fetchall()
|
except Error:
|
return []
|
|
def update_user_password(self, cursor, user_id, login_name, has_plain_text, new_password):
|
"""更新用户密码"""
|
try:
|
salt = bcrypt.gensalt(rounds=10)
|
hashed = bcrypt.hashpw(new_password.encode('utf-8'), salt).decode('utf-8')
|
|
# 构造更新 SQL
|
if has_plain_text:
|
update_sql = """
|
UPDATE sys_user
|
SET plain_text = %s, password = %s
|
WHERE id = %s
|
"""
|
cursor.execute(update_sql, (new_password, hashed, user_id))
|
else:
|
update_sql = """
|
UPDATE sys_user
|
SET password = %s
|
WHERE id = %s
|
"""
|
cursor.execute(update_sql, (hashed, user_id))
|
|
return True
|
except Error:
|
return False
|
|
def get_role_name(self, cursor, role_id):
|
"""根据角色ID获取角色名称"""
|
try:
|
if not role_id:
|
return "无角色"
|
|
cursor.execute("SELECT name FROM sys_role WHERE id = %s", (role_id,))
|
result = cursor.fetchone()
|
return result[0] if result else "角色不存在"
|
except Error:
|
return "获取失败"
|
|
def export_user_info(self, cursor, new_password):
|
"""导出用户信息到Excel"""
|
try:
|
if not self.updated_users:
|
return False
|
|
# 确保导出目录存在
|
os.makedirs(DEFAULT_EXPORT_DIR, exist_ok=True)
|
|
# 获取所有角色ID
|
role_ids = list(set(user['role_id'] for user in self.updated_users if user['role_id']))
|
|
# 获取角色名称映射
|
role_name_map = {}
|
for role_id in role_ids:
|
role_name = self.get_role_name(cursor, role_id)
|
role_name_map[role_id] = role_name
|
|
# 准备导出数据
|
export_data = []
|
for user in self.updated_users:
|
export_data.append({
|
'用户名': user['login_name'],
|
'密码': new_password,
|
'姓名': user['name'],
|
'角色名称': role_name_map.get(user['role_id'], '无角色')
|
})
|
|
# 创建DataFrame并导出
|
df_export = pd.DataFrame(export_data)
|
|
# 按照要求格式化文件名:项目名称测试环境账号密码_年月日
|
safe_project_name = re.sub(r'[\\/*?:"<>|]', "", self.project)
|
safe_project_name = safe_project_name.replace(" ", "_")
|
date_str = datetime.now().strftime("%Y%m%d")
|
filename = f"{safe_project_name}测试环境账号密码_{date_str}.xlsx"
|
filepath = os.path.join(DEFAULT_EXPORT_DIR, filename)
|
|
df_export.to_excel(filepath, index=False)
|
return True
|
|
except Exception:
|
return False
|
|
def update_database(self, user_filter=None, new_password=DEFAULT_NEW_PASSWORD):
|
"""更新数据库中的用户密码"""
|
conn = None
|
cursor = None
|
try:
|
conn = self.connect()
|
if not conn:
|
return False
|
|
# 使用同步方式创建游标
|
cursor = conn.cursor()
|
|
# 检查是否有 plain_text 字段
|
has_plain_text = self.check_plain_text_column(cursor)
|
|
# 获取需要更新的用户
|
users = self.get_users_to_update(cursor, user_filter)
|
|
if not users:
|
return False
|
|
# 更新每个用户的密码
|
for uid, login_name, name, current_role_id in users:
|
success = self.update_user_password(cursor, uid, login_name, has_plain_text, new_password)
|
|
if success:
|
# 记录更新成功的用户信息
|
self.updated_users.append({
|
'id': uid,
|
'login_name': login_name,
|
'name': name,
|
'role_id': current_role_id
|
})
|
|
# 写入日志
|
self.log_records.append({
|
'项目': self.project,
|
'数据库': self.database,
|
'用户ID': uid,
|
'用户名': login_name,
|
'明文密码': new_password if has_plain_text else '未更新',
|
'哈希密码': bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt(rounds=10)).decode(
|
'utf-8')
|
})
|
|
# 提交事务
|
conn.commit()
|
return True
|
|
except Error:
|
return False
|
finally:
|
if cursor:
|
cursor.close()
|
if conn:
|
conn.close()
|
|
|
class PasswordUpdaterApp(QMainWindow):
|
"""主应用程序窗口"""
|
|
def __init__(self):
|
super().__init__()
|
self.updater_thread = None
|
self.init_ui()
|
|
def init_ui(self):
|
"""初始化用户界面"""
|
self.setWindowTitle('数据库密码批量修改工具')
|
self.setGeometry(100, 100, 800, 600)
|
|
# 创建中央部件和布局
|
central_widget = QWidget()
|
self.setCentralWidget(central_widget)
|
layout = QVBoxLayout(central_widget)
|
|
# 创建选项卡
|
tabs = QTabWidget()
|
layout.addWidget(tabs)
|
|
# 配置选项卡
|
config_tab = QWidget()
|
config_layout = QVBoxLayout(config_tab)
|
|
# Excel文件选择
|
excel_group = QGroupBox("Excel配置文件")
|
excel_layout = QHBoxLayout(excel_group)
|
self.excel_path_edit = QLineEdit(DEFAULT_EXCEL_PATH)
|
excel_browse_btn = QPushButton("浏览...")
|
excel_browse_btn.clicked.connect(self.browse_excel_file)
|
excel_layout.addWidget(QLabel("Excel文件路径:"))
|
excel_layout.addWidget(self.excel_path_edit)
|
excel_layout.addWidget(excel_browse_btn)
|
config_layout.addWidget(excel_group)
|
|
# 新密码设置
|
password_group = QGroupBox("新密码设置")
|
password_layout = QHBoxLayout(password_group)
|
self.password_edit = QLineEdit(DEFAULT_NEW_PASSWORD)
|
self.password_edit.setEchoMode(QLineEdit.Password)
|
password_layout.addWidget(QLabel("新密码:"))
|
password_layout.addWidget(self.password_edit)
|
config_layout.addWidget(password_group)
|
|
# 项目选择
|
projects_group = QGroupBox("项目选择")
|
projects_layout = QVBoxLayout(projects_group)
|
self.projects_list = QListWidget()
|
self.load_projects_btn = QPushButton("加载项目")
|
self.load_projects_btn.clicked.connect(self.load_projects)
|
projects_layout.addWidget(self.projects_list)
|
projects_layout.addWidget(self.load_projects_btn)
|
config_layout.addWidget(projects_group)
|
|
# 操作模式选择
|
mode_group = QGroupBox("操作模式")
|
mode_layout = QVBoxLayout(mode_group)
|
self.all_users_radio = QRadioButton("批量更新所有用户(排除ID为1的用户)")
|
self.specific_users_radio = QRadioButton("指定用户名更新")
|
self.all_users_radio.setChecked(True)
|
|
# 用户名输入
|
self.usernames_edit = QLineEdit()
|
self.usernames_edit.setPlaceholderText("输入用户名,多个用逗号分隔")
|
self.usernames_edit.setEnabled(False)
|
|
# 连接信号
|
self.all_users_radio.toggled.connect(
|
lambda: self.usernames_edit.setEnabled(not self.all_users_radio.isChecked()))
|
self.specific_users_radio.toggled.connect(
|
lambda: self.usernames_edit.setEnabled(self.specific_users_radio.isChecked()))
|
|
mode_layout.addWidget(self.all_users_radio)
|
mode_layout.addWidget(self.specific_users_radio)
|
mode_layout.addWidget(self.usernames_edit)
|
config_layout.addWidget(mode_group)
|
|
# 进度条
|
self.progress_bar = QProgressBar()
|
config_layout.addWidget(self.progress_bar)
|
|
# 开始按钮
|
self.start_btn = QPushButton("开始更新")
|
self.start_btn.clicked.connect(self.start_update)
|
config_layout.addWidget(self.start_btn)
|
|
# 日志选项卡
|
log_tab = QWidget()
|
log_layout = QVBoxLayout(log_tab)
|
self.log_text = QTextEdit()
|
self.log_text.setReadOnly(True)
|
log_layout.addWidget(QLabel("操作日志:"))
|
log_layout.addWidget(self.log_text)
|
|
# 导出选项卡
|
export_tab = QWidget()
|
export_layout = QVBoxLayout(export_tab)
|
self.export_list = QListWidget()
|
self.export_btn = QPushButton("导出选中项目用户信息")
|
self.export_btn.clicked.connect(self.export_user_info)
|
export_layout.addWidget(QLabel("已更新项目列表:"))
|
export_layout.addWidget(self.export_list)
|
export_layout.addWidget(self.export_btn)
|
|
# 添加选项卡
|
tabs.addTab(config_tab, "配置")
|
tabs.addTab(log_tab, "日志")
|
tabs.addTab(export_tab, "导出")
|
|
def browse_excel_file(self):
|
"""浏览Excel文件"""
|
file_path, _ = QFileDialog.getOpenFileName(
|
self, "选择Excel文件", "", "Excel Files (*.xlsx *.xls)"
|
)
|
if file_path:
|
self.excel_path_edit.setText(file_path)
|
|
def load_projects(self):
|
"""加载项目列表"""
|
excel_path = self.excel_path_edit.text()
|
if not os.path.exists(excel_path):
|
QMessageBox.warning(self, "错误", "Excel文件不存在")
|
return
|
|
try:
|
df = pd.read_excel(excel_path)
|
projects = df['项目'].unique().tolist()
|
|
self.projects_list.clear()
|
for project in projects:
|
item = QListWidgetItem(project)
|
item.setCheckState(Qt.Unchecked)
|
self.projects_list.addItem(item)
|
|
self.log_text.append(f"已加载 {len(projects)} 个项目")
|
except Exception as e:
|
QMessageBox.warning(self, "错误", f"读取Excel文件失败: {str(e)}")
|
|
def start_update(self):
|
"""开始更新"""
|
# 获取选中的项目
|
selected_projects = []
|
for i in range(self.projects_list.count()):
|
item = self.projects_list.item(i)
|
if item.checkState() == Qt.Checked:
|
selected_projects.append(item.text())
|
|
if not selected_projects:
|
QMessageBox.warning(self, "错误", "请至少选择一个项目")
|
return
|
|
# 获取操作模式
|
if self.all_users_radio.isChecked():
|
mode = 'all'
|
user_filter = None
|
else:
|
mode = 'specific'
|
usernames = self.usernames_edit.text().strip()
|
if not usernames:
|
QMessageBox.warning(self, "错误", "请输入要更新的用户名")
|
return
|
user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')]
|
|
# 获取新密码
|
new_password = self.password_edit.text()
|
if not new_password:
|
QMessageBox.warning(self, "错误", "请输入新密码")
|
return
|
|
# 创建并启动更新线程
|
self.updater_thread = DatabaseUpdater(
|
self.excel_path_edit.text(),
|
new_password,
|
mode,
|
user_filter,
|
selected_projects
|
)
|
|
# 连接信号
|
self.updater_thread.log_signal.connect(self.log_text.append)
|
self.updater_thread.progress_signal.connect(self.progress_bar.setValue)
|
self.updater_thread.finished_signal.connect(self.on_update_finished)
|
self.updater_thread.error_signal.connect(self.show_error)
|
self.updater_thread.export_finished_signal.connect(self.log_text.append)
|
|
# 禁用开始按钮
|
self.start_btn.setEnabled(False)
|
|
# 启动线程
|
self.updater_thread.start()
|
|
def on_update_finished(self):
|
"""更新完成后的处理"""
|
self.start_btn.setEnabled(True)
|
self.progress_bar.setValue(100)
|
|
# 更新导出列表
|
if self.updater_thread and self.updater_thread.updaters:
|
self.export_list.clear()
|
for updater in self.updater_thread.updaters:
|
if updater.updated_users:
|
item = QListWidgetItem(f"{updater.project} ({len(updater.updated_users)} 用户)")
|
item.setData(Qt.UserRole, updater.project)
|
self.export_list.addItem(item)
|
|
def export_user_info(self):
|
"""导出用户信息"""
|
selected_items = self.export_list.selectedItems()
|
if not selected_items:
|
QMessageBox.warning(self, "错误", "请选择要导出的项目")
|
return
|
|
if not self.updater_thread:
|
QMessageBox.warning(self, "错误", "没有可导出的数据")
|
return
|
|
for item in selected_items:
|
project_name = item.data(Qt.UserRole)
|
self.log_text.append(f"开始导出项目 {project_name} 的用户信息...")
|
self.updater_thread.export_user_info(project_name)
|
|
def show_error(self, error_msg):
|
"""显示错误信息"""
|
QMessageBox.critical(self, "错误", error_msg)
|
self.start_btn.setEnabled(True)
|
|
def closeEvent(self, event):
|
"""关闭应用程序事件"""
|
if self.updater_thread and self.updater_thread.isRunning():
|
reply = QMessageBox.question(
|
self, '确认退出',
|
'更新操作正在进行中,确定要退出吗?',
|
QMessageBox.Yes | QMessageBox.No,
|
QMessageBox.No
|
)
|
|
if reply == QMessageBox.Yes:
|
self.updater_thread.stop()
|
self.updater_thread.wait()
|
event.accept()
|
else:
|
event.ignore()
|
else:
|
event.accept()
|
|
|
def main():
|
"""主函数"""
|
app = QApplication(sys.argv)
|
window = PasswordUpdaterApp()
|
window.show()
|
sys.exit(app.exec_())
|
|
|
if __name__ == '__main__':
|
main()
|