large_table_handler.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import logging
  2. import random
  3. from typing import List, Dict, Any, Optional
  4. from schema_tools.config import SCHEMA_TOOLS_CONFIG
  class LargeTableHandler:
      """Sampling and skip policy for large database tables.

      Three settings are read from ``SCHEMA_TOOLS_CONFIG`` at construction:

      * ``large_table_threshold`` (default 1000000): tables with more rows
        than this are sampled with the stratified strategy.
      * ``skip_large_tables`` (default False): enables skipping of tables
        that exceed ``max_table_size``.
      * ``max_table_size`` (default 10000000): row-count ceiling used by
        :meth:`should_skip_table`.
      """

      def __init__(self):
          self.logger = logging.getLogger("schema_tools.LargeTableHandler")
          self.large_table_threshold = SCHEMA_TOOLS_CONFIG.get("large_table_threshold", 1000000)
          self.skip_large_tables = SCHEMA_TOOLS_CONFIG.get("skip_large_tables", False)
          self.max_table_size = SCHEMA_TOOLS_CONFIG.get("max_table_size", 10000000)

      def should_skip_table(self, row_count: Optional[int]) -> bool:
          """Decide whether a table should be skipped entirely.

          Args:
              row_count: Number of rows in the table, or None if unknown.

          Returns:
              True only when skipping is enabled and ``row_count`` exceeds
              ``max_table_size`` (a warning is logged in that case);
              False otherwise, including when ``row_count`` is None.
          """
          if not self.skip_large_tables or row_count is None:
              return False

          if row_count > self.max_table_size:
              self.logger.warning(f"表行数({row_count})超过最大限制({self.max_table_size}),将跳过处理")
              return True

          return False

      def is_large_table(self, row_count: Optional[int]) -> bool:
          """Tell whether a table counts as large.

          Args:
              row_count: Number of rows in the table, or None if unknown.

          Returns:
              True when ``row_count`` is strictly greater than
              ``large_table_threshold``; False when it is None or smaller.
          """
          if row_count is None:
              return False
          return row_count > self.large_table_threshold

      async def get_smart_sample(self, db_inspector, table_name: str, schema_name: str,
                                 row_count: Optional[int], limit: int = 20) -> List[Dict[str, Any]]:
          """Fetch sample rows, choosing the strategy from the table size.

          Args:
              db_inspector: Object whose ``connection_pool.acquire()`` is an
                  async context manager yielding a connection with an
                  awaitable ``fetch(query)``.
              table_name: Table name.
              schema_name: Schema name.
              row_count: Number of rows in the table, or None if unknown.
              limit: Maximum number of rows to return.

          Returns:
              At most ``limit`` rows, each converted to a dict. An empty list
              is returned without touching the database when ``limit`` is not
              positive.
          """
          # A non-positive limit can never yield rows, and a negative value
          # would be interpolated into an invalid "LIMIT -n" clause.
          if limit <= 0:
              return []

          # NOTE(review): schema and table names are interpolated into the SQL
          # text unquoted; callers must pass trusted identifiers.
          full_table_name = f"{schema_name}.{table_name}"

          # Tables that are not large only need a plain LIMIT query.
          if not self.is_large_table(row_count):
              return await self._simple_sample(db_inspector, full_table_name, limit)

          self.logger.info(f"表 {full_table_name} 有 {row_count} 行,使用智能采样策略")

          # Large tables are sampled from several regions of the table.
          return await self._stratified_sample(db_inspector, full_table_name, row_count, limit)

      async def _simple_sample(self, db_inspector, full_table_name: str, limit: int) -> List[Dict[str, Any]]:
          """Return up to ``limit`` rows using a single ``SELECT ... LIMIT`` query."""
          if limit <= 0:
              return []

          query = f"SELECT * FROM {full_table_name} LIMIT {limit}"

          async with db_inspector.connection_pool.acquire() as conn:
              rows = await conn.fetch(query)
              return [dict(row) for row in rows]

      async def _stratified_sample(self, db_inspector, full_table_name: str,
                                   row_count: int, limit: int) -> List[Dict[str, Any]]:
          """Sample a large table from its head, a random middle region and its tail.

          Roughly a third of ``limit`` is taken from each of the first two
          regions; the tail query then asks for whatever is still missing.

          Args:
              db_inspector: Object exposing ``connection_pool.acquire()``.
              full_table_name: ``schema.table`` string used verbatim in SQL.
              row_count: Number of rows in the table (used to compute offsets
                  and the TABLESAMPLE percentage).
              limit: Maximum number of rows to return.

          Returns:
              At most ``limit`` rows as dicts; fewer if the queries return
              fewer rows than requested.
          """
          if limit <= 0:
              return []

          samples_per_section = max(1, limit // 3)
          samples: List[Dict[str, Any]] = []

          async with db_inspector.connection_pool.acquire() as conn:
              # 1. Head: the first N rows in whatever order the database returns.
              front_query = f"SELECT * FROM {full_table_name} LIMIT {samples_per_section}"
              front_rows = await conn.fetch(front_query)
              samples.extend(dict(row) for row in front_rows)

              # 2. Middle: a random region, only when the table is big enough
              #    for a valid random offset range to exist.
              if row_count > samples_per_section * 2:
                  middle_rows = []
                  try:
                      # Prefer TABLESAMPLE; the percentage is capped at 1.0.
                      sample_percent = min(1.0, (samples_per_section * 100.0) / row_count)
                      middle_query = (
                          f"SELECT * FROM {full_table_name} "
                          f"TABLESAMPLE SYSTEM({sample_percent}) "
                          f"LIMIT {samples_per_section}"
                      )
                      middle_rows = await conn.fetch(middle_query)
                  except Exception as e:
                      # Best effort: any TABLESAMPLE failure falls through to
                      # the OFFSET-based query below.
                      self.logger.warning(f"TABLESAMPLE采样失败,使用OFFSET采样: {e}")

                  if not middle_rows:
                      # TABLESAMPLE either failed or selected no rows (block-level
                      # sampling at a tiny percentage can come back empty), so
                      # read the middle region at a random offset instead.
                      offset = random.randint(samples_per_section, row_count - samples_per_section)
                      offset_query = f"SELECT * FROM {full_table_name} OFFSET {offset} LIMIT {samples_per_section}"
                      middle_rows = await conn.fetch(offset_query)

                  samples.extend(dict(row) for row in middle_rows)

              # 3. Tail: fill the remaining quota from the end of the table.
              remaining = limit - len(samples)
              if remaining > 0 and row_count > limit:
                  offset = max(0, row_count - remaining)
                  tail_query = f"SELECT * FROM {full_table_name} OFFSET {offset} LIMIT {remaining}"
                  tail_rows = await conn.fetch(tail_query)
                  samples.extend(dict(row) for row in tail_rows)

          self.logger.info(f"智能采样完成,获取了 {len(samples)} 条数据")
          return samples[:limit]  # never return more than requested

      def get_sampling_strategy_info(self, row_count: Optional[int]) -> Dict[str, Any]:
          """Describe which sampling strategy applies to a table of this size.

          Args:
              row_count: Number of rows in the table, or None if unknown.

          Returns:
              A dict with keys ``strategy`` (``'simple'``, ``'skip'`` or
              ``'smart'``), ``reason`` (human-readable explanation) and
              ``is_large_table`` (bool).
          """
          if row_count is None:
              return {
                  'strategy': 'simple',
                  'reason': '未知表大小',
                  'is_large_table': False
              }

          # Skipping takes precedence over the large-table check.
          if self.should_skip_table(row_count):
              return {
                  'strategy': 'skip',
                  'reason': f'表太大({row_count}行),超过限制({self.max_table_size}行)',
                  'is_large_table': True
              }

          if self.is_large_table(row_count):
              return {
                  'strategy': 'smart',
                  'reason': f'大表({row_count}行),使用智能采样',
                  'is_large_table': True
              }

          return {
              'strategy': 'simple',
              'reason': f'普通表({row_count}行),使用简单采样',
              'is_large_table': False
          }