| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- #!/usr/bin/env python
- """
- 用户数据迁移脚本
- 将用户数据从JSON文件迁移到PostgreSQL数据库
- """
- import json
- import logging
- import os
- import sys
- import time
- # 添加项目根目录到Python路径
- sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
- from app.config.config import config, current_env
- from app.core.system.auth import get_pg_connection, init_db, release_pg_connection
- # 获取配置
- app_config = config[current_env]
- # 配置日志
- log_level_name = getattr(app_config, "LOG_LEVEL", "INFO")
- log_level = getattr(logging, log_level_name)
- log_format = getattr(
- app_config, "LOG_FORMAT", "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
- )
- logging.basicConfig(level=log_level, format=log_format)
- logger = logging.getLogger(__name__)
- # 旧的用户数据文件路径
- OLD_USER_DATA_PATH = os.path.join(
- os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data", "users.json"
- )
- def migrate_users():
- """
- 将用户数据从JSON文件迁移到PostgreSQL数据库
- """
- logger.info("开始迁移用户数据...")
- # 确保用户表已创建
- init_db()
- # 检查旧的用户数据文件是否存在
- if not os.path.exists(OLD_USER_DATA_PATH):
- logger.warning(f"用户数据文件不存在: {OLD_USER_DATA_PATH},无需迁移")
- return
- conn = None
- try:
- # 读取旧的用户数据
- with open(OLD_USER_DATA_PATH, "r", encoding="utf-8") as f:
- users = json.load(f)
- logger.info(f"从文件中读取了 {len(users)} 个用户")
- # 连接数据库
- conn = get_pg_connection()
- cursor = conn.cursor()
- migrated_count = 0
- skipped_count = 0
- for username, user_data in users.items():
- # 检查用户是否已存在
- check_query = "SELECT username FROM users WHERE username = %s"
- cursor.execute(check_query, (username,))
- if cursor.fetchone():
- logger.info(f"用户 {username} 已存在,跳过")
- skipped_count += 1
- continue
- # 创建用户
- insert_query = """
- INSERT INTO users (id, username, password, created_at, last_login, is_admin)
- VALUES (%s, %s, %s, %s, %s, %s)
- """
- cursor.execute(
- insert_query,
- (
- user_data.get("id", f"migrated-{time.time()}"),
- username,
- user_data.get("password", ""),
- user_data.get("created_at", time.time()),
- user_data.get("last_login"),
- user_data.get("is_admin", False),
- ),
- )
- migrated_count += 1
- logger.info(f"已迁移用户: {username}")
- conn.commit()
- cursor.close()
- logger.info(
- f"迁移完成: 成功迁移 {migrated_count} 个用户,跳过 {skipped_count} 个用户"
- )
- # 备份旧文件
- backup_path = f"{OLD_USER_DATA_PATH}.bak.{int(time.time())}"
- os.rename(OLD_USER_DATA_PATH, backup_path)
- logger.info(f"已备份旧用户数据文件到: {backup_path}")
- except Exception as e:
- logger.error(f"迁移用户数据失败: {str(e)}")
- if conn:
- conn.rollback()
- raise
- finally:
- if conn:
- release_pg_connection(conn)
- if __name__ == "__main__":
- try:
- migrate_users()
- except Exception as e:
- logger.error(f"迁移失败: {str(e)}")
- sys.exit(1)
- sys.exit(0)
|