test_create_dataflow.py 843 B

1234567891011121314151617181920212223242526272829303132
  1. #!/usr/bin/env python3
  2. """测试创建数据流接口"""
  3. import requests
  4. import json
  5. import time
  6. data = {
  7. "name_zh": f"测试数据产品注册_{int(time.time())}",
  8. "describe": "测试数据产品注册功能",
  9. "category": "应用类",
  10. "leader": "system",
  11. "organization": "citu",
  12. "script_type": "sql",
  13. "update_mode": "append",
  14. "frequency": "月",
  15. "tag": [],
  16. "status": "active",
  17. "script_requirement": {
  18. "rule": "测试规则",
  19. "source_table": [],
  20. "target_table": [241]
  21. }
  22. }
  23. print("创建数据流...")
  24. response = requests.post(
  25. "http://localhost:5500/api/dataflow/add-dataflow",
  26. json=data,
  27. headers={"Content-Type": "application/json"}
  28. )
  29. print("Status Code:", response.status_code)
  30. print("Response:", json.dumps(response.json(), ensure_ascii=False, indent=2))