import pandas as pd
|
import bcrypt
|
import mysql.connector
|
from mysql.connector import Error
|
import asyncio
|
|
# === 配置 ===
|
excel_path = 'dbExcel/数据库信息.xlsx' # Excel 文件名
|
log_csv_path = '更新日志.csv' # 输出日志 CSV
|
new_password_plain = 'Baoyi@1341' # 新的明文密码
|
user_count = -1 # 每库更新前 N 个用户,-1 表示全部
|
protected_user_id = 1 # 需要排除的用户ID
|
|
# === 日志列表与锁 ===
|
log_records = []
|
log_lock = asyncio.Lock()
|
|
|
# === 异步处理单个数据库 ===
|
async def update_database(project, host, database, user, password, user_filter=None):
|
try:
|
conn = await asyncio.to_thread(mysql.connector.connect,
|
host=host,
|
port=3306,
|
user=user,
|
password=password,
|
database=database,
|
use_pure=True
|
)
|
cursor = conn.cursor()
|
|
# ✅ 判断是否有 plain_text 字段
|
cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'")
|
has_plain_text = cursor.fetchone() is not None
|
|
# 构造基础查询语句
|
base_sql = """
|
SELECT id, login_name
|
FROM sys_user
|
WHERE id != %s
|
""" # 默认排除保护用户
|
|
params = [protected_user_id]
|
|
# 添加用户筛选条件
|
if isinstance(user_filter, list): # 用户名列表模式
|
placeholders = ','.join(['%s'] * len(user_filter))
|
base_sql += f" AND login_name IN ({placeholders})"
|
params.extend(user_filter)
|
elif user_filter == "LIMIT": # 限制数量模式
|
if user_count != -1:
|
base_sql += f" LIMIT {user_count}"
|
|
cursor.execute(base_sql, params)
|
users = cursor.fetchall()
|
|
if not users:
|
print(f"❌ [{project}] 未找到符合条件的用户,跳过。")
|
return
|
|
for uid, login_name in users:
|
salt = bcrypt.gensalt(rounds=10)
|
hashed = bcrypt.hashpw(new_password_plain.encode('utf-8'), salt).decode('utf-8')
|
|
# ✅ 构造更新 SQL
|
if has_plain_text:
|
update_sql = """
|
UPDATE sys_user
|
SET plain_text = %s, password = %s
|
WHERE id = %s
|
"""
|
cursor.execute(update_sql, (new_password_plain, hashed, uid))
|
else:
|
update_sql = """
|
UPDATE sys_user
|
SET password = %s
|
WHERE id = %s
|
"""
|
cursor.execute(update_sql, (hashed, uid))
|
|
# ✅ 写入日志
|
async with log_lock:
|
log_records.append({
|
'项目': project,
|
'数据库': database,
|
'用户ID': uid,
|
'用户名': login_name,
|
'明文密码': new_password_plain if has_plain_text else '未更新',
|
'哈希密码': hashed
|
})
|
print(f"✅ [{project}] 用户 {login_name} (ID:{uid}) 更新成功")
|
|
conn.commit()
|
print(f"✅ [{project}] 共 {len(users)} 个用户更新完成")
|
|
except Error as e:
|
print(f"❌ [{project}] 数据库操作失败:{e}")
|
async with log_lock:
|
log_records.append({
|
'项目': project,
|
'数据库': database,
|
'用户ID': '连接失败',
|
'用户名': '',
|
'明文密码': '',
|
'哈希密码': f'错误信息: {e}'
|
})
|
|
finally:
|
if 'cursor' in locals() and cursor:
|
cursor.close()
|
if 'conn' in locals() and conn:
|
conn.close()
|
|
|
# === 主异步函数 ===
|
async def main():
|
# 读取 Excel 数据
|
try:
|
df = pd.read_excel(excel_path)
|
except Exception as e:
|
print(f"❌ 读取Excel文件失败:{e}")
|
return
|
|
# 获取所有唯一项目列表
|
all_projects = df['项目'].unique().tolist()
|
if not all_projects:
|
print("❌ Excel 文件中未找到任何项目")
|
return
|
|
# === 项目选择 ===
|
print("\n请选择要处理的项目(输入编号,多个用逗号分隔):")
|
for idx, project in enumerate(all_projects, 1):
|
print(f" [{idx}] {project}")
|
|
while True:
|
selected = input("\n请输入项目编号(例如 1,3):").strip()
|
if not selected:
|
print("⚠️ 未选择项目,默认处理全部")
|
selected_projects = all_projects
|
break
|
|
try:
|
indexes = [int(n.strip()) - 1 for n in selected.replace(',', ',').split(',')]
|
selected_projects = [all_projects[i] for i in indexes]
|
if not selected_projects:
|
raise ValueError("至少选择一个有效项目")
|
break
|
except (ValueError, IndexError) as e:
|
print(f"❌ 输入无效:{e},请重新输入")
|
|
# === 操作模式选择 ===
|
print("\n请选择操作模式:")
|
print(" [1] 批量更新所有用户(排除ID为1的用户)")
|
print(" [2] 指定用户名更新")
|
mode = input("请输入模式编号(1/2):").strip()
|
|
user_filter = None
|
if mode == '2':
|
while True:
|
usernames = input("请输入用户名(多个用逗号分隔):").strip()
|
if not usernames:
|
print("⚠️ 用户名不能为空")
|
continue
|
user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')]
|
if len(user_filter) == 0:
|
print("❌ 至少输入一个有效用户名")
|
else:
|
break
|
elif mode != '1':
|
print("⚠️ 输入无效,默认使用批量模式")
|
mode = '1'
|
|
# 过滤 DataFrame
|
df_filtered = df[df['项目'].isin(selected_projects)]
|
if df_filtered.empty:
|
print("❌ 筛选后无有效项目,请检查输入")
|
return
|
|
# 创建异步任务
|
tasks = []
|
for _, row in df_filtered.iterrows():
|
project, host, database, user, password = row[:5]
|
print(f"\n🚀 准备处理项目:{project} | 数据库:{database}")
|
tasks.append(update_database(
|
project, host, database, user, password,
|
user_filter=user_filter if mode == '2' else None
|
))
|
|
await asyncio.gather(*tasks)
|
|
# 保存日志
|
if log_records:
|
pd.DataFrame(log_records).to_csv(log_csv_path, index=False, encoding='utf-8-sig')
|
print(f"\n✅ 所有更新日志已保存至 {log_csv_path}")
|
else:
|
print("⚠️ 无任何更新记录生成")
|
|
|
# === 启动入口 ===
|
if __name__ == '__main__':
|
asyncio.run(main())
|