migrate_users.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. #!/usr/bin/env python
  2. """
  3. 用户数据迁移脚本
  4. 将用户数据从JSON文件迁移到PostgreSQL数据库
  5. """
  6. import sys
  7. import os
  8. import json
  9. import logging
  10. import time
  11. import psycopg2
  12. # 添加项目根目录到Python路径
  13. sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
  14. from app.core.system.auth import init_db, get_pg_connection, release_pg_connection
  15. from app.config.config import config, current_env
  16. # 获取配置
  17. app_config = config[current_env]
  18. # 配置日志
  19. log_level_name = getattr(app_config, 'LOG_LEVEL', 'INFO')
  20. log_level = getattr(logging, log_level_name)
  21. log_format = getattr(app_config, 'LOG_FORMAT', '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  22. logging.basicConfig(
  23. level=log_level,
  24. format=log_format
  25. )
  26. logger = logging.getLogger(__name__)
  27. # 旧的用户数据文件路径
  28. OLD_USER_DATA_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data', 'users.json')
  29. def migrate_users():
  30. """
  31. 将用户数据从JSON文件迁移到PostgreSQL数据库
  32. """
  33. logger.info("开始迁移用户数据...")
  34. # 确保用户表已创建
  35. init_db()
  36. # 检查旧的用户数据文件是否存在
  37. if not os.path.exists(OLD_USER_DATA_PATH):
  38. logger.warning(f"用户数据文件不存在: {OLD_USER_DATA_PATH},无需迁移")
  39. return
  40. conn = None
  41. try:
  42. # 读取旧的用户数据
  43. with open(OLD_USER_DATA_PATH, 'r', encoding='utf-8') as f:
  44. users = json.load(f)
  45. logger.info(f"从文件中读取了 {len(users)} 个用户")
  46. # 连接数据库
  47. conn = get_pg_connection()
  48. cursor = conn.cursor()
  49. migrated_count = 0
  50. skipped_count = 0
  51. for username, user_data in users.items():
  52. # 检查用户是否已存在
  53. check_query = "SELECT username FROM users WHERE username = %s"
  54. cursor.execute(check_query, (username,))
  55. if cursor.fetchone():
  56. logger.info(f"用户 {username} 已存在,跳过")
  57. skipped_count += 1
  58. continue
  59. # 创建用户
  60. insert_query = """
  61. INSERT INTO users (id, username, password, created_at, last_login, is_admin)
  62. VALUES (%s, %s, %s, %s, %s, %s)
  63. """
  64. cursor.execute(
  65. insert_query,
  66. (
  67. user_data.get('id', f"migrated-{time.time()}"),
  68. username,
  69. user_data.get('password', ''),
  70. user_data.get('created_at', time.time()),
  71. user_data.get('last_login'),
  72. user_data.get('is_admin', False)
  73. )
  74. )
  75. migrated_count += 1
  76. logger.info(f"已迁移用户: {username}")
  77. conn.commit()
  78. cursor.close()
  79. logger.info(f"迁移完成: 成功迁移 {migrated_count} 个用户,跳过 {skipped_count} 个用户")
  80. # 备份旧文件
  81. backup_path = f"{OLD_USER_DATA_PATH}.bak.{int(time.time())}"
  82. os.rename(OLD_USER_DATA_PATH, backup_path)
  83. logger.info(f"已备份旧用户数据文件到: {backup_path}")
  84. except Exception as e:
  85. logger.error(f"迁移用户数据失败: {str(e)}")
  86. if conn:
  87. conn.rollback()
  88. raise
  89. finally:
  90. if conn:
  91. release_pg_connection(conn)
  92. if __name__ == "__main__":
  93. try:
  94. migrate_users()
  95. except Exception as e:
  96. logger.error(f"迁移失败: {str(e)}")
  97. sys.exit(1)
  98. sys.exit(0)