migrate_users.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. #!/usr/bin/env python
  2. """
  3. 用户数据迁移脚本
  4. 将用户数据从JSON文件迁移到PostgreSQL数据库
  5. """
  6. import json
  7. import logging
  8. import os
  9. import sys
  10. import time
  11. # 添加项目根目录到Python路径
  12. sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
  13. from app.config.config import config, current_env
  14. from app.core.system.auth import get_pg_connection, init_db, release_pg_connection
  15. # 获取配置
  16. app_config = config[current_env]
  17. # 配置日志
  18. log_level_name = getattr(app_config, "LOG_LEVEL", "INFO")
  19. log_level = getattr(logging, log_level_name)
  20. log_format = getattr(
  21. app_config, "LOG_FORMAT", "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  22. )
  23. logging.basicConfig(level=log_level, format=log_format)
  24. logger = logging.getLogger(__name__)
  25. # 旧的用户数据文件路径
  26. OLD_USER_DATA_PATH = os.path.join(
  27. os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data", "users.json"
  28. )
  29. def migrate_users():
  30. """
  31. 将用户数据从JSON文件迁移到PostgreSQL数据库
  32. """
  33. logger.info("开始迁移用户数据...")
  34. # 确保用户表已创建
  35. init_db()
  36. # 检查旧的用户数据文件是否存在
  37. if not os.path.exists(OLD_USER_DATA_PATH):
  38. logger.warning(f"用户数据文件不存在: {OLD_USER_DATA_PATH},无需迁移")
  39. return
  40. conn = None
  41. try:
  42. # 读取旧的用户数据
  43. with open(OLD_USER_DATA_PATH, "r", encoding="utf-8") as f:
  44. users = json.load(f)
  45. logger.info(f"从文件中读取了 {len(users)} 个用户")
  46. # 连接数据库
  47. conn = get_pg_connection()
  48. cursor = conn.cursor()
  49. migrated_count = 0
  50. skipped_count = 0
  51. for username, user_data in users.items():
  52. # 检查用户是否已存在
  53. check_query = "SELECT username FROM users WHERE username = %s"
  54. cursor.execute(check_query, (username,))
  55. if cursor.fetchone():
  56. logger.info(f"用户 {username} 已存在,跳过")
  57. skipped_count += 1
  58. continue
  59. # 创建用户
  60. insert_query = """
  61. INSERT INTO users (id, username, password, created_at, last_login, is_admin)
  62. VALUES (%s, %s, %s, %s, %s, %s)
  63. """
  64. cursor.execute(
  65. insert_query,
  66. (
  67. user_data.get("id", f"migrated-{time.time()}"),
  68. username,
  69. user_data.get("password", ""),
  70. user_data.get("created_at", time.time()),
  71. user_data.get("last_login"),
  72. user_data.get("is_admin", False),
  73. ),
  74. )
  75. migrated_count += 1
  76. logger.info(f"已迁移用户: {username}")
  77. conn.commit()
  78. cursor.close()
  79. logger.info(
  80. f"迁移完成: 成功迁移 {migrated_count} 个用户,跳过 {skipped_count} 个用户"
  81. )
  82. # 备份旧文件
  83. backup_path = f"{OLD_USER_DATA_PATH}.bak.{int(time.time())}"
  84. os.rename(OLD_USER_DATA_PATH, backup_path)
  85. logger.info(f"已备份旧用户数据文件到: {backup_path}")
  86. except Exception as e:
  87. logger.error(f"迁移用户数据失败: {str(e)}")
  88. if conn:
  89. conn.rollback()
  90. raise
  91. finally:
  92. if conn:
  93. release_pg_connection(conn)
  94. if __name__ == "__main__":
  95. try:
  96. migrate_users()
  97. except Exception as e:
  98. logger.error(f"迁移失败: {str(e)}")
  99. sys.exit(1)
  100. sys.exit(0)