permission_checker.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. import logging
  2. from typing import Dict, Optional
  3. import asyncio
  4. class DatabasePermissionChecker:
  5. """数据库权限检查器"""
  6. def __init__(self, db_inspector):
  7. self.db_inspector = db_inspector
  8. self.logger = logging.getLogger("schema_tools.DatabasePermissionChecker")
  9. self._permission_cache: Optional[Dict[str, bool]] = None
  10. async def check_permissions(self) -> Dict[str, bool]:
  11. """
  12. 检查数据库权限
  13. Returns:
  14. 权限字典,包含:
  15. - connect: 是否可连接
  16. - select_metadata: 是否可查询元数据
  17. - select_data: 是否可查询数据
  18. - is_readonly: 是否为只读
  19. """
  20. if self._permission_cache is not None:
  21. return self._permission_cache
  22. permissions = {
  23. 'connect': False,
  24. 'select_metadata': False,
  25. 'select_data': False,
  26. 'is_readonly': False
  27. }
  28. try:
  29. # 检查连接权限
  30. if await self._test_connection():
  31. permissions['connect'] = True
  32. self.logger.info("✓ 数据库连接成功")
  33. else:
  34. self.logger.error("✗ 无法连接到数据库")
  35. return permissions
  36. # 检查元数据查询权限
  37. if await self._test_metadata_access():
  38. permissions['select_metadata'] = True
  39. self.logger.info("✓ 元数据查询权限正常")
  40. else:
  41. self.logger.warning("⚠ 元数据查询权限受限")
  42. # 检查数据查询权限
  43. if await self._test_data_access():
  44. permissions['select_data'] = True
  45. self.logger.info("✓ 数据查询权限正常")
  46. else:
  47. self.logger.warning("⚠ 数据查询权限受限")
  48. # 检查是否为只读库
  49. if await self._test_write_permission():
  50. permissions['is_readonly'] = False
  51. self.logger.info("✓ 数据库可读写")
  52. else:
  53. permissions['is_readonly'] = True
  54. self.logger.info("ℹ 数据库为只读模式")
  55. self._permission_cache = permissions
  56. return permissions
  57. except Exception as e:
  58. self.logger.exception(f"权限检查失败: {e}")
  59. return permissions
  60. async def _test_connection(self) -> bool:
  61. """测试数据库连接"""
  62. try:
  63. # 尝试获取数据库版本
  64. query = "SELECT version()"
  65. async with self.db_inspector.connection_pool.acquire() as conn:
  66. version = await conn.fetchval(query)
  67. self.logger.debug(f"数据库版本: {version}")
  68. return True
  69. except Exception as e:
  70. self.logger.error(f"连接测试失败: {e}")
  71. return False
  72. async def _test_metadata_access(self) -> bool:
  73. """测试元数据访问权限"""
  74. try:
  75. query = """
  76. SELECT schemaname, tablename
  77. FROM pg_tables
  78. WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
  79. LIMIT 1
  80. """
  81. async with self.db_inspector.connection_pool.acquire() as conn:
  82. result = await conn.fetch(query)
  83. return True
  84. except Exception as e:
  85. self.logger.error(f"元数据访问测试失败: {e}")
  86. return False
  87. async def _test_data_access(self) -> bool:
  88. """测试数据访问权限"""
  89. try:
  90. # 尝试查询一个简单的数据
  91. query = "SELECT 1 as test"
  92. async with self.db_inspector.connection_pool.acquire() as conn:
  93. result = await conn.fetchval(query)
  94. return result == 1
  95. except Exception as e:
  96. self.logger.error(f"数据访问测试失败: {e}")
  97. return False
  98. async def _test_write_permission(self) -> bool:
  99. """测试写入权限(通过创建临时表)"""
  100. try:
  101. async with self.db_inspector.connection_pool.acquire() as conn:
  102. # 开启事务
  103. async with conn.transaction():
  104. # 尝试创建临时表
  105. await conn.execute("""
  106. CREATE TEMP TABLE _schema_tools_permission_test (
  107. id INTEGER PRIMARY KEY,
  108. test_value TEXT
  109. )
  110. """)
  111. # 尝试插入数据
  112. await conn.execute("""
  113. INSERT INTO _schema_tools_permission_test (id, test_value)
  114. VALUES (1, 'test')
  115. """)
  116. # 清理(事务结束时临时表会自动删除)
  117. await conn.execute("DROP TABLE IF EXISTS _schema_tools_permission_test")
  118. return True
  119. except Exception as e:
  120. # 写入失败通常意味着只读权限
  121. self.logger.debug(f"写入权限测试失败(可能是只读库): {e}")
  122. return False
  123. def get_permission_summary(self) -> str:
  124. """获取权限摘要信息"""
  125. if self._permission_cache is None:
  126. return "权限未检查"
  127. perms = self._permission_cache
  128. if not perms['connect']:
  129. return "❌ 无法连接到数据库"
  130. if perms['select_metadata'] and perms['select_data']:
  131. mode = "只读" if perms['is_readonly'] else "读写"
  132. return f"✅ 权限正常({mode}模式)"
  133. elif perms['select_metadata']:
  134. return "⚠️ 仅有元数据查询权限"
  135. else:
  136. return "❌ 权限不足"
  137. def require_minimum_permissions(self) -> bool:
  138. """检查是否满足最低权限要求"""
  139. if self._permission_cache is None:
  140. return False
  141. # 最低要求:能连接和查询元数据
  142. return (self._permission_cache['connect'] and
  143. self._permission_cache['select_metadata'])