#!/usr/bin/env python """ 用户数据迁移脚本 将用户数据从JSON文件迁移到PostgreSQL数据库 """ import json import logging import os import sys import time # 添加项目根目录到Python路径 sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) from app.config.config import config, current_env from app.core.system.auth import get_pg_connection, init_db, release_pg_connection # 获取配置 app_config = config[current_env] # 配置日志 log_level_name = getattr(app_config, "LOG_LEVEL", "INFO") log_level = getattr(logging, log_level_name) log_format = getattr( app_config, "LOG_FORMAT", "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logging.basicConfig(level=log_level, format=log_format) logger = logging.getLogger(__name__) # 旧的用户数据文件路径 OLD_USER_DATA_PATH = os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data", "users.json" ) def migrate_users(): """ 将用户数据从JSON文件迁移到PostgreSQL数据库 """ logger.info("开始迁移用户数据...") # 确保用户表已创建 init_db() # 检查旧的用户数据文件是否存在 if not os.path.exists(OLD_USER_DATA_PATH): logger.warning(f"用户数据文件不存在: {OLD_USER_DATA_PATH},无需迁移") return conn = None try: # 读取旧的用户数据 with open(OLD_USER_DATA_PATH, "r", encoding="utf-8") as f: users = json.load(f) logger.info(f"从文件中读取了 {len(users)} 个用户") # 连接数据库 conn = get_pg_connection() cursor = conn.cursor() migrated_count = 0 skipped_count = 0 for username, user_data in users.items(): # 检查用户是否已存在 check_query = "SELECT username FROM users WHERE username = %s" cursor.execute(check_query, (username,)) if cursor.fetchone(): logger.info(f"用户 {username} 已存在,跳过") skipped_count += 1 continue # 创建用户 insert_query = """ INSERT INTO users (id, username, password, created_at, last_login, is_admin) VALUES (%s, %s, %s, %s, %s, %s) """ cursor.execute( insert_query, ( user_data.get("id", f"migrated-{time.time()}"), username, user_data.get("password", ""), user_data.get("created_at", time.time()), user_data.get("last_login"), user_data.get("is_admin", False), ), ) migrated_count += 1 logger.info(f"已迁移用户: {username}") conn.commit() cursor.close() logger.info( f"迁移完成: 成功迁移 {migrated_count} 个用户,跳过 {skipped_count} 个用户" ) # 备份旧文件 backup_path = f"{OLD_USER_DATA_PATH}.bak.{int(time.time())}" os.rename(OLD_USER_DATA_PATH, backup_path) logger.info(f"已备份旧用户数据文件到: {backup_path}") except Exception as e: logger.error(f"迁移用户数据失败: {str(e)}") if conn: conn.rollback() raise finally: if conn: release_pg_connection(conn) if __name__ == "__main__": try: migrate_users() except Exception as e: logger.error(f"迁移失败: {str(e)}") sys.exit(1) sys.exit(0)