migrate_wechat_users.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 微信用户表迁移脚本
  5. 创建微信用户表和相关索引
  6. """
  7. import os
  8. import sys
  9. import logging
  10. import psycopg2
  11. from psycopg2 import sql
  12. # 添加项目根目录到Python路径
  13. sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
  14. from app.config.config import config, current_env
  15. # 获取配置
  16. app_config = config[current_env]
  17. # 配置日志
  18. log_level_name = getattr(app_config, 'LOG_LEVEL', 'INFO')
  19. log_level = getattr(logging, log_level_name)
  20. log_format = getattr(app_config, 'LOG_FORMAT', '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  21. logging.basicConfig(
  22. level=log_level,
  23. format=log_format
  24. )
  25. logger = logging.getLogger(__name__)
  26. def get_database_connection():
  27. """
  28. 获取数据库连接
  29. Returns:
  30. psycopg2.connection: 数据库连接对象
  31. """
  32. try:
  33. # 从配置中获取数据库连接信息
  34. db_config = {
  35. 'host': app_config.PG_HOST,
  36. 'port': app_config.PG_PORT,
  37. 'database': app_config.PG_DATABASE,
  38. 'user': app_config.PG_USERNAME,
  39. 'password': app_config.PG_PASSWORD
  40. }
  41. connection = psycopg2.connect(**db_config)
  42. logger.info("成功连接到数据库")
  43. return connection
  44. except Exception as e:
  45. logger.error(f"连接数据库失败: {str(e)}")
  46. raise
  47. def check_table_exists(connection, table_name, schema='public'):
  48. """
  49. 检查表是否存在
  50. Args:
  51. connection: 数据库连接
  52. table_name (str): 表名
  53. schema (str): 模式名,默认为public
  54. Returns:
  55. bool: 表存在返回True,否则返回False
  56. """
  57. try:
  58. with connection.cursor() as cursor:
  59. cursor.execute("""
  60. SELECT EXISTS (
  61. SELECT FROM information_schema.tables
  62. WHERE table_schema = %s AND table_name = %s
  63. );
  64. """, (schema, table_name))
  65. result = cursor.fetchone()
  66. return result[0] if result else False
  67. except Exception as e:
  68. logger.error(f"检查表是否存在时发生错误: {str(e)}")
  69. return False
  70. def create_wechat_users_table(connection):
  71. """
  72. 创建微信用户表
  73. Args:
  74. connection: 数据库连接
  75. Returns:
  76. bool: 创建成功返回True,否则返回False
  77. """
  78. try:
  79. # 读取SQL DDL文件
  80. sql_file_path = os.path.join(os.path.dirname(__file__), '../../database/create_wechat_users.sql')
  81. if not os.path.exists(sql_file_path):
  82. logger.error(f"SQL文件不存在: {sql_file_path}")
  83. return False
  84. with open(sql_file_path, 'r', encoding='utf-8') as file:
  85. sql_content = file.read()
  86. with connection.cursor() as cursor:
  87. # 执行SQL脚本
  88. cursor.execute(sql_content)
  89. connection.commit()
  90. logger.info("微信用户表创建成功")
  91. return True
  92. except Exception as e:
  93. logger.error(f"创建微信用户表失败: {str(e)}")
  94. connection.rollback()
  95. return False
  96. def migrate_wechat_users():
  97. """
  98. 执行微信用户表迁移
  99. Returns:
  100. bool: 迁移成功返回True,否则返回False
  101. """
  102. connection = None
  103. try:
  104. # 获取数据库连接
  105. connection = get_database_connection()
  106. # 检查表是否已存在
  107. if check_table_exists(connection, 'wechat_users'):
  108. logger.warning("微信用户表已存在,跳过创建")
  109. return True
  110. logger.info("开始创建微信用户表...")
  111. # 创建微信用户表
  112. if create_wechat_users_table(connection):
  113. logger.info("微信用户表迁移完成")
  114. return True
  115. else:
  116. logger.error("微信用户表迁移失败")
  117. return False
  118. except Exception as e:
  119. logger.error(f"迁移过程中发生错误: {str(e)}")
  120. return False
  121. finally:
  122. if connection:
  123. connection.close()
  124. logger.info("数据库连接已关闭")
  125. def rollback_wechat_users():
  126. """
  127. 回滚微信用户表迁移(删除表)
  128. Returns:
  129. bool: 回滚成功返回True,否则返回False
  130. """
  131. connection = None
  132. try:
  133. # 获取数据库连接
  134. connection = get_database_connection()
  135. # 检查表是否存在
  136. if not check_table_exists(connection, 'wechat_users'):
  137. logger.warning("微信用户表不存在,无需回滚")
  138. return True
  139. logger.info("开始回滚微信用户表...")
  140. with connection.cursor() as cursor:
  141. # 删除表
  142. cursor.execute("DROP TABLE IF EXISTS public.wechat_users CASCADE;")
  143. connection.commit()
  144. logger.info("微信用户表回滚完成")
  145. return True
  146. except Exception as e:
  147. logger.error(f"回滚过程中发生错误: {str(e)}")
  148. if connection:
  149. connection.rollback()
  150. return False
  151. finally:
  152. if connection:
  153. connection.close()
  154. logger.info("数据库连接已关闭")
  155. def main():
  156. """
  157. 主函数,根据命令行参数执行相应操作
  158. """
  159. import argparse
  160. parser = argparse.ArgumentParser(description='微信用户表迁移脚本')
  161. parser.add_argument('--action', choices=['migrate', 'rollback'], default='migrate',
  162. help='执行的操作:migrate(迁移)或 rollback(回滚)')
  163. args = parser.parse_args()
  164. if args.action == 'migrate':
  165. logger.info("开始执行微信用户表迁移...")
  166. success = migrate_wechat_users()
  167. elif args.action == 'rollback':
  168. logger.info("开始执行微信用户表回滚...")
  169. success = rollback_wechat_users()
  170. else:
  171. logger.error("未知的操作类型")
  172. sys.exit(1)
  173. if success:
  174. logger.info("操作完成")
  175. sys.exit(0)
  176. else:
  177. logger.error("操作失败")
  178. sys.exit(1)
  179. if __name__ == "__main__":
  180. main()