init_tables.sql

-- Data Pipeline API database initialization script
--
-- This script creates the tables and indexes required by the Data Pipeline API
-- in the pgvector vector database.
-- Note: these tables must be created in the pgvector database, not in the
-- business database.
--
-- How to run (using the connection settings from PGVECTOR_CONFIG):
-- psql -h host -p port -U username -d pgvector_database_name -f init_tables.sql

-- Set the client encoding
SET client_encoding = 'UTF8';

-- Start the transaction
BEGIN;
-- ====================================================================
-- Task master table (data_pipeline_tasks)
-- ====================================================================
CREATE TABLE IF NOT EXISTS data_pipeline_tasks (
    -- Primary key: timestamp-based task ID
    id VARCHAR(32) PRIMARY KEY,                    -- e.g. 'task_20250627_143052'

    -- Basic task information
    task_type VARCHAR(50) NOT NULL DEFAULT 'data_workflow',
    status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending/in_progress/partial_completed/completed/failed

    -- Configuration and results (JSON)
    parameters JSONB NOT NULL,                     -- task configuration parameters
    result JSONB,                                  -- final execution result

    -- Error handling
    error_message TEXT,                            -- detailed error information

    -- Per-step status tracking
    step_status JSONB DEFAULT '{
        "ddl_generation": "pending",
        "qa_generation": "pending",
        "sql_validation": "pending",
        "training_load": "pending"
    }'::jsonb,

    -- Timestamps
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,

    -- Creator information
    created_by VARCHAR(50) DEFAULT 'api',          -- 'api', 'manual', 'system'

    -- Output directory
    output_directory TEXT,                         -- path of the task's output directory

    -- Filter/search fields
    db_name VARCHAR(100),                          -- database name (for filtering)
    business_context TEXT                          -- business context (for searching)
);
-- Add constraints
ALTER TABLE data_pipeline_tasks ADD CONSTRAINT chk_task_status
    CHECK (status IN ('pending', 'in_progress', 'partial_completed', 'completed', 'failed'));

ALTER TABLE data_pipeline_tasks ADD CONSTRAINT chk_task_type
    CHECK (task_type IN ('data_workflow', 'complete_workflow'));

ALTER TABLE data_pipeline_tasks ADD CONSTRAINT chk_created_by
    CHECK (created_by IN ('api', 'manual', 'system'));
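
-- Illustrative example of a task row (kept commented out so that running this
-- script has no side effects). The id, parameters, and other values below are
-- placeholders, not values required by the system.
-- INSERT INTO data_pipeline_tasks (id, task_type, parameters, db_name, business_context)
-- VALUES (
--     'task_20250627_143052',
--     'data_workflow',
--     '{"example_parameter": "example_value"}'::jsonb,
--     'example_db',
--     'example business context'
-- );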
-- ====================================================================
-- Task execution records table (data_pipeline_task_executions)
-- ====================================================================
CREATE TABLE IF NOT EXISTS data_pipeline_task_executions (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(32) REFERENCES data_pipeline_tasks(id) ON DELETE CASCADE,
    execution_step VARCHAR(50) NOT NULL,           -- 'ddl_generation', 'qa_generation', 'sql_validation', 'training_load', 'complete'
    status VARCHAR(20) NOT NULL,                   -- 'running', 'completed', 'failed'
    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP,
    error_message TEXT,
    execution_result JSONB,                        -- step execution result
    execution_id VARCHAR(100) UNIQUE,              -- {task_id}_step_{step_name}_exec_{timestamp}
    force_executed BOOLEAN DEFAULT FALSE,          -- whether the step was force-executed
    files_cleaned BOOLEAN DEFAULT FALSE,           -- whether old files were cleaned up
    duration_seconds INTEGER                       -- execution duration in seconds
);
-- Add constraints
ALTER TABLE data_pipeline_task_executions ADD CONSTRAINT chk_execution_status
    CHECK (status IN ('running', 'completed', 'failed'));

ALTER TABLE data_pipeline_task_executions ADD CONSTRAINT chk_execution_step
    CHECK (execution_step IN ('ddl_generation', 'qa_generation', 'sql_validation', 'training_load', 'complete'));

ALTER TABLE data_pipeline_task_executions ADD CONSTRAINT chk_duration_positive
    CHECK (duration_seconds IS NULL OR duration_seconds >= 0);
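
-- Illustrative example of an execution record (commented out; placeholder values).
-- The execution_id follows the {task_id}_step_{step_name}_exec_{timestamp} pattern
-- noted in the column comment above.
-- INSERT INTO data_pipeline_task_executions (task_id, execution_step, status, execution_id)
-- VALUES ('task_20250627_143052', 'ddl_generation', 'running',
--         'task_20250627_143052_step_ddl_generation_exec_20250627143100');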
-- ====================================================================
-- Task log table (data_pipeline_task_logs)
-- ====================================================================
CREATE TABLE IF NOT EXISTS data_pipeline_task_logs (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(32) REFERENCES data_pipeline_tasks(id) ON DELETE CASCADE,
    execution_id VARCHAR(100) REFERENCES data_pipeline_task_executions(execution_id) ON DELETE SET NULL,

    -- Log content
    log_level VARCHAR(10) NOT NULL,                -- 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'
    message TEXT NOT NULL,                         -- log message text

    -- Context information
    step_name VARCHAR(50),                         -- execution step name
    module_name VARCHAR(100),                      -- module name
    function_name VARCHAR(100),                    -- function name

    -- Timestamp
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- Extra information (JSON)
    extra_data JSONB DEFAULT '{}'::jsonb           -- additional structured information
);
-- Add constraints
ALTER TABLE data_pipeline_task_logs ADD CONSTRAINT chk_log_level
    CHECK (log_level IN ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'));
-- ====================================================================
-- Task output files table (data_pipeline_task_outputs)
-- ====================================================================
CREATE TABLE IF NOT EXISTS data_pipeline_task_outputs (
    id SERIAL PRIMARY KEY,
    task_id VARCHAR(32) REFERENCES data_pipeline_tasks(id) ON DELETE CASCADE,
    execution_id VARCHAR(100) REFERENCES data_pipeline_task_executions(execution_id) ON DELETE SET NULL,

    -- File information
    file_type VARCHAR(50) NOT NULL,                -- 'ddl', 'md', 'json', 'log', 'report', 'txt', 'other'
    file_name VARCHAR(255) NOT NULL,               -- file name
    file_path TEXT NOT NULL,                       -- relative path
    file_size BIGINT DEFAULT 0,                    -- file size in bytes

    -- Content summary
    content_hash VARCHAR(64),                      -- hash of the file content
    description TEXT,                              -- file description

    -- Timestamps
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- Status flags
    is_primary BOOLEAN DEFAULT FALSE,              -- whether this is the primary output file
    is_downloadable BOOLEAN DEFAULT TRUE           -- whether the file can be downloaded
);
-- Add constraints
ALTER TABLE data_pipeline_task_outputs ADD CONSTRAINT chk_file_type
    CHECK (file_type IN ('ddl', 'md', 'json', 'log', 'report', 'txt', 'other'));

ALTER TABLE data_pipeline_task_outputs ADD CONSTRAINT chk_file_size_positive
    CHECK (file_size >= 0);
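
-- Illustrative example of an output-file record (commented out; all values,
-- including the relative path layout, are placeholders).
-- INSERT INTO data_pipeline_task_outputs (task_id, file_type, file_name, file_path, file_size, is_primary)
-- VALUES ('task_20250627_143052', 'ddl', 'tables.ddl', 'ddl/tables.ddl', 2048, TRUE);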
-- ====================================================================
-- Create indexes
-- ====================================================================

-- Task table indexes
CREATE INDEX IF NOT EXISTS idx_tasks_status ON data_pipeline_tasks(status);
CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON data_pipeline_tasks(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_tasks_db_name ON data_pipeline_tasks(db_name);
CREATE INDEX IF NOT EXISTS idx_tasks_created_by ON data_pipeline_tasks(created_by);
CREATE INDEX IF NOT EXISTS idx_tasks_task_type ON data_pipeline_tasks(task_type);

-- Execution table indexes
CREATE INDEX IF NOT EXISTS idx_executions_task_id ON data_pipeline_task_executions(task_id);
CREATE INDEX IF NOT EXISTS idx_executions_step ON data_pipeline_task_executions(execution_step);
CREATE INDEX IF NOT EXISTS idx_executions_status ON data_pipeline_task_executions(status);
CREATE INDEX IF NOT EXISTS idx_executions_started_at ON data_pipeline_task_executions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_executions_task_step ON data_pipeline_task_executions(task_id, execution_step);

-- Log table indexes
CREATE INDEX IF NOT EXISTS idx_logs_task_id ON data_pipeline_task_logs(task_id);
CREATE INDEX IF NOT EXISTS idx_logs_execution_id ON data_pipeline_task_logs(execution_id);
CREATE INDEX IF NOT EXISTS idx_logs_timestamp ON data_pipeline_task_logs(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_logs_level ON data_pipeline_task_logs(log_level);
CREATE INDEX IF NOT EXISTS idx_logs_step ON data_pipeline_task_logs(step_name);
CREATE INDEX IF NOT EXISTS idx_logs_task_timestamp ON data_pipeline_task_logs(task_id, timestamp DESC);

-- Output file table indexes
CREATE INDEX IF NOT EXISTS idx_outputs_task_id ON data_pipeline_task_outputs(task_id);
CREATE INDEX IF NOT EXISTS idx_outputs_execution_id ON data_pipeline_task_outputs(execution_id);
CREATE INDEX IF NOT EXISTS idx_outputs_file_type ON data_pipeline_task_outputs(file_type);
CREATE INDEX IF NOT EXISTS idx_outputs_primary ON data_pipeline_task_outputs(is_primary) WHERE is_primary = TRUE;
CREATE INDEX IF NOT EXISTS idx_outputs_downloadable ON data_pipeline_task_outputs(is_downloadable) WHERE is_downloadable = TRUE;
-- ====================================================================
-- Create cleanup and maintenance functions
-- ====================================================================

-- Function to clean up old tasks
CREATE OR REPLACE FUNCTION cleanup_old_data_pipeline_tasks(days_to_keep INTEGER DEFAULT 30)
RETURNS INTEGER AS $$
DECLARE
    deleted_count INTEGER;
    cutoff_date TIMESTAMP;
BEGIN
    cutoff_date := NOW() - INTERVAL '1 day' * days_to_keep;

    -- Delete old tasks (related logs and file records are removed by ON DELETE CASCADE)
    DELETE FROM data_pipeline_tasks
    WHERE created_at < cutoff_date
      AND status IN ('completed', 'failed');

    GET DIAGNOSTICS deleted_count = ROW_COUNT;

    -- Record the cleanup operation. task_id is left NULL because the column's
    -- foreign key to data_pipeline_tasks would reject a literal such as 'system'.
    INSERT INTO data_pipeline_task_logs (task_id, log_level, message, step_name)
    VALUES (NULL, 'INFO',
            FORMAT('Cleaned up %s tasks older than %s days', deleted_count, days_to_keep),
            'cleanup');

    RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;
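
-- Usage example (commented out so the initialization itself deletes nothing):
-- remove completed or failed tasks older than 30 days and return how many were deleted.
-- SELECT cleanup_old_data_pipeline_tasks(30);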
-- Function to retrieve task statistics
CREATE OR REPLACE FUNCTION get_data_pipeline_task_stats()
RETURNS TABLE (
    total_tasks INTEGER,
    pending_tasks INTEGER,
    running_tasks INTEGER,
    completed_tasks INTEGER,
    failed_tasks INTEGER,
    avg_completion_time INTERVAL
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        COUNT(*)::INTEGER AS total_tasks,
        COUNT(*) FILTER (WHERE status = 'pending')::INTEGER AS pending_tasks,
        COUNT(*) FILTER (WHERE status = 'in_progress')::INTEGER AS running_tasks,
        COUNT(*) FILTER (WHERE status = 'completed')::INTEGER AS completed_tasks,
        COUNT(*) FILTER (WHERE status = 'failed')::INTEGER AS failed_tasks,
        AVG(completed_at - started_at) FILTER (WHERE status = 'completed') AS avg_completion_time
    FROM data_pipeline_tasks;
END;
$$ LANGUAGE plpgsql;
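
-- Usage example (commented out): returns one row with task counts per status and
-- the average completion time of completed tasks.
-- SELECT * FROM get_data_pipeline_task_stats();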
-- Function to detect and handle zombie tasks
CREATE OR REPLACE FUNCTION check_zombie_data_pipeline_tasks(timeout_hours INTEGER DEFAULT 2)
RETURNS INTEGER AS $$
DECLARE
    zombie_count INTEGER;
    cutoff_time TIMESTAMP;
BEGIN
    cutoff_time := NOW() - INTERVAL '1 hour' * timeout_hours;

    -- Mark running executions that have exceeded the timeout as failed
    UPDATE data_pipeline_task_executions
    SET status = 'failed',
        error_message = FORMAT('Execution timed out (exceeded %s hours) and has probably stopped running', timeout_hours),
        completed_at = NOW()
    WHERE status = 'running'
      AND started_at < cutoff_time;

    GET DIAGNOSTICS zombie_count = ROW_COUNT;

    -- Update the status of the affected tasks
    UPDATE data_pipeline_tasks
    SET status = 'failed',
        error_message = FORMAT('Task timed out (exceeded %s hours) and has probably stopped running', timeout_hours)
    WHERE status = 'in_progress'
      AND started_at < cutoff_time;

    -- Record the check. task_id is left NULL because the column's foreign key
    -- to data_pipeline_tasks would reject a literal such as 'system'.
    IF zombie_count > 0 THEN
        INSERT INTO data_pipeline_task_logs (task_id, log_level, message, step_name)
        VALUES (NULL, 'WARNING',
                FORMAT('Found and handled %s zombie executions', zombie_count),
                'zombie_check');
    END IF;

    RETURN zombie_count;
END;
$$ LANGUAGE plpgsql;
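
-- Usage example (commented out): mark executions and tasks that have been running
-- for more than 2 hours as failed, returning the number of executions affected.
-- SELECT check_zombie_data_pipeline_tasks(2);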
-- ====================================================================
-- Insert initial data (if needed)
-- ====================================================================
-- Initial configuration data could be inserted here.
-- None is required at the moment.
-- ====================================================================
-- Create views (for convenient querying)
-- ====================================================================

-- Task execution overview view
CREATE OR REPLACE VIEW v_task_execution_overview AS
SELECT
    t.id AS task_id,
    t.task_type,
    t.status AS task_status,
    t.step_status,
    t.created_at,
    t.started_at,
    t.completed_at,
    t.created_by,
    t.db_name,
    COALESCE(e.current_execution, '{}') AS current_execution,
    COALESCE(e.execution_count, 0) AS total_executions
FROM data_pipeline_tasks t
LEFT JOIN (
    -- One row per task: its most recent execution plus the total number of executions
    SELECT DISTINCT ON (task_id)
        task_id,
        COUNT(*) OVER (PARTITION BY task_id) AS execution_count,
        json_build_object(
            'execution_id', execution_id,
            'step', execution_step,
            'status', status,
            'started_at', started_at
        ) AS current_execution
    FROM data_pipeline_task_executions
    ORDER BY task_id, started_at DESC
) e ON t.id = e.task_id;
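
-- Usage example (commented out): list recent tasks with their latest execution
-- and total execution count.
-- SELECT task_id, task_status, current_execution, total_executions
-- FROM v_task_execution_overview
-- ORDER BY created_at DESC
-- LIMIT 20;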
-- Commit the transaction
COMMIT;

-- Print a summary of what was created
\echo 'Data Pipeline API database tables created successfully!'
\echo ''
\echo 'Tables created:'
\echo '- data_pipeline_tasks: task master table'
\echo '- data_pipeline_task_executions: task execution records table'
\echo '- data_pipeline_task_logs: task log table'
\echo '- data_pipeline_task_outputs: task output files table'
\echo ''
\echo 'Functions created:'
\echo '- cleanup_old_data_pipeline_tasks(days): clean up old tasks'
\echo '- get_data_pipeline_task_stats(): retrieve task statistics'
\echo '- check_zombie_data_pipeline_tasks(hours): check for zombie tasks'
\echo ''
\echo 'Views created:'
\echo '- v_task_execution_overview: task execution overview'