hyb
2025-10-24 6496e2f5fe11a221d1153ddddb78d80f3fb78ce8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import pandas as pd
import bcrypt
import mysql.connector
from mysql.connector import Error
import asyncio
 
# === 配置 ===
excel_path = 'dbExcel/数据库信息.xlsx'  # Excel 文件名
log_csv_path = '更新日志.csv'  # 输出日志 CSV
new_password_plain = 'Baoyi@1341'  # 新的明文密码
user_count = -1  # 每库更新前 N 个用户,-1 表示全部
protected_user_id = 1  # 需要排除的用户ID
 
# === 日志列表与锁 ===
log_records = []
log_lock = asyncio.Lock()
 
 
# === 异步处理单个数据库 ===
async def update_database(project, host, database, user, password, user_filter=None):
    try:
        conn = await asyncio.to_thread(mysql.connector.connect,
                                       host=host,
                                       port=3306,
                                       user=user,
                                       password=password,
                                       database=database,
                                       use_pure=True
                                       )
        cursor = conn.cursor()
 
        # ✅ 判断是否有 plain_text 字段
        cursor.execute("SHOW COLUMNS FROM sys_user LIKE 'plain_text'")
        has_plain_text = cursor.fetchone() is not None
 
        # 构造基础查询语句
        base_sql = """
            SELECT id, login_name 
            FROM sys_user 
            WHERE id != %s
        """  # 默认排除保护用户
 
        params = [protected_user_id]
 
        # 添加用户筛选条件
        if isinstance(user_filter, list):  # 用户名列表模式
            placeholders = ','.join(['%s'] * len(user_filter))
            base_sql += f" AND login_name IN ({placeholders})"
            params.extend(user_filter)
        elif user_filter == "LIMIT":  # 限制数量模式
            if user_count != -1:
                base_sql += f" LIMIT {user_count}"
 
        cursor.execute(base_sql, params)
        users = cursor.fetchall()
 
        if not users:
            print(f"❌ [{project}] 未找到符合条件的用户,跳过。")
            return
 
        for uid, login_name in users:
            salt = bcrypt.gensalt(rounds=10)
            hashed = bcrypt.hashpw(new_password_plain.encode('utf-8'), salt).decode('utf-8')
 
            # ✅ 构造更新 SQL
            if has_plain_text:
                update_sql = """
                    UPDATE sys_user
                    SET plain_text = %s, password = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (new_password_plain, hashed, uid))
            else:
                update_sql = """
                    UPDATE sys_user
                    SET password = %s
                    WHERE id = %s
                """
                cursor.execute(update_sql, (hashed, uid))
 
            # ✅ 写入日志
            async with log_lock:
                log_records.append({
                    '项目': project,
                    '数据库': database,
                    '用户ID': uid,
                    '用户名': login_name,
                    '明文密码': new_password_plain if has_plain_text else '未更新',
                    '哈希密码': hashed
                })
            print(f"✅ [{project}] 用户 {login_name} (ID:{uid}) 更新成功")
 
        conn.commit()
        print(f"✅ [{project}] 共 {len(users)} 个用户更新完成")
 
    except Error as e:
        print(f"❌ [{project}] 数据库操作失败:{e}")
        async with log_lock:
            log_records.append({
                '项目': project,
                '数据库': database,
                '用户ID': '连接失败',
                '用户名': '',
                '明文密码': '',
                '哈希密码': f'错误信息: {e}'
            })
 
    finally:
        if 'cursor' in locals() and cursor:
            cursor.close()
        if 'conn' in locals() and conn:
            conn.close()
 
 
# === 主异步函数 ===
async def main():
    # 读取 Excel 数据
    try:
        df = pd.read_excel(excel_path)
    except Exception as e:
        print(f"❌ 读取Excel文件失败:{e}")
        return
 
    # 获取所有唯一项目列表
    all_projects = df['项目'].unique().tolist()
    if not all_projects:
        print("❌ Excel 文件中未找到任何项目")
        return
 
    # === 项目选择 ===
    print("\n请选择要处理的项目(输入编号,多个用逗号分隔):")
    for idx, project in enumerate(all_projects, 1):
        print(f"  [{idx}] {project}")
 
    while True:
        selected = input("\n请输入项目编号(例如 1,3):").strip()
        if not selected:
            print("⚠️ 未选择项目,默认处理全部")
            selected_projects = all_projects
            break
 
        try:
            indexes = [int(n.strip()) - 1 for n in selected.replace(',', ',').split(',')]
            selected_projects = [all_projects[i] for i in indexes]
            if not selected_projects:
                raise ValueError("至少选择一个有效项目")
            break
        except (ValueError, IndexError) as e:
            print(f"❌ 输入无效:{e},请重新输入")
 
    # === 操作模式选择 ===
    print("\n请选择操作模式:")
    print("  [1] 批量更新所有用户(排除ID为1的用户)")
    print("  [2] 指定用户名更新")
    mode = input("请输入模式编号(1/2):").strip()
 
    user_filter = None
    if mode == '2':
        while True:
            usernames = input("请输入用户名(多个用逗号分隔):").strip()
            if not usernames:
                print("⚠️ 用户名不能为空")
                continue
            user_filter = [name.strip() for name in usernames.replace(',', ',').split(',')]
            if len(user_filter) == 0:
                print("❌ 至少输入一个有效用户名")
            else:
                break
    elif mode != '1':
        print("⚠️ 输入无效,默认使用批量模式")
        mode = '1'
 
    # 过滤 DataFrame
    df_filtered = df[df['项目'].isin(selected_projects)]
    if df_filtered.empty:
        print("❌ 筛选后无有效项目,请检查输入")
        return
 
    # 创建异步任务
    tasks = []
    for _, row in df_filtered.iterrows():
        project, host, database, user, password = row[:5]
        print(f"\n🚀 准备处理项目:{project} | 数据库:{database}")
        tasks.append(update_database(
            project, host, database, user, password,
            user_filter=user_filter if mode == '2' else None
        ))
 
    await asyncio.gather(*tasks)
 
    # 保存日志
    if log_records:
        pd.DataFrame(log_records).to_csv(log_csv_path, index=False, encoding='utf-8-sig')
        print(f"\n✅ 所有更新日志已保存至 {log_csv_path}")
    else:
        print("⚠️ 无任何更新记录生成")
 
 
# === 启动入口 ===
if __name__ == '__main__':
    asyncio.run(main())