123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- -- Data Pipeline API 数据库初始化脚本
- --
- -- 此脚本在pgvector向量数据库中创建Data Pipeline API系统所需的表和索引
- -- 注意:这些表应该创建在pgvector数据库中,而不是业务数据库中
- --
- -- 执行方式(使用PGVECTOR_CONFIG中的连接信息):
- -- psql -h host -p port -U username -d pgvector_database_name -f init_tables.sql
- -- 设置客户端编码
- SET client_encoding = 'UTF8';
- -- 开始事务
- BEGIN;
- -- ====================================================================
- -- 任务主表 (data_pipeline_tasks)
- -- ====================================================================
- CREATE TABLE IF NOT EXISTS data_pipeline_tasks (
- -- 主键:时间戳格式的任务ID
- task_id VARCHAR(32) PRIMARY KEY, -- 'task_20250627_143052'
-
- -- 任务基本信息
- task_name VARCHAR(255), -- 任务自定义名称(可选)
- task_type VARCHAR(50) NOT NULL DEFAULT 'data_workflow',
- status VARCHAR(20) NOT NULL DEFAULT 'pending', -- pending/in_progress/partial_completed/completed/failed
-
- -- 配置和结果(JSON格式)
- parameters JSONB NOT NULL, -- 任务配置参数
- result JSONB, -- 最终执行结果
-
- -- 错误处理
- error_message TEXT, -- 错误详细信息
-
- -- 时间戳
- created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
- started_at TIMESTAMP,
- completed_at TIMESTAMP,
-
- -- 创建者信息
- created_type VARCHAR(50) DEFAULT 'api', -- 'api', 'manual', 'system'
- by_user VARCHAR(50), -- 'guest'或其它user_id
-
- -- 输出目录
- output_directory TEXT, -- 任务输出目录路径
-
- -- 索引字段
- db_name VARCHAR(100) -- 数据库名称(便于筛选)
- );
- -- 添加约束
- ALTER TABLE data_pipeline_tasks ADD CONSTRAINT chk_task_status
- CHECK (status IN ('pending', 'in_progress', 'partial_completed', 'completed', 'failed'));
- ALTER TABLE data_pipeline_tasks ADD CONSTRAINT chk_task_type
- CHECK (task_type IN ('data_workflow', 'complete_workflow'));
- ALTER TABLE data_pipeline_tasks ADD CONSTRAINT chk_created_type
- CHECK (created_type IN ('api', 'manual', 'system'));
- -- ====================================================================
- -- 任务步骤状态表 (data_pipeline_task_steps)
- -- ====================================================================
- CREATE TABLE IF NOT EXISTS data_pipeline_task_steps (
- id SERIAL PRIMARY KEY,
- task_id VARCHAR(32) REFERENCES data_pipeline_tasks(task_id) ON DELETE CASCADE,
- execution_id VARCHAR(100), -- 执行批次ID(可为空)
- step_name VARCHAR(50) NOT NULL, -- 'ddl_generation', 'qa_generation', 'sql_validation', 'training_load'
- step_status VARCHAR(50) NOT NULL DEFAULT 'pending', -- 'pending', 'running', 'completed', 'failed'
- started_at TIMESTAMP,
- completed_at TIMESTAMP,
- error_message TEXT -- 错误详细信息
- );
- -- 添加约束
- ALTER TABLE data_pipeline_task_steps ADD CONSTRAINT chk_step_status
- CHECK (step_status IN ('pending', 'running', 'completed', 'failed'));
- ALTER TABLE data_pipeline_task_steps ADD CONSTRAINT chk_step_name
- CHECK (step_name IN ('ddl_generation', 'qa_generation', 'sql_validation', 'training_load'));
- -- ====================================================================
- -- 创建索引
- -- ====================================================================
- -- 任务表索引
- CREATE INDEX IF NOT EXISTS idx_tasks_status ON data_pipeline_tasks(status);
- CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON data_pipeline_tasks(created_at DESC);
- CREATE INDEX IF NOT EXISTS idx_tasks_db_name ON data_pipeline_tasks(db_name);
- CREATE INDEX IF NOT EXISTS idx_tasks_created_type ON data_pipeline_tasks(created_type);
- CREATE INDEX IF NOT EXISTS idx_tasks_task_type ON data_pipeline_tasks(task_type);
- CREATE INDEX IF NOT EXISTS idx_tasks_task_name ON data_pipeline_tasks(task_name);
- -- 步骤状态表索引
- CREATE INDEX IF NOT EXISTS idx_steps_task_id ON data_pipeline_task_steps(task_id);
- CREATE INDEX IF NOT EXISTS idx_steps_step_name ON data_pipeline_task_steps(step_name);
- CREATE INDEX IF NOT EXISTS idx_steps_step_status ON data_pipeline_task_steps(step_status);
- CREATE INDEX IF NOT EXISTS idx_steps_started_at ON data_pipeline_task_steps(started_at DESC);
- CREATE INDEX IF NOT EXISTS idx_steps_task_step ON data_pipeline_task_steps(task_id, step_name);
- -- ====================================================================
- -- 创建清理函数
- -- ====================================================================
- -- 清理旧任务的函数
- CREATE OR REPLACE FUNCTION cleanup_old_data_pipeline_tasks(days_to_keep INTEGER DEFAULT 30)
- RETURNS INTEGER AS $$
- DECLARE
- deleted_count INTEGER;
- cutoff_date TIMESTAMP;
- BEGIN
- cutoff_date := NOW() - INTERVAL '1 day' * days_to_keep;
-
- -- 删除旧任务(级联删除相关步骤记录)
- DELETE FROM data_pipeline_tasks
- WHERE created_at < cutoff_date
- AND status IN ('completed', 'failed');
-
- GET DIAGNOSTICS deleted_count = ROW_COUNT;
-
- RETURN deleted_count;
- END;
- $$ LANGUAGE plpgsql;
- -- 获取任务统计信息的函数
- CREATE OR REPLACE FUNCTION get_data_pipeline_task_stats()
- RETURNS TABLE (
- total_tasks INTEGER,
- pending_tasks INTEGER,
- running_tasks INTEGER,
- completed_tasks INTEGER,
- failed_tasks INTEGER,
- avg_completion_time INTERVAL
- ) AS $$
- BEGIN
- RETURN QUERY
- SELECT
- COUNT(*)::INTEGER as total_tasks,
- COUNT(*) FILTER (WHERE status = 'pending')::INTEGER as pending_tasks,
- COUNT(*) FILTER (WHERE status IN ('in_progress'))::INTEGER as running_tasks,
- COUNT(*) FILTER (WHERE status = 'completed')::INTEGER as completed_tasks,
- COUNT(*) FILTER (WHERE status = 'failed')::INTEGER as failed_tasks,
- AVG(completed_at - started_at) FILTER (WHERE status = 'completed') as avg_completion_time
- FROM data_pipeline_tasks;
- END;
- $$ LANGUAGE plpgsql;
- -- 检查僵尸任务的函数
- CREATE OR REPLACE FUNCTION check_zombie_data_pipeline_tasks(timeout_hours INTEGER DEFAULT 2)
- RETURNS INTEGER AS $$
- DECLARE
- zombie_count INTEGER;
- cutoff_time TIMESTAMP;
- BEGIN
- cutoff_time := NOW() - INTERVAL '1 hour' * timeout_hours;
-
- -- 查找超时的运行中步骤
- UPDATE data_pipeline_task_steps
- SET step_status = 'failed',
- error_message = FORMAT('步骤执行超时(超过%s小时),可能已停止运行', timeout_hours),
- completed_at = NOW()
- WHERE step_status = 'running'
- AND started_at < cutoff_time;
-
- GET DIAGNOSTICS zombie_count = ROW_COUNT;
-
- -- 更新相关任务状态
- UPDATE data_pipeline_tasks
- SET status = 'failed',
- error_message = FORMAT('任务超时(超过%s小时),可能已停止运行', timeout_hours)
- WHERE status IN ('in_progress')
- AND started_at < cutoff_time;
-
- RETURN zombie_count;
- END;
- $$ LANGUAGE plpgsql;
- -- ====================================================================
- -- 插入初始数据(如果需要)
- -- ====================================================================
- -- 这里可以插入一些初始配置数据
- -- 目前暂时不需要
- -- ====================================================================
- -- 创建视图(便于查询)
- -- ====================================================================
- -- 任务步骤概览视图
- CREATE OR REPLACE VIEW v_task_step_overview AS
- SELECT
- t.task_id,
- t.task_name,
- t.task_type,
- t.status as task_status,
- t.created_at,
- t.started_at,
- t.completed_at,
- t.created_type,
- t.by_user,
- t.db_name,
- s.step_name,
- s.step_status,
- s.started_at as step_started_at,
- s.completed_at as step_completed_at,
- s.error_message as step_error_message
- FROM data_pipeline_tasks t
- LEFT JOIN data_pipeline_task_steps s ON t.task_id = s.task_id
- ORDER BY t.created_at DESC,
- CASE s.step_name
- WHEN 'ddl_generation' THEN 1
- WHEN 'qa_generation' THEN 2
- WHEN 'sql_validation' THEN 3
- WHEN 'training_load' THEN 4
- ELSE 5
- END;
- -- 提交事务
- COMMIT;
- -- 输出创建结果
- \echo 'Data Pipeline API 数据库表创建完成!'
- \echo ''
- \echo '已创建的表:'
- \echo '- data_pipeline_tasks: 任务主表'
- \echo '- data_pipeline_task_steps: 任务步骤状态表'
- \echo ''
- \echo '已创建的函数:'
- \echo '- cleanup_old_data_pipeline_tasks(days): 清理旧任务'
- \echo '- get_data_pipeline_task_stats(): 获取任务统计'
- \echo '- check_zombie_data_pipeline_tasks(hours): 检查僵尸任务'
- \echo ''
- \echo '已创建的视图:'
- \echo '- v_task_step_overview: 任务步骤概览'
|