import pandas as pd
|
import bcrypt
|
import mysql.connector
|
from mysql.connector import Error
|
import asyncio
|
import os
|
from datetime import datetime
|
from typing import List, Dict, Any, Optional
|
import re
|
|
# === 配置 ===
|
EXCEL_PATH = 'dbExcel/数据库信息.xlsx' # Excel 文件名
|
LOG_CSV_PATH = '更新日志.csv' # 输出日志 CSV
|
EXPORT_DIR = '用户信息导出' # 导出文件目录
|
NEW_PASSWORD_PLAIN = 'Baoyi@1341' # 新的明文密码
|
USER_COUNT = -1 # 每库更新前 N 个用户,-1 表示全部
|
PROTECTED_USER_ID = 1 # 需要排除的用户ID
|
|
# Redis配置
|
REDIS_HOST = '192.168.6.168'
|
REDIS_PORT = 6002
|
REDIS_PASSWORD = None
|
|
# === 日志列表与锁 ===
|
log_records = []
|
log_lock = asyncio.Lock()
|
|
|
class RedisRefresher:
|
"""Redis缓存刷新器"""
|
|
def __init__(self):
|
self.redis_client = None
|
|
async def connect_redis(self):
|
"""连接Redis服务器"""
|
try:
|
# 尝试导入redis库
|
import redis
|
self.redis_client = redis.Redis(
|
host=REDIS_HOST,
|
port=REDIS_PORT,
|
password=REDIS_PASSWORD,
|
decode_responses=True
|
)
|
# 测试连接
|
await asyncio.to_thread(self.redis_client.ping)
|
return True
|
except ImportError:
|
print("错误: 未安装redis库,请使用 'pip install redis' 安装")
|
return False
|
except Exception as e:
|
print(f"连接Redis失败: {str(e)}")
|
return False
|
|
async def refresh_all_redis(self):
|
"""刷新所有Redis数据库"""
|
try:
|
if not self.redis_client:
|
if not await self.connect_redis():
|
return False, "无法连接Redis服务器"
|
|
# 获取所有数据库数量
|
try:
|
# 尝试获取配置信息
|
config = await asyncio.to_thread(self.redis_client.config_get, 'databases')
|
db_count = int(config.get('databases', 16))
|
except:
|
# 如果无法获取配置,默认使用16个数据库
|
db_count = 16
|
|
refreshed_dbs = 0
|
total_keys = 0
|
|
# 遍历所有数据库并刷新
|
for db_index in range(db_count):
|
try:
|
# 切换到指定数据库
|
temp_client = self.redis_client.connection_pool.get_connection()
|
await asyncio.to_thread(temp_client.send_command, 'SELECT', db_index)
|
await asyncio.to_thread(temp_client.read_response)
|
|
# 获取当前数据库的key数量
|
await asyncio.to_thread(temp_client.send_command, 'DBSIZE')
|
key_count = await asyncio.to_thread(temp_client.read_response)
|
|
if key_count > 0:
|
# 清空当前数据库
|
await asyncio.to_thread(temp_client.send_command, 'FLUSHDB')
|
await asyncio.to_thread(temp_client.read_response)
|
refreshed_dbs += 1
|
total_keys += key_count
|
print(f"已刷新数据库 {db_index}: 清除了 {key_count} 个键")
|
|
await asyncio.to_thread(self.redis_client.connection_pool.release, temp_client)
|
|
except Exception as e:
|
print(f"刷新数据库 {db_index} 时出错: {str(e)}")
|
continue
|
|
return True, f"成功刷新 {refreshed_dbs} 个Redis数据库,共清除 {total_keys} 个键"
|
|
except Exception as e:
|
return False, f"刷新Redis缓存时发生错误: {str(e)}"
|
|
|
def sanitize_filename(name: str) -> str:
|
"""清理文件名,移除可能引起问题的字符"""
|
# 移除或替换Windows文件名中不允许的字符
|
name = re.sub(r'[\\/*?:"<>|]', "", name)
|
# 替换空格为下划线
|
name = name.replace(" ", "_")
|
return name
|
|
|
class DatabaseUpdater:
|
"""数据库更新器类"""
|
|
def __init__(self, project: str, host: str, database: str,
|
user: str, password: str):
|
self.project = project
|
self.host = host
|
self.database = database
|
self.db_user = user
|
self.db_password = password
|
self.updated_users = [] # 存储更新过的用户信息
|
|
async def connect(self) -> Optional[mysql.connector.connection.MySQLConnection]:
|
"""建立数据库连接"""
|
try:
|
conn = await asyncio.to_thread(
|
mysql.connector.connect,
|
host=self.host,
|
port=3306,
|
user=self.db_user,
|
password=self.db_password,
|
database=self.database,
|
use_pure=True
|
)
|
return conn
|
except Error as e:
|
print(f"❌ [{self.project}] 数据库连接失败:{e}")
|
async with log_lock:
|
log_records.append({
|
'项目': self.project,
|
'数据库': self.database,
|
'用户ID': '连接失败',
|
'用户名': '',
|
'明文密码': '',
|
'哈希密码': f'错误信息: {e}'
|
})
|
return None
|
|
def check_plain_text_column(self, cursor) -> bool:
|
"""检查是否存在 plain_text 字段"""
|
try:
|
cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'")
|
return cursor.fetchone() is not None
|
except Error as e:
|
print(f"❌ [{self.project}] 检查 plain_text 字段失败:{e}")
|
return False
|
|
def get_users_to_update(self, cursor, user_filter: Optional[List[str]] = None) -> List[tuple]:
|
"""获取需要更新的用户列表"""
|
try:
|
# 构造基础查询语句
|
base_sql = """
|
SELECT id, login_name, name, current_role_id
|
FROM sys_user
|
WHERE id != %s
|
""" # 默认排除保护用户
|
|
params = [PROTECTED_USER_ID]
|
|
# 添加用户筛选条件
|
if isinstance(user_filter, list) and user_filter: # 用户名列表模式
|
placeholders = ','.join(['%s'] * len(user_filter))
|
base_sql += f" AND login_name IN ({placeholders})"
|
params.extend(user_filter)
|
elif user_filter == "LIMIT": # 限制数量模式
|
if USER_COUNT != -1:
|
base_sql += f" LIMIT {USER_COUNT}"
|
|
cursor.execute(base_sql, params)
|
return cursor.fetchall()
|
except Error as e:
|
print(f"❌ [{self.project}] 获取用户列表失败:{e}")
|
return []
|
|
def update_user_password(self, cursor, user_id: int, login_name: str,
|
has_plain_text: bool) -> bool:
|
"""更新用户密码"""
|
try:
|
salt = bcrypt.gensalt(rounds=10)
|
hashed = bcrypt.hashpw(NEW_PASSWORD_PLAIN.encode('utf-8'), salt).decode('utf-8')
|
|
# 构造更新 SQL
|
if has_plain_text:
|
update_sql = """
|
UPDATE sys_user
|
SET plain_text = %s, password = %s
|
WHERE id = %s
|
"""
|
cursor.execute(update_sql, (NEW_PASSWORD_PLAIN, hashed, user_id))
|
else:
|
update_sql = """
|
UPDATE sys_user
|
SET password = %s
|
WHERE id = %s
|
"""
|
cursor.execute(update_sql, (hashed, user_id))
|
|
return True
|
except Error as e:
|
print(f"❌ [{self.project}] 更新用户 {login_name} 密码失败:{e}")
|
return False
|
|
def get_role_name(self, cursor, role_id: int) -> str:
|
"""根据角色ID获取角色名称"""
|
try:
|
if not role_id:
|
return "无角色"
|
|
cursor.execute("SELECT name FROM sys_role WHERE id = %s", (role_id,))
|
result = cursor.fetchone()
|
return result[0] if result else "角色不存在"
|
except Error as e:
|
print(f"❌ [{self.project}] 获取角色名称失败:{e}")
|
return "获取失败"
|
|
async def export_user_info(self, cursor) -> bool:
|
"""导出用户信息到Excel"""
|
try:
|
if not self.updated_users:
|
print(f"⚠️ [{self.project}] 没有更新过的用户,无需导出")
|
return False
|
|
# 确保导出目录存在
|
os.makedirs(EXPORT_DIR, exist_ok=True)
|
|
# 获取所有角色ID
|
role_ids = list(set(user['role_id'] for user in self.updated_users if user['role_id']))
|
|
# 获取角色名称映射
|
role_name_map = {}
|
for role_id in role_ids:
|
role_name = await asyncio.to_thread(self.get_role_name, cursor, role_id)
|
role_name_map[role_id] = role_name
|
|
# 准备导出数据
|
export_data = []
|
for user in self.updated_users:
|
export_data.append({
|
'用户名': user['login_name'],
|
'密码': NEW_PASSWORD_PLAIN, # 使用新密码
|
'姓名': user['name'],
|
'角色名称': role_name_map.get(user['role_id'], '无角色')
|
})
|
|
# 创建DataFrame并导出
|
df_export = pd.DataFrame(export_data)
|
|
# 按照要求格式化文件名:项目名称测试环境账号密码_年月日
|
safe_project_name = sanitize_filename(self.project)
|
date_str = datetime.now().strftime("%Y%m%d")
|
filename = f"{safe_project_name}测试环境账号密码_{date_str}.xlsx"
|
filepath = os.path.join(EXPORT_DIR, filename)
|
|
# 移除 encoding 参数,因为 to_excel 方法不支持它
|
await asyncio.to_thread(df_export.to_excel, filepath, index=False)
|
print(f"✅ [{self.project}] 用户信息已导出到: {filepath}")
|
return True
|
|
except Exception as e:
|
print(f"❌ [{self.project}] 导出用户信息失败:{e}")
|
return False
|
|
async def update_database(self, user_filter: Optional[List[str]] = None) -> bool:
|
"""更新数据库中的用户密码"""
|
conn = None
|
cursor = None
|
try:
|
conn = await self.connect()
|
if not conn:
|
return False
|
|
# 使用同步方式创建游标
|
cursor = conn.cursor()
|
|
# 检查是否有 plain_text 字段
|
has_plain_text = await asyncio.to_thread(self.check_plain_text_column, cursor)
|
|
# 获取需要更新的用户
|
users = await asyncio.to_thread(self.get_users_to_update, cursor, user_filter)
|
|
if not users:
|
print(f"❌ [{self.project}] 未找到符合条件的用户,跳过。")
|
return False
|
|
# 更新每个用户的密码
|
for uid, login_name, name, current_role_id in users:
|
success = await asyncio.to_thread(
|
self.update_user_password, cursor, uid, login_name, has_plain_text
|
)
|
|
if success:
|
# 记录更新成功的用户信息
|
self.updated_users.append({
|
'id': uid,
|
'login_name': login_name,
|
'name': name,
|
'role_id': current_role_id
|
})
|
|
# 写入日志
|
async with log_lock:
|
log_records.append({
|
'项目': self.project,
|
'数据库': self.database,
|
'用户ID': uid,
|
'用户名': login_name,
|
'明文密码': NEW_PASSWORD_PLAIN if has_plain_text else '未更新',
|
'哈希密码': bcrypt.hashpw(NEW_PASSWORD_PLAIN.encode('utf-8'),
|
bcrypt.gensalt(rounds=10)).decode('utf-8')
|
})
|
print(f"✅ [{self.project}] 用户 {login_name} (ID:{uid}) 更新成功")
|
|
# 提交事务
|
conn.commit()
|
print(f"✅ [{self.project}] 共 {len(self.updated_users)} 个用户更新完成")
|
return True
|
|
except Error as e:
|
print(f"❌ [{self.project}] 数据库操作失败:{e}")
|
async with log_lock:
|
log_records.append({
|
'项目': self.project,
|
'数据库': self.database,
|
'用户ID': '操作失败',
|
'用户名': '',
|
'明文密码': '',
|
'哈希密码': f'错误信息: {e}'
|
})
|
return False
|
finally:
|
if cursor:
|
cursor.close()
|
if conn:
|
conn.close()
|
|
|
async def main():
|
"""主异步函数"""
|
# 读取 Excel 数据
|
try:
|
df = pd.read_excel(EXCEL_PATH)
|
except Exception as e:
|
print(f"❌ 读取Excel文件失败:{e}")
|
return
|
|
# 获取所有唯一项目列表
|
all_projects = df['项目'].unique().tolist()
|
if not all_projects:
|
print("❌ Excel 文件中未找到任何项目")
|
return
|
|
# === 项目选择 ===
|
print("\n请选择要处理的项目(输入编号,多个用逗号分隔):")
|
for idx, project in enumerate(all_projects, 1):
|
print(f" [{idx}] {project}")
|
|
while True:
|
selected = input("\n请输入项目编号(例如 1,3):").strip()
|
if not selected:
|
print("⚠️ 未选择项目,默认处理全部")
|
selected_projects = all_projects
|
break
|
|
try:
|
indexes = [int(n.strip()) - 1 for n in selected.replace(',', ',').split(',')]
|
selected_projects = [all_projects[i] for i in indexes]
|
if not selected_projects:
|
raise ValueError("至少选择一个有效项目")
|
break
|
except (ValueError, IndexError) as e:
|
print(f"❌ 输入无效:{e},请重新输入")
|
|
# === 操作模式选择 ===
|
print("\n请选择操作模式:")
|
print(" [1] 批量更新所有用户(排除ID为1的用户)")
|
print(" [2] 指定用户名更新")
|
mode = input("请输入模式编号(1/2):").strip()
|
|
user_filter = None
|
if mode == '2':
|
while True:
|
usernames = input("请输入用户名(多个用逗号分隔):").strip()
|
if not usernames:
|
print("⚠️ 用户名不能为空")
|
continue
|
user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')]
|
if len(user_filter) == 0:
|
print("❌ 至少输入一个有效用户名")
|
else:
|
break
|
elif mode != '1':
|
print("⚠️ 输入无效,默认使用批量模式")
|
mode = '1'
|
|
# 过滤 DataFrame
|
df_filtered = df[df['项目'].isin(selected_projects)]
|
if df_filtered.empty:
|
print("❌ 筛选后无有效项目,请检查输入")
|
return
|
|
# 创建数据库更新器实例
|
updaters = []
|
for _, row in df_filtered.iterrows():
|
project, host, database, user, password = row[:5]
|
print(f"\n🚀 准备处理项目:{project} | 数据库:{database}")
|
updater = DatabaseUpdater(project, host, database, user, password)
|
updaters.append(updater)
|
|
# 执行更新任务
|
tasks = []
|
for updater in updaters:
|
tasks.append(updater.update_database(
|
user_filter=user_filter if mode == '2' else "LIMIT"
|
))
|
|
await asyncio.gather(*tasks)
|
|
# 询问是否导出用户信息
|
for updater in updaters:
|
if updater.updated_users:
|
export_choice = input(f"\n是否导出项目 {updater.project} 的用户信息? (y/N): ").strip().lower()
|
if export_choice == 'y' or export_choice == 'yes':
|
try:
|
conn = await updater.connect()
|
if conn:
|
cursor = conn.cursor()
|
await updater.export_user_info(cursor)
|
cursor.close()
|
conn.close()
|
except Exception as e:
|
print(f"❌ 导出项目 {updater.project} 用户信息时出错: {e}")
|
else:
|
print(f"⚠️ 跳过项目 {updater.project} 的用户信息导出")
|
|
# 保存日志
|
if log_records:
|
# 保存日志时使用正确的编码参数(CSV文件需要)
|
await asyncio.to_thread(
|
pd.DataFrame(log_records).to_csv,
|
LOG_CSV_PATH,
|
index=False,
|
encoding='utf-8-sig'
|
)
|
print(f"\n✅ 所有更新日志已保存至 {LOG_CSV_PATH}")
|
else:
|
print("⚠️ 无任何更新记录生成")
|
|
# 数据库更新完成后,自动刷新Redis缓存
|
print("\n开始刷新Redis缓存...")
|
redis_refresher = RedisRefresher()
|
success, message = await redis_refresher.refresh_all_redis()
|
if success:
|
print(f"✅ {message}")
|
else:
|
print(f"❌ {message}")
|
|
|
# === 启动入口 ===
|
if __name__ == '__main__':
|
asyncio.run(main())
|