#!/usr/bin/env python3
"""Test the create-dataflow endpoint.

Posts a sample dataflow definition to the locally running service and
prints the HTTP status code followed by the response body.
"""
import json
import time

import requests

# Endpoint of the service under test.
API_URL = "http://localhost:5500/api/dataflow/add-dataflow"

# Seconds to wait for the server before giving up; without a timeout
# ``requests`` can block forever on an unresponsive service.
REQUEST_TIMEOUT = 30

data = {
    # The Unix-timestamp suffix makes the name differ on every run
    # (presumably to avoid duplicate-name conflicts -- confirm with the API).
    "name_zh": f"测试数据产品注册_{int(time.time())}",
    "describe": "测试数据产品注册功能",
    "category": "应用类",
    "leader": "system",
    "organization": "citu",
    "script_type": "sql",
    "update_mode": "append",
    "frequency": "月",
    "tag": [],
    "status": "active",
    "script_requirement": {
        "rule": "测试规则",
        "source_table": [],
        # NOTE(review): 241 looks like an existing table id in the target
        # environment -- verify it exists before running.
        "target_table": [241],
    },
}

print("创建数据流...")
response = requests.post(
    API_URL,
    json=data,
    headers={"Content-Type": "application/json"},
    timeout=REQUEST_TIMEOUT,
)
print("Status Code:", response.status_code)

# An error page or empty body is not valid JSON; fall back to the raw text
# so the actual server response is still visible instead of a traceback.
try:
    body = response.json()
except ValueError:
    print("Response (non-JSON):", response.text)
else:
    print("Response:", json.dumps(body, ensure_ascii=False, indent=2))