12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727 |
- import logging
- from typing import Dict, List, Optional, Any
- from datetime import datetime
- import json
- from app.core.llm.llm_service import llm_client
- from app.core.graph.graph_operations import connect_graph, create_or_get_node, get_node, relationship_exists
- from app.core.meta_data import translate_and_parse, get_formatted_time
- from py2neo import Relationship
- from app import db
- from sqlalchemy import text
- logger = logging.getLogger(__name__)
class DataFlowService:
    """Service layer holding the business logic for DataFlow objects.

    Every operation is a static method. DataFlow nodes are stored in the
    Neo4j graph reached through ``connect_graph()``; script metadata is
    written to the ``dags.data_transform_scripts`` PostgreSQL table through
    ``db.session``.
    """

    @staticmethod
    def get_dataflows(page: int = 1, page_size: int = 10, search: str = '') -> Dict[str, Any]:
        """Return one page of DataFlow nodes together with pagination info.

        Args:
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Items per page; values below 1 are treated as 1.
            search: Optional keyword. When non-empty, only nodes whose
                ``name`` or ``description`` property contains it are returned.

        Returns:
            ``{'list': [...], 'pagination': {...}}``. Each list item is the
            node's property dict with the Neo4j node id added under ``id``.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            # Clamp so SKIP can never be negative and total_pages can never
            # divide by zero.
            page = max(1, page)
            page_size = max(1, page_size)
            skip_count = (page - 1) * page_size

            where_clause = ""
            params = {'skip': skip_count, 'limit': page_size}
            if search:
                # NOTE(review): create_dataflow writes name_zh/name_en/describe,
                # not name/description -- confirm which properties should be searched.
                where_clause = "WHERE n.name CONTAINS $search OR n.description CONTAINS $search"
                params['search'] = search

            query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            RETURN n, id(n) as node_id
            ORDER BY n.created_at DESC
            SKIP $skip
            LIMIT $limit
            """

            count_query = f"""
            MATCH (n:DataFlow)
            {where_clause}
            RETURN count(n) as total
            """
            count_params = {'search': search} if search else {}

            with connect_graph().session() as session:
                list_result = session.run(query, **params).data()
                count_result = session.run(count_query, **count_params).single()
                total = count_result['total'] if count_result else 0

            dataflows = []
            for record in list_result:
                dataflow = dict(record['n'])
                # Expose the graph node id alongside the node properties.
                dataflow['id'] = record['node_id']
                dataflows.append(dataflow)

            return {
                'list': dataflows,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"获取数据流列表失败: {str(e)}")
            raise

    @staticmethod
    def get_dataflow_by_id(dataflow_id: int) -> Optional[Dict[str, Any]]:
        """Fetch a single DataFlow node with its tags, children and parents.

        Args:
            dataflow_id: Neo4j node id of the DataFlow.

        Returns:
            The node's property dict extended with ``id``, ``tags``,
            ``children`` and ``parents`` (each a list of ``{id, name}``
            dicts), or ``None`` when no such node exists.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            # The child relationship is created as (parent)-[:child]->(child)
            # (see _handle_children_relationships), so the two patterns below
            # must be directed; undirected they would return the same set.
            query = """
            MATCH (n:DataFlow)
            WHERE id(n) = $dataflow_id
            OPTIONAL MATCH (n)-[:label]-(la:data_label)
            OPTIONAL MATCH (n)-[:child]->(child)
            OPTIONAL MATCH (parent)-[:child]->(n)
            RETURN n, id(n) as node_id,
                   collect(DISTINCT {id: id(la), name: la.name}) as tags,
                   collect(DISTINCT {id: id(child), name: child.name}) as children,
                   collect(DISTINCT {id: id(parent), name: parent.name}) as parents
            """

            with connect_graph().session() as session:
                result = session.run(query, dataflow_id=dataflow_id).data()

            if not result:
                return None

            record = result[0]
            dataflow = dict(record['n'])
            dataflow['id'] = record['node_id']
            # An unmatched OPTIONAL MATCH still makes collect() emit one
            # {id: null, name: null} map; drop those placeholders.
            for key in ('tags', 'children', 'parents'):
                dataflow[key] = [
                    item for item in (record[key] or [])
                    if item.get('id') is not None
                ]

            return dataflow
        except Exception as e:
            logger.error(f"获取数据流详情失败: {str(e)}")
            raise

    @staticmethod
    def create_dataflow(data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a DataFlow node and its companion records.

        Steps: validate input, derive an English name via
        ``translate_and_parse``, create the graph node, link the tag (best
        effort), write the script row to PostgreSQL (best effort) and, if that
        succeeded, create the table lineage relationship (best effort).

        Args:
            data: DataFlow configuration. ``name`` and ``describe`` are
                required; other recognised keys are read with defaults.

        Returns:
            Property dict of the created node with ``id`` added, or a minimal
            fallback dict when the node cannot be read back.

        Raises:
            ValueError: A required field is missing or the name already exists.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        try:
            required_fields = ['name', 'describe']
            for field in required_fields:
                if field not in data:
                    raise ValueError(f"缺少必填字段: {field}")

            dataflow_name = data['name']

            # Derive the English name; fall back to a slug of the original
            # name when translation yields nothing or fails.
            try:
                result_list = translate_and_parse(dataflow_name)
                name_en = result_list[0] if result_list else dataflow_name.lower().replace(' ', '_')
            except Exception as e:
                logger.warning(f"翻译失败,使用默认英文名: {str(e)}")
                name_en = dataflow_name.lower().replace(' ', '_')

            node_data = {
                'name_zh': dataflow_name,
                'name_en': name_en,
                'category': data.get('category', ''),
                'organization': data.get('organization', ''),
                'leader': data.get('leader', ''),
                'frequency': data.get('frequency', ''),
                'tag': data.get('tag', ''),
                'describe': data.get('describe', ''),
                'status': data.get('status', 'inactive'),
                'created_at': get_formatted_time(),
                'updated_at': get_formatted_time()
            }

            # NOTE(review): the node is stored with 'name_zh' while this
            # duplicate check looks up the 'name' property -- confirm that
            # get_node / create_or_get_node reconcile the two.
            existing = get_node('DataFlow', name=dataflow_name)
            if existing:
                raise ValueError(f"数据流 '{dataflow_name}' 已存在")

            dataflow_id = create_or_get_node('DataFlow', **node_data)

            # Tag linking is best effort: a failure must not abort creation.
            tag_id = data.get('tag')
            if tag_id is not None:
                try:
                    DataFlowService._handle_tag_relationship(dataflow_id, tag_id)
                except Exception as e:
                    logger.warning(f"处理标签关系时出错: {str(e)}")

            # The PG write is also best effort; the graph node is deliberately
            # kept when it fails (no distributed transaction here).
            try:
                DataFlowService._save_to_pg_database(data, dataflow_name, name_en)
                logger.info(f"数据流信息已写入PG数据库: {dataflow_name}")

                # Only create the lineage relationship once the PG row exists.
                try:
                    DataFlowService._handle_script_relationships(data, dataflow_name, name_en)
                    logger.info(f"脚本关系创建成功: {dataflow_name}")
                except Exception as script_error:
                    logger.warning(f"创建脚本关系失败: {str(script_error)}")

            except Exception as pg_error:
                logger.error(f"写入PG数据库失败: {str(pg_error)}")

            # Read the node back. Prefer the id returned by create_or_get_node
            # because the node carries no 'name' property of its own.
            if isinstance(dataflow_id, int):
                query = "MATCH (n:DataFlow) WHERE id(n) = $node_id RETURN n, id(n) as node_id"
                query_params = {'node_id': dataflow_id}
            else:
                query = "MATCH (n:DataFlow {name: $name}) RETURN n, id(n) as node_id"
                query_params = {'name': dataflow_name}

            with connect_graph().session() as session:
                id_result = session.run(query, **query_params).single()

            if id_result:
                result = dict(id_result['n'])
                result['id'] = id_result['node_id']
            else:
                # Read-back failed: return the basics we already know.
                result = {
                    'id': dataflow_id if isinstance(dataflow_id, int) else None,
                    'name': dataflow_name,
                    'name_en': name_en,
                    'created_at': get_formatted_time()
                }

            logger.info(f"创建数据流成功: {dataflow_name}")
            return result

        except Exception as e:
            logger.error(f"创建数据流失败: {str(e)}")
            raise

    @staticmethod
    def _save_to_pg_database(data: Dict[str, Any], script_name: str, name_en: str):
        """Upsert the script record into ``dags.data_transform_scripts``.

        Args:
            data: Source of the script fields (``script_requirement``,
                ``script_content``, ``source_table``, ``target_table``,
                ``script_type``, ``created_by``, ``target_dt_column``).
            script_name: Script name; must be non-empty.
            name_en: English name, used as ``target_table`` when none is given.

        Raises:
            ValueError: ``script_name`` is empty.
            Exception: Any database error; the session is rolled back first.
        """
        try:
            script_requirement = data.get('script_requirement', '')
            script_content = data.get('script_content', '')
            source_table = data.get('source_table', '')
            # A missing or empty target table falls back to the English name.
            target_table = data.get('target_table') or name_en
            script_type = data.get('script_type', 'python')
            user_name = data.get('created_by', 'system')
            target_dt_column = data.get('target_dt_column', '')

            if not script_name:
                raise ValueError("script_name不能为空")

            # Insert, or update the existing row for the same
            # (target_table, script_name) pair; create_time is left untouched
            # on update.
            insert_sql = text("""
                INSERT INTO dags.data_transform_scripts
                (source_table, target_table, script_name, script_type, script_requirement,
                 script_content, user_name, create_time, update_time, target_dt_column)
                VALUES
                (:source_table, :target_table, :script_name, :script_type, :script_requirement,
                 :script_content, :user_name, :create_time, :update_time, :target_dt_column)
                ON CONFLICT (target_table, script_name)
                DO UPDATE SET
                    source_table = EXCLUDED.source_table,
                    script_type = EXCLUDED.script_type,
                    script_requirement = EXCLUDED.script_requirement,
                    script_content = EXCLUDED.script_content,
                    user_name = EXCLUDED.user_name,
                    update_time = EXCLUDED.update_time,
                    target_dt_column = EXCLUDED.target_dt_column
            """)

            current_time = datetime.now()
            params = {
                'source_table': source_table,
                'target_table': target_table,
                'script_name': script_name,
                'script_type': script_type,
                'script_requirement': script_requirement,
                'script_content': script_content,
                'user_name': user_name,
                'create_time': current_time,
                'update_time': current_time,
                'target_dt_column': target_dt_column
            }

            db.session.execute(insert_sql, params)
            db.session.commit()

            logger.info(f"成功将脚本信息写入PG数据库: target_table={target_table}, script_name={script_name}")

        except Exception as e:
            db.session.rollback()
            logger.error(f"写入PG数据库失败: {str(e)}")
            raise

    @staticmethod
    def _handle_children_relationships(dataflow_node, children_ids):
        """Create ``(dataflow)-[:child]->(child)`` relationships.

        Failures are logged per child and never raised, so one bad id does
        not stop the remaining children from being linked.

        Args:
            dataflow_node: Parent node. Its ``identity`` attribute is used as
                the node id when present; otherwise the id is looked up by
                ``dataflow_node.get('name')``.
            children_ids: A single node id, a list/tuple of ids, or ``None``.
        """
        logger.debug(f"处理子节点关系,原始children_ids: {children_ids}, 类型: {type(children_ids)}")

        # Normalise to a list: a scalar becomes [scalar], None becomes [].
        if not isinstance(children_ids, (list, tuple)):
            if children_ids is not None:
                children_ids = [children_ids]
                logger.debug(f"将单个值转换为列表: {children_ids}")
            else:
                children_ids = []
                logger.debug("将None转换为空列表")

        for child_id in children_ids:
            try:
                with connect_graph().session() as session:
                    query = "MATCH (n) WHERE id(n) = $child_id RETURN n"
                    result = session.run(query, child_id=child_id).data()
                    if not result:
                        # Unknown child id: nothing to link.
                        continue

                    dataflow_id = getattr(dataflow_node, 'identity', None)
                    if dataflow_id is None:
                        # No identity attribute: resolve the id by name.
                        query_id = "MATCH (n:DataFlow) WHERE n.name = $name RETURN id(n) as node_id"
                        id_result = session.run(query_id, name=dataflow_node.get('name')).single()
                        dataflow_id = id_result['node_id'] if id_result else None

                    # Compare against None explicitly: node id 0 is valid.
                    if dataflow_id is not None and not relationship_exists(dataflow_id, 'child', child_id):
                        session.run(
                            "MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $child_id CREATE (a)-[:child]->(b)",
                            dataflow_id=dataflow_id, child_id=child_id)
                        logger.info(f"创建子节点关系: {dataflow_id} -> {child_id}")
            except Exception as e:
                logger.warning(f"创建子节点关系失败 {child_id}: {str(e)}")

    @staticmethod
    def _handle_tag_relationship(dataflow_id, tag_id):
        """Create a ``(dataflow)-[:label]->(tag)`` relationship if missing.

        Does nothing when the ``data_label`` node does not exist. Failures are
        logged and never raised.

        Args:
            dataflow_id: Neo4j node id of the DataFlow.
            tag_id: Neo4j node id of the ``data_label`` node.
        """
        try:
            with connect_graph().session() as session:
                query = "MATCH (n:data_label) WHERE id(n) = $tag_id RETURN n"
                result = session.run(query, tag_id=tag_id).data()
                if not result:
                    return

                # Compare against None explicitly: node id 0 is valid.
                if dataflow_id is not None and not relationship_exists(dataflow_id, 'label', tag_id):
                    session.run(
                        "MATCH (a), (b) WHERE id(a) = $dataflow_id AND id(b) = $tag_id CREATE (a)-[:label]->(b)",
                        dataflow_id=dataflow_id, tag_id=tag_id)
                    logger.info(f"创建标签关系: {dataflow_id} -> {tag_id}")
        except Exception as e:
            logger.warning(f"创建标签关系失败 {tag_id}: {str(e)}")

    @staticmethod
    def update_dataflow(dataflow_id: int, data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Update properties of a DataFlow node.

        ``id`` and ``created_at`` are protected and ignored. A dict passed as
        ``config`` is stored as a JSON string. ``updated_at`` is refreshed
        whenever at least one property is written.

        Args:
            dataflow_id: Neo4j node id of the DataFlow.
            data: Property names mapped to their new values.

        Returns:
            The node's property dict with ``id`` added (the current state when
            ``data`` holds nothing updatable), or ``None`` when the node does
            not exist.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            protected_fields = ('id', 'created_at')
            props = {}
            for key, value in data.items():
                if key in protected_fields:
                    continue
                if key == 'config' and isinstance(value, dict):
                    value = json.dumps(value, ensure_ascii=False)
                props[key] = value

            with connect_graph().session() as session:
                if props:
                    props['updated_at'] = get_formatted_time()
                    # Pass the properties as one map parameter: keys are never
                    # interpolated into the Cypher text (no injection) and
                    # cannot shadow the $dataflow_id parameter.
                    update_query = """
                    MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
                    SET n += $props
                    RETURN n, id(n) as node_id
                    """
                    result = session.run(update_query, dataflow_id=dataflow_id, props=props).data()
                else:
                    # Nothing to write: just report the node as it stands.
                    query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n, id(n) as node_id"
                    result = session.run(query, dataflow_id=dataflow_id).data()

            if not result:
                return None

            updated_dataflow = dict(result[0]['n'])
            updated_dataflow['id'] = result[0]['node_id']

            if props:
                logger.info(f"更新数据流成功: ID={dataflow_id}")
            return updated_dataflow

        except Exception as e:
            logger.error(f"更新数据流失败: {str(e)}")
            raise

    @staticmethod
    def delete_dataflow(dataflow_id: int) -> bool:
        """Delete a DataFlow node together with all of its relationships.

        Args:
            dataflow_id: Neo4j node id of the DataFlow.

        Returns:
            ``True`` when a node was deleted, ``False`` when none matched.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            query = """
            MATCH (n:DataFlow) WHERE id(n) = $dataflow_id
            DETACH DELETE n
            RETURN count(n) as deleted_count
            """

            with connect_graph().session() as session:
                delete_result = session.run(query, dataflow_id=dataflow_id).single()
                deleted_count = delete_result['deleted_count'] if delete_result else 0

            if deleted_count and deleted_count > 0:
                logger.info(f"删除数据流成功: ID={dataflow_id}")
                return True

            return False

        except Exception as e:
            logger.error(f"删除数据流失败: {str(e)}")
            raise

    @staticmethod
    def execute_dataflow(dataflow_id: int, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Start an execution of a DataFlow (currently a simulated result).

        Args:
            dataflow_id: Neo4j node id of the DataFlow.
            params: Optional execution parameters, echoed back in the result.

        Returns:
            Dict with ``execution_id``, ``dataflow_id``, ``status``
            (always ``'running'``), ``started_at``, ``params`` and ``progress``.

        Raises:
            ValueError: The DataFlow does not exist.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        try:
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                found = session.run(query, dataflow_id=dataflow_id).data()

            if not found:
                raise ValueError(f"数据流不存在: ID={dataflow_id}")

            # Execution id: dataflow id plus the current Unix timestamp.
            execution_id = f"exec_{dataflow_id}_{int(datetime.now().timestamp())}"

            # TODO: actually run the dataflow; a mock result is returned for now.
            result = {
                'execution_id': execution_id,
                'dataflow_id': dataflow_id,
                'status': 'running',
                'started_at': datetime.now().isoformat(),
                'params': params or {},
                'progress': 0
            }

            logger.info(f"开始执行数据流: ID={dataflow_id}, execution_id={execution_id}")
            return result
        except Exception as e:
            logger.error(f"执行数据流失败: {str(e)}")
            raise

    @staticmethod
    def get_dataflow_status(dataflow_id: int) -> Dict[str, Any]:
        """Return the execution status of a DataFlow (currently simulated).

        The mock status is derived deterministically from ``dataflow_id``.

        Args:
            dataflow_id: Neo4j node id of the DataFlow.

        Returns:
            Dict with ``dataflow_id``, ``status``, ``progress``,
            ``started_at``, ``completed_at`` and ``error_message``.

        Raises:
            ValueError: The DataFlow does not exist.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        try:
            # TODO: query the real execution status; mock data for now.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                found = session.run(query, dataflow_id=dataflow_id).data()

            if not found:
                raise ValueError(f"数据流不存在: ID={dataflow_id}")

            status = ['running', 'completed', 'failed', 'pending'][dataflow_id % 4]

            return {
                'dataflow_id': dataflow_id,
                'status': status,
                'progress': 100 if status == 'completed' else (dataflow_id * 10) % 100,
                'started_at': datetime.now().isoformat(),
                'completed_at': datetime.now().isoformat() if status == 'completed' else None,
                'error_message': '执行过程中发生错误' if status == 'failed' else None
            }
        except Exception as e:
            logger.error(f"获取数据流状态失败: {str(e)}")
            raise

    @staticmethod
    def get_dataflow_logs(dataflow_id: int, page: int = 1, page_size: int = 50) -> Dict[str, Any]:
        """Return one page of execution logs for a DataFlow (currently simulated).

        Args:
            dataflow_id: Neo4j node id of the DataFlow.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Items per page; values below 1 are treated as 1.

        Returns:
            ``{'logs': [...], 'pagination': {...}}`` sliced from 100 mock
            log entries.

        Raises:
            ValueError: The DataFlow does not exist.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        try:
            # TODO: query the real execution logs; mock data for now.
            query = "MATCH (n:DataFlow) WHERE id(n) = $dataflow_id RETURN n"
            with connect_graph().session() as session:
                found = session.run(query, dataflow_id=dataflow_id).data()

            if not found:
                raise ValueError(f"数据流不存在: ID={dataflow_id}")

            # Clamp so the slice never starts negative and total_pages never
            # divides by zero.
            page = max(1, page)
            page_size = max(1, page_size)

            mock_logs = [
                {
                    'id': i,
                    'timestamp': datetime.now().isoformat(),
                    'level': ['INFO', 'WARNING', 'ERROR'][i % 3],
                    'message': f'数据流执行日志消息 {i}',
                    'component': ['source', 'transform', 'target'][i % 3]
                }
                for i in range(1, 101)
            ]

            total = len(mock_logs)
            start = (page - 1) * page_size
            end = start + page_size
            logs = mock_logs[start:end]

            return {
                'logs': logs,
                'pagination': {
                    'page': page,
                    'page_size': page_size,
                    'total': total,
                    'total_pages': (total + page_size - 1) // page_size
                }
            }
        except Exception as e:
            logger.error(f"获取数据流日志失败: {str(e)}")
            raise

    @staticmethod
    def create_script(request_data: str) -> str:
        """Generate a data-processing script by prompting the LLM client.

        Args:
            request_data: Free-text description of what the script should do.

        Returns:
            The generated script as text.

        Raises:
            ValueError: The model returned empty content.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        try:
            # Prompt = instruction header + user requirement + format request.
            prompt_parts = [
                "请根据以下需求生成相应的数据处理脚本:",
                request_data,
                "\n请生成完整可执行的脚本代码,包含必要的注释和错误处理。",
            ]
            full_prompt = "\n\n".join(prompt_parts)

            logger.info(f"开始调用Deepseek模型生成脚本,prompt长度: {len(full_prompt)}")

            script_content = llm_client(full_prompt)

            if not script_content:
                raise ValueError("Deepseek模型返回空内容")

            # Callers expect text; coerce any other return type.
            if not isinstance(script_content, str):
                script_content = str(script_content)

            logger.info(f"脚本生成成功,内容长度: {len(script_content)}")

            return script_content

        except Exception as e:
            logger.error(f"生成脚本失败: {str(e)}")
            raise

    @staticmethod
    def _handle_script_relationships(data: Dict[str, Any], dataflow_name: str, name_en: str):
        """Record table lineage in Neo4j for a script.

        Ensures ``table`` nodes exist for the source and target tables and
        creates ``(target)-[:DERIVED_FROM]->(source)`` carrying the script
        metadata, unless a DERIVED_FROM relationship already links the two.
        Skipped (with a warning) when either table name is empty.

        Args:
            data: Provides ``script_type``, ``status``, ``source_table``,
                ``target_table`` and ``update_mode``.
            dataflow_name: Stored on the relationship as ``script_name``.
            name_en: Used as the target table when ``data`` gives none,
                matching the fallback in ``_save_to_pg_database``.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            script_name = dataflow_name
            # NOTE(review): _save_to_pg_database defaults script_type to
            # 'python' while this defaults to 'sql' -- confirm which is intended.
            script_type = data.get('script_type', 'sql')
            schedule_status = data.get('status', 'inactive')
            source_table = data.get('source_table', '')
            target_table = data.get('target_table') or name_en
            update_mode = data.get('update_mode', 'full')

            if not source_table or not target_table:
                logger.warning(f"source_table或target_table为空,跳过关系创建: source_table={source_table}, target_table={target_table}")
                return

            logger.info(f"开始创建脚本关系: {source_table} -> {target_table}")

            # MERGE creates each table node only when it does not exist yet.
            source_node_query = """
            MERGE (source:table {name: $source_table})
            ON CREATE SET source.created_at = $created_at,
                          source.type = 'source'
            RETURN source, id(source) as source_id
            """

            target_node_query = """
            MERGE (target:table {name: $target_table})
            ON CREATE SET target.created_at = $created_at,
                          target.type = 'target'
            RETURN target, id(target) as target_id
            """

            relationship_check_query = """
            MATCH (source:table)-[r:DERIVED_FROM]-(target:table)
            WHERE id(source) = $source_id AND id(target) = $target_id
            RETURN r
            """

            create_relationship_query = """
            MATCH (source:table), (target:table)
            WHERE id(source) = $source_id AND id(target) = $target_id
            CREATE (target)-[r:DERIVED_FROM]->(source)
            SET r.script_name = $script_name,
                r.script_type = $script_type,
                r.schedule_status = $schedule_status,
                r.update_mode = $update_mode,
                r.created_at = $created_at,
                r.updated_at = $created_at
            RETURN r
            """

            current_time = get_formatted_time()

            with connect_graph().session() as session:
                source_result = session.run(source_node_query,
                                            source_table=source_table,
                                            created_at=current_time).single()
                target_result = session.run(target_node_query,
                                            target_table=target_table,
                                            created_at=current_time).single()

                if not (source_result and target_result):
                    logger.error(f"创建表节点失败: source_table={source_table}, target_table={target_table}")
                    return

                source_id = source_result['source_id']
                target_id = target_result['target_id']

                existing_relationship = session.run(relationship_check_query,
                                                    source_id=source_id,
                                                    target_id=target_id).single()

                if existing_relationship:
                    logger.info(f"DERIVED_FROM关系已存在: {target_table} -> {source_table}")
                    return

                session.run(create_relationship_query,
                            source_id=source_id,
                            target_id=target_id,
                            script_name=script_name,
                            script_type=script_type,
                            schedule_status=schedule_status,
                            update_mode=update_mode,
                            created_at=current_time)

                logger.info(f"成功创建DERIVED_FROM关系: {target_table} -> {source_table} (script: {script_name})")

        except Exception as e:
            logger.error(f"处理脚本关系失败: {str(e)}")
            raise
|