migrate_wechat_users.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 微信用户表迁移脚本
  5. 创建微信用户表和相关索引
  6. """
  7. import logging
  8. import os
  9. import sys
  10. import psycopg2
  11. # 添加项目根目录到Python路径
  12. sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
  13. from app.config.config import config, current_env
  14. # 获取配置
  15. app_config = config[current_env]
  16. # 配置日志
  17. log_level_name = getattr(app_config, "LOG_LEVEL", "INFO")
  18. log_level = getattr(logging, log_level_name)
  19. log_format = getattr(
  20. app_config, "LOG_FORMAT", "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  21. )
  22. logging.basicConfig(level=log_level, format=log_format)
  23. logger = logging.getLogger(__name__)
  24. def get_database_connection():
  25. """
  26. 获取数据库连接
  27. Returns:
  28. psycopg2.connection: 数据库连接对象
  29. """
  30. try:
  31. # 从配置中获取数据库连接信息
  32. db_config = {
  33. "host": app_config.PG_HOST,
  34. "port": app_config.PG_PORT,
  35. "database": app_config.PG_DATABASE,
  36. "user": app_config.PG_USERNAME,
  37. "password": app_config.PG_PASSWORD,
  38. }
  39. connection = psycopg2.connect(**db_config)
  40. logger.info("成功连接到数据库")
  41. return connection
  42. except Exception as e:
  43. logger.error(f"连接数据库失败: {str(e)}")
  44. raise
  45. def check_table_exists(connection, table_name, schema="public"):
  46. """
  47. 检查表是否存在
  48. Args:
  49. connection: 数据库连接
  50. table_name (str): 表名
  51. schema (str): 模式名,默认为public
  52. Returns:
  53. bool: 表存在返回True,否则返回False
  54. """
  55. try:
  56. with connection.cursor() as cursor:
  57. cursor.execute(
  58. """
  59. SELECT EXISTS (
  60. SELECT FROM information_schema.tables
  61. WHERE table_schema = %s AND table_name = %s
  62. );
  63. """,
  64. (schema, table_name),
  65. )
  66. result = cursor.fetchone()
  67. return result[0] if result else False
  68. except Exception as e:
  69. logger.error(f"检查表是否存在时发生错误: {str(e)}")
  70. return False
  71. def create_wechat_users_table(connection):
  72. """
  73. 创建微信用户表
  74. Args:
  75. connection: 数据库连接
  76. Returns:
  77. bool: 创建成功返回True,否则返回False
  78. """
  79. try:
  80. # 读取SQL DDL文件
  81. sql_file_path = os.path.join(
  82. os.path.dirname(__file__), "../../database/create_wechat_users.sql"
  83. )
  84. if not os.path.exists(sql_file_path):
  85. logger.error(f"SQL文件不存在: {sql_file_path}")
  86. return False
  87. with open(sql_file_path, "r", encoding="utf-8") as file:
  88. sql_content = file.read()
  89. with connection.cursor() as cursor:
  90. # 执行SQL脚本
  91. cursor.execute(sql_content)
  92. connection.commit()
  93. logger.info("微信用户表创建成功")
  94. return True
  95. except Exception as e:
  96. logger.error(f"创建微信用户表失败: {str(e)}")
  97. connection.rollback()
  98. return False
  99. def migrate_wechat_users():
  100. """
  101. 执行微信用户表迁移
  102. Returns:
  103. bool: 迁移成功返回True,否则返回False
  104. """
  105. connection = None
  106. try:
  107. # 获取数据库连接
  108. connection = get_database_connection()
  109. # 检查表是否已存在
  110. if check_table_exists(connection, "wechat_users"):
  111. logger.warning("微信用户表已存在,跳过创建")
  112. return True
  113. logger.info("开始创建微信用户表...")
  114. # 创建微信用户表
  115. if create_wechat_users_table(connection):
  116. logger.info("微信用户表迁移完成")
  117. return True
  118. else:
  119. logger.error("微信用户表迁移失败")
  120. return False
  121. except Exception as e:
  122. logger.error(f"迁移过程中发生错误: {str(e)}")
  123. return False
  124. finally:
  125. if connection:
  126. connection.close()
  127. logger.info("数据库连接已关闭")
  128. def rollback_wechat_users():
  129. """
  130. 回滚微信用户表迁移(删除表)
  131. Returns:
  132. bool: 回滚成功返回True,否则返回False
  133. """
  134. connection = None
  135. try:
  136. # 获取数据库连接
  137. connection = get_database_connection()
  138. # 检查表是否存在
  139. if not check_table_exists(connection, "wechat_users"):
  140. logger.warning("微信用户表不存在,无需回滚")
  141. return True
  142. logger.info("开始回滚微信用户表...")
  143. with connection.cursor() as cursor:
  144. # 删除表
  145. cursor.execute("DROP TABLE IF EXISTS public.wechat_users CASCADE;")
  146. connection.commit()
  147. logger.info("微信用户表回滚完成")
  148. return True
  149. except Exception as e:
  150. logger.error(f"回滚过程中发生错误: {str(e)}")
  151. if connection:
  152. connection.rollback()
  153. return False
  154. finally:
  155. if connection:
  156. connection.close()
  157. logger.info("数据库连接已关闭")
  158. def main():
  159. """
  160. 主函数,根据命令行参数执行相应操作
  161. """
  162. import argparse
  163. parser = argparse.ArgumentParser(description="微信用户表迁移脚本")
  164. parser.add_argument(
  165. "--action",
  166. choices=["migrate", "rollback"],
  167. default="migrate",
  168. help="执行的操作:migrate(迁移)或 rollback(回滚)",
  169. )
  170. args = parser.parse_args()
  171. if args.action == "migrate":
  172. logger.info("开始执行微信用户表迁移...")
  173. success = migrate_wechat_users()
  174. elif args.action == "rollback":
  175. logger.info("开始执行微信用户表回滚...")
  176. success = rollback_wechat_users()
  177. else:
  178. logger.error("未知的操作类型")
  179. sys.exit(1)
  180. if success:
  181. logger.info("操作完成")
  182. sys.exit(0)
  183. else:
  184. logger.error("操作失败")
  185. sys.exit(1)
  186. if __name__ == "__main__":
  187. main()