# Cursor 自动任务执行指令 **重要:请立即执行以下任务!** **生成时间**: 2026-01-21 18:28:11 **待执行任务数量**: 2 ## 任务完成后的操作 完成每个任务后,请更新 `tasks/pending_tasks.json` 中对应任务的 `status` 为 `completed`, 并填写 `code_name`(代码文件名)和 `code_path`(代码路径)。 调度脚本会自动将完成的任务同步到数据库。 ## 任务约束要求 **重要约束**:完成脚本创建后,**不需要生成任务总结文件**。 - 不要创建任何 summary、report、总结类的文档文件 - 不要生成 task_summary.md、execution_report.md 等总结文件 - 只需创建任务要求的功能脚本文件 - 只需更新 `tasks/pending_tasks.json` 中的任务状态 --- ## 任务 1: 产品库存表的原始数据导入 - **任务ID**: `43` - **创建时间**: 2026-01-21 18:20:44 - **创建者**: cursor ### 任务描述 # Task: 产品库存表的原始数据导入 ## Source Tables ### product_inventory_table_raw_data **Data Source** - **Type**: RDBMS - **Host**: 192.168.3.143 - **Port**: 5432 - **Database**: dataops - **Schema**: public **DDL** ```sql CREATE TABLE product_inventory_table_raw_data ( updated_at timestamp COMMENT '更新时间', created_at timestamp COMMENT '创建时间', is_active boolean COMMENT '是否激活', turnover_rate numeric(5, 2) COMMENT '周转率', outbound_quantity_30d integer COMMENT '30天出库数量', inbound_quantity_30d integer COMMENT '30天入库数量', last_outbound_date date COMMENT '最近出库日期', last_inbound_date date COMMENT '最近入库日期', stock_status varchar(50) COMMENT '库存状态', selling_price numeric(10, 2) COMMENT '销售价格', unit_cost numeric(10, 2) COMMENT '单位成本', max_stock integer COMMENT '最大库存', safety_stock integer COMMENT '安全库存', current_stock integer COMMENT '当前库存', warehouse varchar(100) COMMENT '仓库', supplier varchar(200) COMMENT '供应商', brand varchar(100) COMMENT '品牌', category varchar(100) COMMENT '类别', product_name varchar(200) COMMENT '产品名称', sku varchar(50) COMMENT 'SKU', id serial COMMENT '主键' ); COMMENT ON TABLE product_inventory_table_raw_data IS '产品库存表-原始数据'; ``` ## Target Tables ### test_product_inventory **Data Source** - **Type**: postgresql - **Host**: 192.168.3.143 - **Port**: 5678 - **Database**: dataops - **Schema**: dags **DDL** ```sql CREATE TABLE test_product_inventory ( updated_at timestamp COMMENT '更新时间', created_at timestamp COMMENT '创建时间', is_active boolean COMMENT '是否启用', turnover_rate numeric(5, 2) COMMENT '周转率', outbound_quantity_30d integer COMMENT '30天出库数量', inbound_quantity_30d integer COMMENT '30天入库数量', last_outbound_date date COMMENT '最近出库日期', last_inbound_date date COMMENT '最近入库日期', stock_status varchar(50) COMMENT '库存状态', selling_price numeric(10, 2) COMMENT '销售价格', unit_cost numeric(10, 2) COMMENT '单位成本', max_stock integer COMMENT '最大库存', safety_stock integer COMMENT '安全库存', current_stock integer COMMENT '当前库存', warehouse varchar(100) COMMENT '仓库', supplier varchar(200) COMMENT '供应商', brand varchar(100) COMMENT '品牌', category varchar(100) COMMENT '类别', product_name varchar(200) COMMENT '产品名称', sku varchar(50) COMMENT 'SKU', id serial COMMENT 'ID', create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间' ); COMMENT ON TABLE test_product_inventory IS '产品库存表'; ``` ## Update Mode - **Mode**: Append (追加模式) - **Description**: 新数据将追加到目标表,不删除现有数据 ## Request Content 从标签为原始数据的产品库存表导入数据到数据资源的产品库存表。 ## Implementation Steps 1. Extract data from source tables as specified in the DDL 2. Apply transformation logic according to the rule: - Rule: 从标签为原始数据的产品库存表导入数据到数据资源的产品库存表。 3. Generate Python program to implement the data transformation logic 4. Write transformed data to target table using append mode --- ## 任务 2: DF_DO202601210001 - **任务ID**: `44` - **创建时间**: 2026-01-21 18:25:47 - **创建者**: system ### 任务描述 # Task: DF_DO202601210001 ## Related Information - **Order ID**: 26 - **Order No**: DO202601210001 - **DataFlow ID**: 2291 - **DataFlow Name**: 仓库库存汇总表_数据流程 - **Product ID**: 23 ## Source Tables ### test_product_inventory **Data Source** - **Type**: postgresql - **Host**: 192.168.3.143 - **Port**: 5678 - **Database**: dataops - **Schema**: dags **DDL** ```sql CREATE TABLE test_product_inventory ( updated_at timestamp COMMENT '更新时间', created_at timestamp COMMENT '创建时间', is_active boolean COMMENT '是否启用', turnover_rate numeric(5, 2) COMMENT '周转率', outbound_quantity_30d integer COMMENT '30天出库数量', inbound_quantity_30d integer COMMENT '30天入库数量', last_outbound_date date COMMENT '最近出库日期', last_inbound_date date COMMENT '最近入库日期', stock_status varchar(50) COMMENT '库存状态', selling_price numeric(10, 2) COMMENT '销售价格', unit_cost numeric(10, 2) COMMENT '单位成本', max_stock integer COMMENT '最大库存', safety_stock integer COMMENT '安全库存', current_stock integer COMMENT '当前库存', warehouse varchar(100) COMMENT '仓库', supplier varchar(200) COMMENT '供应商', brand varchar(100) COMMENT '品牌', category varchar(100) COMMENT '类别', product_name varchar(200) COMMENT '产品名称', sku varchar(50) COMMENT 'SKU', id serial COMMENT 'ID' ); COMMENT ON TABLE test_product_inventory IS '产品库存表'; ``` ## Target Tables ### warehouse_inventory_summary **Data Source** - **Type**: postgresql - **Host**: 192.168.3.143 - **Port**: 5678 - **Database**: dataops - **Schema**: dags **DDL** ```sql CREATE TABLE warehouse_inventory_summary ( total_inventory integer COMMENT '库存数量', warehouse_name varchar(255) COMMENT '仓库名称', create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间' ); COMMENT ON TABLE warehouse_inventory_summary IS '仓库库存汇总表'; ``` ## Update Mode - **Mode**: Append (追加模式) - **Description**: 新数据将追加到目标表,不删除现有数据 ## Request Content 1. 从源数据'产品库存表'中提取'仓库名称'字段;2. 对'产品库存表'中的'库存数量'字段进行求和计算;3. 按'仓库名称'进行分组;4. 最终输出数据格式包含'仓库名称'和对应的'库存数量'两个字段。 ## Implementation Steps 1. Extract data from source tables as specified in the DDL 2. Apply transformation logic according to the rule: - Rule: 1. 从源数据'产品库存表'中提取'仓库名称'字段;2. 对'产品库存表'中的'库存数量'字段进行求和计算;3. 按'仓库名称'进行分组;4. 最终输出数据格式包含'仓库名称'和对应的'库存数量'两个字段。 3. Generate Python program to implement the data transformation logic 4. Write transformed data to target table using append mode ---