123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- #!/usr/bin/env python
- """
- 用户数据迁移脚本
- 将用户数据从JSON文件迁移到PostgreSQL数据库
- """
- import sys
- import os
- import json
- import logging
- import time
- import psycopg2
- # 添加项目根目录到Python路径
- sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
- from app.core.system.auth import init_db, get_pg_connection, release_pg_connection
- from app.config.config import config, current_env
- # 获取配置
- app_config = config[current_env]
- # 配置日志
- log_level_name = getattr(app_config, 'LOG_LEVEL', 'INFO')
- log_level = getattr(logging, log_level_name)
- log_format = getattr(app_config, 'LOG_FORMAT', '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- logging.basicConfig(
- level=log_level,
- format=log_format
- )
- logger = logging.getLogger(__name__)
- # 旧的用户数据文件路径
- OLD_USER_DATA_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data', 'users.json')
- def migrate_users():
- """
- 将用户数据从JSON文件迁移到PostgreSQL数据库
- """
- logger.info("开始迁移用户数据...")
-
- # 确保用户表已创建
- init_db()
-
- # 检查旧的用户数据文件是否存在
- if not os.path.exists(OLD_USER_DATA_PATH):
- logger.warning(f"用户数据文件不存在: {OLD_USER_DATA_PATH},无需迁移")
- return
-
- conn = None
- try:
- # 读取旧的用户数据
- with open(OLD_USER_DATA_PATH, 'r', encoding='utf-8') as f:
- users = json.load(f)
-
- logger.info(f"从文件中读取了 {len(users)} 个用户")
-
- # 连接数据库
- conn = get_pg_connection()
- cursor = conn.cursor()
-
- migrated_count = 0
- skipped_count = 0
-
- for username, user_data in users.items():
- # 检查用户是否已存在
- check_query = "SELECT username FROM users WHERE username = %s"
- cursor.execute(check_query, (username,))
-
- if cursor.fetchone():
- logger.info(f"用户 {username} 已存在,跳过")
- skipped_count += 1
- continue
-
- # 创建用户
- insert_query = """
- INSERT INTO users (id, username, password, created_at, last_login, is_admin)
- VALUES (%s, %s, %s, %s, %s, %s)
- """
-
- cursor.execute(
- insert_query,
- (
- user_data.get('id', f"migrated-{time.time()}"),
- username,
- user_data.get('password', ''),
- user_data.get('created_at', time.time()),
- user_data.get('last_login'),
- user_data.get('is_admin', False)
- )
- )
-
- migrated_count += 1
- logger.info(f"已迁移用户: {username}")
-
- conn.commit()
- cursor.close()
-
- logger.info(f"迁移完成: 成功迁移 {migrated_count} 个用户,跳过 {skipped_count} 个用户")
-
- # 备份旧文件
- backup_path = f"{OLD_USER_DATA_PATH}.bak.{int(time.time())}"
- os.rename(OLD_USER_DATA_PATH, backup_path)
- logger.info(f"已备份旧用户数据文件到: {backup_path}")
-
- except Exception as e:
- logger.error(f"迁移用户数据失败: {str(e)}")
- if conn:
- conn.rollback()
- raise
- finally:
- if conn:
- release_pg_connection(conn)
- if __name__ == "__main__":
- try:
- migrate_users()
- except Exception as e:
- logger.error(f"迁移失败: {str(e)}")
- sys.exit(1)
-
- sys.exit(0)
|