-- init_tables.sql
-- Data Pipeline API database initialization script
--
-- Creates the tables and indexes required by the Data Pipeline API system
-- in the pgvector vector database.
-- NOTE: these tables belong in the pgvector database, NOT the business database.
--
-- Usage (with the connection info from PGVECTOR_CONFIG):
-- psql -h host -p port -U username -d pgvector_database_name -f init_tables.sql

-- Set client encoding so the Chinese text below round-trips correctly
SET client_encoding = 'UTF8';

-- Start transaction: all DDL below commits atomically at the end of the script
BEGIN;
  12. -- ====================================================================
  13. -- 任务主表 (data_pipeline_tasks)
  14. -- ====================================================================
  15. CREATE TABLE IF NOT EXISTS data_pipeline_tasks (
  16. -- 主键:时间戳格式的任务ID
  17. task_id VARCHAR(32) PRIMARY KEY, -- 'task_20250627_143052'
  18. -- 任务基本信息
  19. task_name VARCHAR(255), -- 任务自定义名称(可选)
  20. task_type VARCHAR(50) NOT NULL DEFAULT 'data_workflow',
  21. status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending/in_progress/partial_completed/completed/failed
  22. -- 配置和结果(JSON格式)
  23. parameters JSONB NOT NULL, -- 任务配置参数
  24. result JSONB, -- 最终执行结果
  25. -- 错误处理
  26. error_message TEXT, -- 错误详细信息
  27. -- 时间戳
  28. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  29. started_at TIMESTAMP,
  30. completed_at TIMESTAMP,
  31. -- 创建者信息
  32. created_type VARCHAR(50) DEFAULT 'api', -- 'api', 'manual', 'system'
  33. by_user VARCHAR(50), -- 'guest'或其它user_id
  34. -- 输出目录
  35. output_directory TEXT, -- 任务输出目录路径
  36. -- 索引字段
  37. db_name VARCHAR(100) -- 数据库名称(便于筛选)
  38. );
  39. -- 添加约束
  40. ALTER TABLE data_pipeline_tasks ADD CONSTRAINT chk_task_status
  41. CHECK (status IN ('pending', 'in_progress', 'partial_completed', 'completed', 'failed'));
  42. ALTER TABLE data_pipeline_tasks ADD CONSTRAINT chk_task_type
  43. CHECK (task_type IN ('data_workflow', 'complete_workflow'));
  44. ALTER TABLE data_pipeline_tasks ADD CONSTRAINT chk_created_type
  45. CHECK (created_type IN ('api', 'manual', 'system'));
  46. -- ====================================================================
  47. -- 任务步骤状态表 (data_pipeline_task_steps)
  48. -- ====================================================================
  49. CREATE TABLE IF NOT EXISTS data_pipeline_task_steps (
  50. id SERIAL PRIMARY KEY,
  51. task_id VARCHAR(32) REFERENCES data_pipeline_tasks(task_id) ON DELETE CASCADE,
  52. execution_id VARCHAR(100), -- 执行批次ID(可为空)
  53. step_name VARCHAR(50) NOT NULL, -- 'ddl_generation', 'qa_generation', 'sql_validation', 'training_load'
  54. step_status VARCHAR(50) NOT NULL DEFAULT 'pending', -- 'pending', 'running', 'completed', 'failed'
  55. started_at TIMESTAMP,
  56. completed_at TIMESTAMP,
  57. error_message TEXT -- 错误详细信息
  58. );
  59. -- 添加约束
  60. ALTER TABLE data_pipeline_task_steps ADD CONSTRAINT chk_step_status
  61. CHECK (step_status IN ('pending', 'running', 'completed', 'failed'));
  62. ALTER TABLE data_pipeline_task_steps ADD CONSTRAINT chk_step_name
  63. CHECK (step_name IN ('ddl_generation', 'qa_generation', 'sql_validation', 'training_load'));
-- ====================================================================
-- Indexes
-- ====================================================================
-- Task table indexes (cover the common list/filter queries of the API)
CREATE INDEX IF NOT EXISTS idx_tasks_status ON data_pipeline_tasks(status);
CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON data_pipeline_tasks(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_tasks_db_name ON data_pipeline_tasks(db_name);
CREATE INDEX IF NOT EXISTS idx_tasks_created_type ON data_pipeline_tasks(created_type);
CREATE INDEX IF NOT EXISTS idx_tasks_task_type ON data_pipeline_tasks(task_type);
CREATE INDEX IF NOT EXISTS idx_tasks_task_name ON data_pipeline_tasks(task_name);
-- Step status table indexes
CREATE INDEX IF NOT EXISTS idx_steps_task_id ON data_pipeline_task_steps(task_id);
CREATE INDEX IF NOT EXISTS idx_steps_step_name ON data_pipeline_task_steps(step_name);
CREATE INDEX IF NOT EXISTS idx_steps_step_status ON data_pipeline_task_steps(step_status);
CREATE INDEX IF NOT EXISTS idx_steps_started_at ON data_pipeline_task_steps(started_at DESC);
-- Composite index: lookup of one specific step of one specific task
CREATE INDEX IF NOT EXISTS idx_steps_task_step ON data_pipeline_task_steps(task_id, step_name);
  80. -- ====================================================================
  81. -- 创建清理函数
  82. -- ====================================================================
  83. -- 清理旧任务的函数
  84. CREATE OR REPLACE FUNCTION cleanup_old_data_pipeline_tasks(days_to_keep INTEGER DEFAULT 30)
  85. RETURNS INTEGER AS $$
  86. DECLARE
  87. deleted_count INTEGER;
  88. cutoff_date TIMESTAMP;
  89. BEGIN
  90. cutoff_date := NOW() - INTERVAL '1 day' * days_to_keep;
  91. -- 删除旧任务(级联删除相关步骤记录)
  92. DELETE FROM data_pipeline_tasks
  93. WHERE created_at < cutoff_date
  94. AND status IN ('completed', 'failed');
  95. GET DIAGNOSTICS deleted_count = ROW_COUNT;
  96. RETURN deleted_count;
  97. END;
  98. $$ LANGUAGE plpgsql;
  99. -- 获取任务统计信息的函数
  100. CREATE OR REPLACE FUNCTION get_data_pipeline_task_stats()
  101. RETURNS TABLE (
  102. total_tasks INTEGER,
  103. pending_tasks INTEGER,
  104. running_tasks INTEGER,
  105. completed_tasks INTEGER,
  106. failed_tasks INTEGER,
  107. avg_completion_time INTERVAL
  108. ) AS $$
  109. BEGIN
  110. RETURN QUERY
  111. SELECT
  112. COUNT(*)::INTEGER as total_tasks,
  113. COUNT(*) FILTER (WHERE status = 'pending')::INTEGER as pending_tasks,
  114. COUNT(*) FILTER (WHERE status IN ('in_progress'))::INTEGER as running_tasks,
  115. COUNT(*) FILTER (WHERE status = 'completed')::INTEGER as completed_tasks,
  116. COUNT(*) FILTER (WHERE status = 'failed')::INTEGER as failed_tasks,
  117. AVG(completed_at - started_at) FILTER (WHERE status = 'completed') as avg_completion_time
  118. FROM data_pipeline_tasks;
  119. END;
  120. $$ LANGUAGE plpgsql;
  121. -- 检查僵尸任务的函数
  122. CREATE OR REPLACE FUNCTION check_zombie_data_pipeline_tasks(timeout_hours INTEGER DEFAULT 2)
  123. RETURNS INTEGER AS $$
  124. DECLARE
  125. zombie_count INTEGER;
  126. cutoff_time TIMESTAMP;
  127. BEGIN
  128. cutoff_time := NOW() - INTERVAL '1 hour' * timeout_hours;
  129. -- 查找超时的运行中步骤
  130. UPDATE data_pipeline_task_steps
  131. SET step_status = 'failed',
  132. error_message = FORMAT('步骤执行超时(超过%s小时),可能已停止运行', timeout_hours),
  133. completed_at = NOW()
  134. WHERE step_status = 'running'
  135. AND started_at < cutoff_time;
  136. GET DIAGNOSTICS zombie_count = ROW_COUNT;
  137. -- 更新相关任务状态
  138. UPDATE data_pipeline_tasks
  139. SET status = 'failed',
  140. error_message = FORMAT('任务超时(超过%s小时),可能已停止运行', timeout_hours)
  141. WHERE status IN ('in_progress')
  142. AND started_at < cutoff_time;
  143. RETURN zombie_count;
  144. END;
  145. $$ LANGUAGE plpgsql;
  146. -- ====================================================================
  147. -- 插入初始数据(如果需要)
  148. -- ====================================================================
  149. -- 这里可以插入一些初始配置数据
  150. -- 目前暂时不需要
  151. -- ====================================================================
  152. -- 创建视图(便于查询)
  153. -- ====================================================================
  154. -- 任务步骤概览视图
  155. CREATE OR REPLACE VIEW v_task_step_overview AS
  156. SELECT
  157. t.task_id,
  158. t.task_name,
  159. t.task_type,
  160. t.status as task_status,
  161. t.created_at,
  162. t.started_at,
  163. t.completed_at,
  164. t.created_type,
  165. t.by_user,
  166. t.db_name,
  167. s.step_name,
  168. s.step_status,
  169. s.started_at as step_started_at,
  170. s.completed_at as step_completed_at,
  171. s.error_message as step_error_message
  172. FROM data_pipeline_tasks t
  173. LEFT JOIN data_pipeline_task_steps s ON t.task_id = s.task_id
  174. ORDER BY t.created_at DESC,
  175. CASE s.step_name
  176. WHEN 'ddl_generation' THEN 1
  177. WHEN 'qa_generation' THEN 2
  178. WHEN 'sql_validation' THEN 3
  179. WHEN 'training_load' THEN 4
  180. ELSE 5
  181. END;
-- Commit the transaction
COMMIT;
-- Report results (psql client-side \echo output; strings kept as-is)
\echo 'Data Pipeline API 数据库表创建完成!'
\echo ''
\echo '已创建的表:'
\echo '- data_pipeline_tasks: 任务主表'
\echo '- data_pipeline_task_steps: 任务步骤状态表'
\echo ''
\echo '已创建的函数:'
\echo '- cleanup_old_data_pipeline_tasks(days): 清理旧任务'
\echo '- get_data_pipeline_task_stats(): 获取任务统计'
\echo '- check_zombie_data_pipeline_tasks(hours): 检查僵尸任务'
\echo ''
\echo '已创建的视图:'
\echo '- v_task_step_overview: 任务步骤概览'