12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237 |
- import json
- import re
- import logging
- from py2neo import Relationship
- import pandas as pd
- from app.services.neo4j_driver import neo4j_driver
- from app.services.package_function import create_or_get_node, relationship_exists, get_node
- import time
- logger = logging.getLogger("app")
def get_formatted_time():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    # Without an explicit struct_time, time.strftime formats time.localtime().
    return time.strftime(timestamp_format)
def get_node_by_id(label, id):
    """Look up a node carrying ``label`` by its internal Neo4j id.

    Args:
        label: Label placed verbatim into the ``MATCH`` pattern.
        id: Internal node id, or anything ``int()`` can convert to one.

    Returns:
        The matching node, or ``None`` when ``id`` is not an integer, no such
        node exists, or the query fails (errors are logged, not raised).
    """
    try:
        with neo4j_driver.get_session() as session:
            # Internal ids are integers; refuse anything that will not convert.
            try:
                numeric_id = int(id)
            except (ValueError, TypeError):
                logger.error(f"节点ID不是有效的整数: {id}")
                return None

            query = f"MATCH (n:{label}) WHERE id(n) = $id RETURN n"
            row = session.run(query, id=numeric_id).single()
            if not row:
                return None
            return row["n"]
    except Exception as exc:
        logger.error(f"根据ID获取节点失败: {str(exc)}")
        return None
def get_node_by_id_no_label(id):
    """Fetch a node of any label by its internal Neo4j id.

    Args:
        id: Internal node id, or anything ``int()`` can convert to one.

    Returns:
        The node, or ``None`` when ``id`` is not an integer, no node has that
        id, or the query raises (the error is logged).
    """
    query = "MATCH (n) WHERE id(n) = $id RETURN n"
    try:
        with neo4j_driver.get_session() as session:
            try:
                wanted = int(id)
            except (ValueError, TypeError):
                logger.error(f"节点ID不是有效的整数: {id}")
                return None

            found = session.run(query, id=wanted).single()
            return found["n"] if found else None
    except Exception as err:
        logger.error(f"根据ID获取节点失败: {str(err)}")
        return None
def delete_relationships(start_node, rel_type=None, end_node=None):
    """Delete outgoing relationships of ``start_node``.

    The optional filters narrow what is deleted and can be combined freely:
    ``rel_type`` restricts the relationship type, ``end_node`` restricts the
    target node. With neither filter, every outgoing relationship of
    ``start_node`` is removed.

    Args:
        start_node: Source node; its internal id is read from ``.id``.
        rel_type: Relationship type name, or ``None``/empty for any type.
        end_node: Target node (internal id read from ``.id``), or ``None`` for
            any target.

    Returns:
        True if the query ran, False if it raised (the error is logged).
    """
    try:
        params = {"start_id": start_node.id}

        if rel_type:
            # Relationship types cannot be query parameters. Quote the name with
            # backticks and double any embedded backtick so it cannot escape.
            escaped_type = str(rel_type).replace("`", "``")
            rel_pattern = f"[r:`{escaped_type}`]"
        else:
            rel_pattern = "[r]"

        conditions = ["id(a) = $start_id"]
        # Compare with None instead of relying on truthiness: node objects act
        # like property mappings, so a node without properties is falsy. Also,
        # end_node is now honoured even when rel_type is not given. Previously
        # that case deleted every outgoing relationship.
        if end_node is not None:
            conditions.append("id(b) = $end_id")
            params["end_id"] = end_node.id

        cypher = f"MATCH (a)-{rel_pattern}->(b) WHERE {' AND '.join(conditions)} DELETE r"
        with neo4j_driver.get_session() as session:
            # consume() surfaces server errors here, inside the try block.
            session.run(cypher, params).consume()
        return True
    except Exception as e:
        logger.error(f"删除关系失败: {str(e)}")
        return False
def update_or_create_node(label, **properties):
    """Update a node by internal id, or create a new one.

    If ``properties`` contains ``id`` (anything ``int()`` accepts), the node
    with that internal id and ``label`` is updated. The remaining properties
    are merged into it with ``SET n += $props``; a ``None`` value removes that
    property. Otherwise a new ``label`` node is created with the properties.

    Args:
        label: Node label, interpolated verbatim into the query.
        **properties: Node properties, optionally including ``id``.

    Returns:
        The updated or created node. ``None`` if the update target does not
        exist or the query fails (the error is logged).
    """
    try:
        node_id = properties.pop('id', None)
        with neo4j_driver.get_session() as session:
            # Internal id 0 is a valid node id, so only a missing or empty id
            # means "create". The old truthiness test turned id 0 into a create.
            if node_id is not None and node_id != "":
                # One map parameter keeps caller keys out of the query text. It
                # also keeps an update with no remaining properties valid (the
                # old code emitted an empty SET clause).
                cypher = f"MATCH (n:{label}) WHERE id(n) = $id SET n += $props RETURN n"
                result = session.run(cypher, id=int(node_id), props=properties)
            else:
                cypher = f"CREATE (n:{label}) SET n = $props RETURN n"
                result = session.run(cypher, props=properties)

            record = result.single()
            return record["n"] if record else None
    except Exception as e:
        logger.error(f"更新或创建节点失败: {str(e)}")
        return None
# Data resource <-> metadata: node creation and relationship building.
def handle_node(receiver, head_data, data_source=None, resource_type=None):
    """Create or update a ``data_resource`` node and link its tag, fields and source.

    ``receiver`` is modified in place:
    - ``additional_info`` and ``data_source`` are removed.
    - ``en_name``, ``time`` and ``type`` are set.
    All remaining keys are stored as node properties.

    Args:
        receiver: Property dict for the resource.
            - ``en_name`` is required.
            - ``name`` is the MERGE key.
            - The optional ``tag`` is the internal id of a ``data_label`` node.
        head_data: Iterable of field dicts with ``name``, ``en_name`` and
            ``data_type``. Each becomes (or updates) a ``meta_data`` node linked
            from the resource by ``contain``.
        data_source: Dict with an ``en_name`` key. It is used only when
            ``resource_type == 'ddl'``, to create an ``originates_from`` link.
        resource_type: ``'ddl'`` stores ``type='ddl'``; anything else stores
            ``type='structure'``.

    Returns:
        The internal id of the data_resource node.

    Raises:
        RuntimeError: if the data source relationship cannot be created.
        Any other error is logged and re-raised unchanged.
    """
    try:
        # Reading receiver['en_name'] first makes a missing key fail before the
        # dict or the graph is touched.
        update_attributes = {
            'en_name': receiver['en_name'],
            'time': get_formatted_time(),
            'type': 'ddl' if resource_type == 'ddl' else 'structure',
        }
        # These keys may hold nested objects, which cannot be node properties.
        receiver.pop('additional_info', None)
        receiver.pop('data_source', None)

        tag_id = receiver.get('tag')
        receiver.update(update_attributes)

        with neo4j_driver.get_session() as session:
            resource_id = _upsert_data_resource_node(session, receiver)
            if tag_id:
                _link_resource_tag(session, resource_id, tag_id)
            if head_data:
                _link_resource_meta(session, resource_id, head_data)
            if data_source and resource_type == 'ddl':
                _link_resource_data_source(session, resource_id, data_source)
        return resource_id
    except Exception as e:
        logger.error(f"处理数据资源节点创建和关系建立失败: {str(e)}")
        raise


def _upsert_data_resource_node(session, properties):
    """MERGE a data_resource node on ``name``, store ``properties`` and return its internal id."""
    cypher = """
    MERGE (n:data_resource {name: $name})
    ON CREATE SET n = $props
    ON MATCH SET n += $props
    RETURN n
    """
    # The whole property dict travels as one parameter. Caller-supplied keys
    # are therefore never parsed as Cypher, and keys such as "parameters"
    # cannot collide with session.run keyword arguments.
    record = session.run(cypher, name=properties.get('name'), props=properties).single()
    return record["n"].id


def _link_resource_tag(session, resource_id, tag_id):
    """Ensure a ``label`` relationship from the resource to the data_label node ``tag_id``."""
    tag_node = get_node_by_id('data_label', tag_id)
    if tag_node is None:
        return
    # One idempotent MERGE replaces the former check-then-CREATE pair.
    session.run(
        """
        MATCH (a:data_resource), (b:data_label)
        WHERE id(a) = $resource_id AND id(b) = $tag_id
        MERGE (a)-[r:label]->(b)
        RETURN r
        """,
        resource_id=resource_id,
        tag_id=tag_node.id,
    ).consume()


def _link_resource_meta(session, resource_id, head_data):
    """MERGE one meta_data node per field dict and link it to the resource via ``contain``."""
    meta_cypher = """
    MERGE (m:meta_data {name: $name})
    ON CREATE SET m.en_name = $en_name,
                  m.create_time = $create_time,
                  m.data_type = $type,
                  m.status = 'true'
    ON MATCH SET m.data_type = $type,
                 m.status = 'true'
    RETURN m
    """
    rel_cypher = """
    MATCH (a:data_resource), (m:meta_data)
    WHERE id(a) = $resource_id AND id(m) = $meta_id
    MERGE (a)-[r:contain]->(m)
    RETURN r
    """
    # The resource was merged earlier in this session, so a single existence
    # check replaces the former per-field lookup.
    resource_found = session.run(
        "MATCH (n:data_resource) WHERE id(n) = $resource_id RETURN n",
        resource_id=resource_id,
    ).single() is not None
    if resource_found:
        logger.info(f"找到数据资源节点: ID={resource_id}")

    for item in head_data:
        meta_record = session.run(
            meta_cypher,
            name=item['name'],
            en_name=item['en_name'],
            create_time=get_formatted_time(),
            type=item['data_type'],
        ).single()
        if meta_record is None or meta_record["m"] is None:
            logger.error(f"未能创建或获取元数据节点: {item['name']}")
            continue

        meta_id = meta_record["m"].id
        logger.info(f"创建或获取到元数据节点: ID={meta_id}, name={item['name']}")
        if not resource_found:
            logger.error(f"无法找到数据资源节点: ID={resource_id}")
            continue

        rel_record = session.run(rel_cypher, resource_id=resource_id, meta_id=meta_id).single()
        if rel_record:
            logger.info(f"成功创建数据资源与元数据的关系: {resource_id} -> {meta_id}")
        else:
            logger.warning(f"创建数据资源与元数据的关系失败: {resource_id} -> {meta_id}")


def _link_resource_data_source(session, resource_id, data_source):
    """Link the resource to the data_source node matching ``data_source['en_name']`` (raises RuntimeError on failure)."""
    try:
        ds_en_name = data_source['en_name']
        if not ds_en_name:
            return

        rel_record = session.run(
            """
            MATCH (a:data_resource), (b:data_source)
            WHERE id(a) = $resource_id AND b.en_name = $ds_en_name
            MERGE (a)-[r:originates_from]->(b)
            RETURN r
            """,
            resource_id=resource_id,
            ds_en_name=ds_en_name,
        ).single()
        if rel_record:
            logger.info(f"已创建数据资源与数据源的关系: {resource_id} -> {ds_en_name}")
            return

        error_msg = f"创建数据资源与数据源的关系失败: {resource_id} -> {ds_en_name}"
        logger.error(error_msg)

        # Diagnose which end of the relationship is missing.
        ds_check = session.run(
            "MATCH (b:data_source) WHERE b.en_name = $ds_en_name RETURN b",
            ds_en_name=ds_en_name,
        )
        if not ds_check.single():
            logger.error(f"数据源节点不存在: en_name={ds_en_name}")

        res_check = session.run(
            "MATCH (a:data_resource) WHERE id(a) = $resource_id RETURN a",
            resource_id=resource_id,
        )
        if not res_check.single():
            logger.error(f"数据资源节点不存在: id={resource_id}")

        raise RuntimeError(error_msg)
    except Exception as e:
        logger.error(f"处理数据源关系失败: {str(e)}")
        raise RuntimeError(f"处理数据源关系失败: {str(e)}") from e
-
def handle_id_resource(resource_id):
    """Load a single data_resource node together with its tag and metadata.

    Args:
        resource_id: Internal id of the resource, or anything ``int()`` accepts.

    Returns:
        A dict with the resource's properties plus:
        - ``id``;
        - ``tag_info`` (when a ``label`` relationship exists);
        - ``meta_list`` (``meta_data``/``Metadata`` nodes reached via ``contain``).
        Each nested node also carries its ``id``. Returns ``None`` when the id
        is invalid, the resource does not exist, or a query fails (logged).
    """
    def as_dict(node):
        # Node properties plus the internal id (which overrides an "id" property).
        data = dict(node)
        data["id"] = node.id
        return data

    try:
        with neo4j_driver.get_session() as session:
            try:
                rid = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return None

            resource_row = session.run(
                """
                MATCH (n:data_resource)
                WHERE id(n) = $resource_id
                RETURN n
                """,
                resource_id=rid,
            ).single()
            if not resource_row:
                logger.error(f"未找到资源,ID: {rid}")
                return None

            payload = as_dict(resource_row["n"])

            tag_row = session.run(
                """
                MATCH (n:data_resource)-[r:label]->(t:data_label)
                WHERE id(n) = $resource_id
                RETURN t
                """,
                resource_id=rid,
            ).single()
            if tag_row:
                payload["tag_info"] = as_dict(tag_row["t"])

            # Metadata nodes may carry either the meta_data or the Metadata label.
            meta_rows = session.run(
                """
                MATCH (n:data_resource)-[:contain]->(m)
                WHERE id(n) = $resource_id
                AND (m:meta_data OR m:Metadata)
                RETURN m
                """,
                resource_id=rid,
            )
            payload["meta_list"] = [as_dict(row["m"]) for row in meta_rows]

            logger.info(f"成功获取资源详情,ID: {rid}")
            return payload
    except Exception as exc:
        logger.error(f"处理单个数据资源查询失败: {str(exc)}")
        return None
def id_resource_graph(resource_id):
    """Return the one-hop neighbourhood of a data_resource node as a graph.

    Args:
        resource_id: Internal id of the resource, or anything ``int()`` accepts.

    Returns:
        ``{"nodes": [...], "relationships": [...]}``.
        - Nodes are property dicts plus ``id``.
        - Relationships carry ``id``, ``source``, ``target`` and ``type``.
          ``source``/``target`` follow the relationship's stored direction.
        - Each node and each relationship appears once.
        Both lists are empty if the id is invalid or the query fails (logged).
    """
    try:
        with neo4j_driver.get_session() as session:
            cypher = """
            MATCH (n:data_resource)-[r]-(m)
            WHERE id(n) = $resource_id
            RETURN n, r, m
            """
            result = session.run(cypher, resource_id=int(resource_id))

            nodes = {}
            relationships = {}
            for record in result:
                for node in (record["n"], record["m"]):
                    node_dict = dict(node)
                    node_dict["id"] = node.id
                    nodes[node.id] = node_dict

                rel = record["r"]
                # The pattern is undirected, so an incoming relationship used to
                # be reported as resource -> neighbour. Take the direction from
                # the relationship itself. Keying by id also removes the
                # duplicate that a self-loop produces.
                relationships[rel.id] = {
                    "id": rel.id,
                    "source": rel.start_node.id,
                    "target": rel.end_node.id,
                    "type": rel.type,
                }

            return {
                "nodes": list(nodes.values()),
                "relationships": list(relationships.values()),
            }
    except Exception as e:
        logger.error(f"获取数据资源图谱失败: {str(e)}")
        return {"nodes": [], "relationships": []}
def resource_list(page, page_size, en_name_filter=None, name_filter=None,
                  type_filter='all', category_filter=None, tag_filter=None):
    """Return one page of data_resource nodes and the total number of matches.

    Args:
        page: 1-based page number.
        page_size: Number of rows per page.
        en_name_filter: Substring that ``en_name`` must contain.
        name_filter: Substring that ``name`` must contain.
        type_filter: Exact ``type`` value; ``'all'`` or a falsy value disables it.
        category_filter: Exact ``category`` value.
        tag_filter: Keep only resources linked via ``label`` to a ``data_label``
            node with this ``name``.

    Returns:
        ``(resources, total_count)``.
        - Each resource is its property dict plus ``id`` and, when tagged,
          ``tag_info``.
        - Rows are ordered by ``time`` descending.
        - ``([], 0)`` on any error (logged).
    """
    try:
        with neo4j_driver.get_session() as session:
            match_clause = "MATCH (n:data_resource)"
            where_conditions = []
            # Filter values are sent as query parameters. Splicing them into the
            # query text allowed Cypher injection and broke on quote characters.
            params = {}

            if en_name_filter:
                where_conditions.append("n.en_name CONTAINS $en_name_filter")
                params["en_name_filter"] = str(en_name_filter)

            if name_filter:
                where_conditions.append("n.name CONTAINS $name_filter")
                params["name_filter"] = str(name_filter)

            if type_filter and type_filter != 'all':
                where_conditions.append("n.type = $type_filter")
                params["type_filter"] = str(type_filter)

            if category_filter:
                where_conditions.append("n.category = $category_filter")
                params["category_filter"] = str(category_filter)

            # Filtering by tag needs an extra hop in the pattern.
            if tag_filter:
                match_clause += "-[:label]->(t:data_label)"
                where_conditions.append("t.name = $tag_filter")
                params["tag_filter"] = str(tag_filter)

            where_clause = " WHERE " + " AND ".join(where_conditions) if where_conditions else ""

            count_cypher = f"{match_clause}{where_clause} RETURN count(n) as count"
            total_count = session.run(count_cypher, params).single()["count"]

            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            RETURN n
            ORDER BY n.time DESC
            SKIP $skip LIMIT $limit
            """
            result = session.run(cypher, {**params, "skip": int(skip), "limit": int(page_size)})

            tag_cypher = """
            MATCH (n:data_resource)-[r:label]->(t:data_label)
            WHERE id(n) = $resource_id
            RETURN t
            """
            resources = []
            for record in result:
                node = dict(record["n"])
                node["id"] = record["n"].id

                tag_record = session.run(tag_cypher, resource_id=node["id"]).single()
                if tag_record:
                    tag = dict(tag_record["t"])
                    tag["id"] = tag_record["t"].id
                    node["tag_info"] = tag

                resources.append(node)

            return resources, total_count
    except Exception as e:
        logger.error(f"获取数据资源列表失败: {str(e)}")
        return [], 0
def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
                        name_filter=None, category_filter=None, tag_filter=None):
    """Page through the metadata nodes contained by one data_resource.

    Metadata nodes are those reached via ``contain`` that carry the
    ``meta_data`` or ``Metadata`` label.

    Args:
        resource_id: Internal id of the resource, or anything ``int()`` accepts.
        page: 1-based page number.
        page_size: Number of rows per page.
        en_name_filter: Substring that the metadata ``en_name`` must contain.
        name_filter: Substring that the metadata ``name`` must contain.
        category_filter: Exact ``category`` value.
        tag_filter: Keep only metadata linked via ``HAS_TAG`` to a ``Tag`` node
            with this ``name``.

    Returns:
        ``(metadata_list, total_count)``. Entries are property dicts plus
        ``id``, ordered by ``name``. ``([], 0)`` on an invalid id or any error
        (logged).
    """
    try:
        with neo4j_driver.get_session() as session:
            try:
                resource_id_int = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return [], 0

            match_clause = """
            MATCH (n:data_resource)-[:contain]->(m)
            WHERE id(n) = $resource_id
            AND (m:meta_data OR m:Metadata)
            """
            # All filter values travel as parameters, never spliced into the text.
            params = {"resource_id": resource_id_int}
            where_conditions = []

            if en_name_filter:
                where_conditions.append("m.en_name CONTAINS $en_name_filter")
                params["en_name_filter"] = str(en_name_filter)

            if name_filter:
                where_conditions.append("m.name CONTAINS $name_filter")
                params["name_filter"] = str(name_filter)

            if category_filter:
                where_conditions.append("m.category = $category_filter")
                params["category_filter"] = str(category_filter)

            tag_match = ""
            if tag_filter:
                # NOTE(review): this uses HAS_TAG/Tag, while resource tagging
                # elsewhere in this module uses label/data_label. Confirm which
                # schema metadata tags actually follow.
                tag_match = "MATCH (m)-[:HAS_TAG]->(t:Tag) WHERE t.name = $tag_filter"
                params["tag_filter"] = tag_filter

            where_clause = " AND " + " AND ".join(where_conditions) if where_conditions else ""

            count_cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN count(m) as count
            """
            total_count = session.run(count_cypher, params).single()["count"]

            skip = (page - 1) * page_size
            cypher = f"""
            {match_clause}{where_clause}
            {tag_match}
            RETURN m
            ORDER BY m.name
            SKIP $skip LIMIT $limit
            """
            result = session.run(cypher, {**params, "skip": int(skip), "limit": int(page_size)})

            metadata_list = []
            for record in result:
                meta = dict(record["m"])
                meta["id"] = record["m"].id
                metadata_list.append(meta)

            logger.info(f"成功获取资源关联元数据,ID: {resource_id_int}, 元数据数量: {total_count}")
            return metadata_list, total_count
    except Exception as e:
        logger.error(f"获取数据资源关联的元数据列表失败: {str(e)}")
        return [], 0
def resource_kinship_graph(resource_id, include_meta=True):
    """Build the kinship graph of one data_resource node.

    The graph contains:
    - the resource itself;
    - every ``data_label`` node it points to via ``label``;
    - when ``include_meta`` is true, every ``meta_data``/``Metadata`` node it
      points to via ``contain``.

    Args:
        resource_id: Internal id of the resource, or anything ``int()`` accepts.
        include_meta: Whether to include the contained metadata nodes.

    Returns:
        ``{"nodes": [...], "relationships": [...]}``.
        - Nodes carry their properties plus ``id`` and ``labels``.
        - Relationships carry a synthetic ``id`` plus ``source``, ``target``
          and ``type``.
        Both lists are empty if the id is invalid, the resource does not exist
        or the query fails (logged).
    """
    try:
        with neo4j_driver.get_session() as session:
            try:
                resource_id_int = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return {"nodes": [], "relationships": []}

            # Aggregate the tags before the metadata match. Otherwise several
            # tags split the result into several records, and only the first
            # of them was read.
            cypher_parts = [
                "MATCH (n:data_resource) WHERE id(n) = $resource_id",
                "OPTIONAL MATCH (n)-[:label]->(l:data_label)",
                "WITH n, collect(DISTINCT l) AS tags",
            ]
            if include_meta:
                cypher_parts.append("OPTIONAL MATCH (n)-[:contain]->(m) WHERE (m:meta_data OR m:Metadata)")
                cypher_parts.append("RETURN n, tags, collect(DISTINCT m) AS metadata")
            else:
                # `m` is never bound on this path. Collecting it made the query
                # invalid, so include_meta=False always produced an empty graph.
                cypher_parts.append("RETURN n, tags, [] AS metadata")

            record = session.run("\n".join(cypher_parts), resource_id=resource_id_int).single()
            if not record:
                logger.error(f"未找到资源图谱数据,ID: {resource_id_int}")
                return {"nodes": [], "relationships": []}

            nodes = {}
            relationships = []

            def add_node(node):
                # Store the node as properties plus id/labels and return that dict.
                entry = dict(node)
                entry["id"] = node.id
                entry["labels"] = list(node.labels)
                nodes[entry["id"]] = entry
                return entry

            resource_node = add_node(record["n"])

            for tag in record["tags"]:
                label_node = add_node(tag)
                relationships.append({
                    "id": f"rel-{resource_node['id']}-label-{label_node['id']}",
                    "source": resource_node["id"],
                    "target": label_node["id"],
                    "type": "label"
                })

            for meta in record["metadata"]:
                # Compare with None: a node without properties is falsy.
                if meta is None:
                    continue
                meta_node = add_node(meta)
                relationships.append({
                    "id": f"rel-{resource_node['id']}-contain-{meta_node['id']}",
                    "source": resource_node["id"],
                    "target": meta_node["id"],
                    "type": "contain"
                })

            logger.info(f"成功获取资源图谱,ID: {resource_id_int}, 节点数: {len(nodes)}")
            return {
                "nodes": list(nodes.values()),
                "relationships": relationships
            }
    except Exception as e:
        logger.error(f"获取数据资源亲缘关系图谱失败: {str(e)}")
        return {"nodes": [], "relationships": []}
def resource_impact_all_graph(resource_id, include_meta=True):
    """Collect every path around a data_resource node into a de-duplicated graph.

    With ``include_meta`` every path of 1-3 hops is followed. Without it,
    paths are limited to 1-2 hops and must not end on a ``meta_data`` /
    ``Metadata`` node.

    Args:
        resource_id: Internal id of the resource, or anything ``int()`` accepts.
        include_meta: Selects the deeper, metadata-inclusive traversal.

    Returns:
        ``{"nodes": [...], "relationships": [...]}``.
        - Nodes carry their properties plus ``id`` and ``labels``.
        - Relationships carry ``id``, ``source``, ``target`` and ``type``.
        Both lists are empty when the id is invalid or the query fails (logged).
    """
    try:
        with neo4j_driver.get_session() as session:
            try:
                rid = int(resource_id)
            except (ValueError, TypeError):
                logger.error(f"资源ID不是有效的整数: {resource_id}")
                return {"nodes": [], "relationships": []}

            if not include_meta:
                # Shallower search whose end nodes are never metadata nodes.
                query = """
                MATCH path = (n:data_resource)-[*1..2]-(m)
                WHERE id(n) = $resource_id
                AND NOT (m:meta_data) AND NOT (m:Metadata)
                RETURN path
                """
            else:
                query = """
                MATCH path = (n:data_resource)-[*1..3]-(m)
                WHERE id(n) = $resource_id
                RETURN path
                """

            node_index = {}
            rel_index = {}
            for row in session.run(query, resource_id=rid):
                walk = row["path"]

                for node in walk.nodes:
                    if node.id in node_index:
                        continue
                    entry = dict(node)
                    entry["id"] = node.id
                    entry["labels"] = list(node.labels)
                    node_index[node.id] = entry

                for rel in walk.relationships:
                    if rel.id in rel_index:
                        continue
                    rel_index[rel.id] = {
                        "id": rel.id,
                        "source": rel.start_node.id,
                        "target": rel.end_node.id,
                        "type": rel.type,
                    }

            logger.info(f"成功获取完整图谱,ID: {rid}, 节点数: {len(node_index)}")
            return {
                "nodes": list(node_index.values()),
                "relationships": list(rel_index.values()),
            }
    except Exception as exc:
        logger.error(f"获取数据资源影响关系图谱失败: {str(exc)}")
        return {"nodes": [], "relationships": []}
# Canonical names for common SQL (mostly PostgreSQL) type aliases. Keys are
# upper-cased base types with length/precision arguments removed.
_SQL_TYPE_MAPPING = {
    'INT': 'INTEGER',
    'INT4': 'INTEGER',
    'INT8': 'BIGINT',
    'SMALLINT': 'SMALLINT',
    'BIGINT': 'BIGINT',
    'FLOAT4': 'FLOAT',
    'FLOAT8': 'DOUBLE',
    'REAL': 'FLOAT',
    'DOUBLE PRECISION': 'DOUBLE',
    'NUMERIC': 'DECIMAL',
    'BOOL': 'BOOLEAN',
    'CHARACTER': 'CHAR',
    'CHAR VARYING': 'VARCHAR',
    'CHARACTER VARYING': 'VARCHAR',
    'TEXT': 'TEXT',
    'DATE': 'DATE',
    'TIME': 'TIME',
    'TIMESTAMP': 'TIMESTAMP',
    'TIMESTAMPTZ': 'TIMESTAMP WITH TIME ZONE',
    'BYTEA': 'BINARY',
    'JSON': 'JSON',
    'JSONB': 'JSONB',
    'UUID': 'UUID',
    'SERIAL': 'SERIAL',
    'SERIAL4': 'SERIAL',
    'SERIAL8': 'BIGSERIAL',
    'BIGSERIAL': 'BIGSERIAL',
}


def clean_type(type_str):
    """Normalise a SQL column type string to an upper-case base type name.

    Length/precision arguments such as ``(255)`` or ``(10, 2)`` are dropped and
    internal whitespace is collapsed. Known aliases are then mapped through
    ``_SQL_TYPE_MAPPING``. Unknown types are returned upper-cased without their
    arguments; a trailing ``VARYING`` is removed from them
    (e.g. ``BIT VARYING`` -> ``BIT``).

    >>> clean_type("character varying(255)")
    'VARCHAR'
    >>> clean_type("numeric(10, 2)")
    'DECIMAL'
    """
    # Drop "(n)" / "(p, s)" arguments, then collapse whitespace runs.
    basic_type = re.sub(r'\(.*?\)', '', type_str)
    basic_type = ' '.join(basic_type.split()).upper()

    # Look up the full name first. Stripping VARYING before the lookup turned
    # "CHARACTER VARYING" into "CHARACTER" and therefore mapped it to CHAR.
    if basic_type in _SQL_TYPE_MAPPING:
        return _SQL_TYPE_MAPPING[basic_type]

    basic_type = re.sub(r'\s+VARYING$', '', basic_type)
    return _SQL_TYPE_MAPPING.get(basic_type, basic_type)
def clean_field_name(field_name):
    """Strip surrounding whitespace and identifier quotes from a field name.

    Backticks, double quotes and single quotes are removed from both ends in
    any combination and nesting order, e.g. ``'"`id`"'`` -> ``'id'``. The old
    sequential ``strip`` calls left such names partially quoted and kept
    surrounding whitespace.
    """
    return field_name.strip().strip('`"\'')
# One (possibly quoted) SQL name part, and a dotted sequence of such parts.
_DDL_IDENT = r'(?:"[^"]+"|\'[^\']+\'|`[^`]+`|[^\s."\'`(),;]+)'
_DDL_QUALIFIED_NAME = _DDL_IDENT + r'(?:\s*\.\s*' + _DDL_IDENT + r')*'
_DDL_CREATE_TABLE_RE = re.compile(
    r'\s*CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?(' + _DDL_QUALIFIED_NAME + r')',
    re.IGNORECASE,
)
_DDL_COMMENT_TABLE_RE = re.compile(
    r'COMMENT\s+ON\s+TABLE\s+(' + _DDL_QUALIFIED_NAME + r')\s+IS\b', re.IGNORECASE
)
_DDL_COMMENT_COLUMN_RE = re.compile(
    r'COMMENT\s+ON\s+COLUMN\s+(' + _DDL_QUALIFIED_NAME + r')\s+IS\b', re.IGNORECASE
)


def _split_sql_statements(sql_content):
    """Split SQL text on top-level semicolons, dropping ``--`` and ``/* */`` comments."""
    statements = []
    buf = []
    quote = None
    i = 0
    length = len(sql_content)
    while i < length:
        ch = sql_content[i]
        if quote:
            # Inside quoted text everything is literal; a doubled quote ('')
            # simply closes and reopens the string.
            buf.append(ch)
            if ch == quote:
                quote = None
        elif ch in ("'", '"'):
            quote = ch
            buf.append(ch)
        elif sql_content.startswith('--', i):
            end = sql_content.find('\n', i)
            if end == -1:
                break
            # Keep the newline so the tokens around the comment stay apart.
            buf.append('\n')
            i = end + 1
            continue
        elif sql_content.startswith('/*', i):
            end = sql_content.find('*/', i + 2)
            if end == -1:
                break
            buf.append(' ')
            i = end + 2
            continue
        elif ch == ';':
            buf.append(ch)
            statement = ''.join(buf).strip()
            if statement:
                statements.append(statement)
            buf = []
        else:
            buf.append(ch)
        i += 1

    tail = ''.join(buf).strip()
    if tail:
        statements.append(tail)
    return statements


def _ddl_name_parts(qualified_name):
    """Split a dotted, possibly quoted SQL name into its unquoted parts."""
    return [part.strip('"\'`') for part in re.findall(_DDL_IDENT, qualified_name)]


def _ddl_comment_owner(statement):
    """Return the table a COMMENT ON TABLE/COLUMN statement targets, or None if unparseable."""
    table_match = _DDL_COMMENT_TABLE_RE.search(statement)
    if table_match:
        return _ddl_name_parts(table_match.group(1))[-1]
    column_match = _DDL_COMMENT_COLUMN_RE.search(statement)
    if column_match:
        parts = _ddl_name_parts(column_match.group(1))
        # [schema.]table.column: the table is the next-to-last part.
        if len(parts) >= 2:
            return parts[-2]
    return None


def select_create_ddl(sql_content):
    """Extract each CREATE TABLE statement together with its COMMENT ON statements.

    Splitting:
    - ``sql_content`` is split on top-level semicolons.
    - Quoted text is respected.
    - ``--`` and ``/* */`` comments are dropped, so a ``;`` or ``'`` inside a
      comment (e.g. pg_dump headers) no longer corrupts the split.

    Grouping:
    - Each ``CREATE TABLE [IF NOT EXISTS]`` starts a block.
    - Following COMMENT ON TABLE / COMMENT ON COLUMN statements that target the
      same table are appended to the block, separated by a space. Schema
      prefixes and quoting are ignored when comparing table names.
    - A block ends at the next CREATE statement or at a comment for another
      table. Other statements are skipped.

    Args:
        sql_content: Raw SQL text.

    Returns:
        A list of DDL strings, one per table, or ``[]`` if parsing fails (logged).
    """
    try:
        blocks = []
        current_block = ""
        current_table = None
        in_table_block = False

        for stmt in _split_sql_statements(sql_content):
            if re.match(r'\s*CREATE\s+TABLE\b', stmt, re.IGNORECASE):
                # A new table closes the previous block.
                if in_table_block and current_block:
                    blocks.append(current_block)
                in_table_block = True
                current_block = stmt
                name_match = _DDL_CREATE_TABLE_RE.match(stmt)
                current_table = _ddl_name_parts(name_match.group(1))[-1] if name_match else ""
            elif not in_table_block:
                continue
            elif re.search(r'COMMENT\s+ON\s+(?:TABLE|COLUMN)\b', stmt, re.IGNORECASE):
                owner = _ddl_comment_owner(stmt) if current_table else None
                if owner is None:
                    # Unparseable comment (or unnamed table): ignore it, keep the block open.
                    continue
                if owner == current_table:
                    current_block += " " + stmt
                else:
                    # A comment for a different table ends this table's DDL.
                    blocks.append(current_block)
                    in_table_block = False
                    current_block = ""
                    current_table = None
            elif re.match(r'\s*CREATE\s+', stmt, re.IGNORECASE):
                # Any other CREATE (index, view, sequence, ...) also ends the block.
                blocks.append(current_block)
                in_table_block = False
                current_block = ""
                current_table = None

        if in_table_block and current_block:
            blocks.append(current_block)

        logger.debug(f"提取到 {len(blocks)} 个DDL语句")
        for index, ddl in enumerate(blocks, start=1):
            logger.debug(f"DDL语句 {index}: {ddl}")

        return blocks
    except Exception as e:
        logger.error(f"提取DDL语句失败: {str(e)}")
        return []
def table_sql(sql):
    """Parse a CREATE TABLE statement (plus COMMENT ON statements) into field metadata.

    Table name:
    - Accepted forms: ``t``, ``"t"``, ``'t'``, ``s.t``, ``"s"."t"`` and
      ``'s'.'t'``, each optionally preceded by ``IF NOT EXISTS``.

    Columns:
    - The column list is the first balanced ``(...)`` after the table name, so
      trailing table options (``WITH (...)``, ``ENGINE=...``) are tolerated.
    - Constraint lines (PRIMARY/UNIQUE/FOREIGN/CHECK/CONSTRAINT) are skipped.
    - Column types are normalised with ``clean_type``; multi-word types such as
      ``character varying`` and ``double precision`` are kept intact.

    Comments:
    - Column comments come from ``COMMENT ON COLUMN [schema.]table.column IS
      '...'`` statements anywhere in ``sql``.
    - Doubled quotes (``''``) in comment text are unescaped.

    Args:
        sql: SQL text containing one CREATE TABLE statement.

    Returns:
        ``{table_name: {"exist": ..., "meta": [...]}}``, or ``None`` if the
        statement cannot be parsed (logged).
        - ``exist`` is ``status_query([table_name])[0]``, or False if that
          lookup fails.
        - Each ``meta`` entry has ``en_name`` (column), ``data_type`` and
          ``name`` (the column comment, or the column name when uncommented).
    """
    try:
        table_pattern = (
            r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?'
            r'(?:(?:"[^"]+"|\'[^\']+\'|[^"\'\s\.\(]+)\.)?'
            r'(?:"([^"]+)"|\'([^\']+)\'|([^"\'\s\(]+))'
        )
        table_match = re.search(table_pattern, sql, re.IGNORECASE)
        if not table_match:
            logger.error(f"无法匹配CREATE TABLE语句: {sql[:100]}...")
            return None

        table_name = table_match.group(1) or table_match.group(2) or table_match.group(3)
        if not table_name:
            logger.error("无法解析表名")
            return None
        logger.debug(f"解析到表名: {table_name}")

        # Locate the parenthesis that closes the column list by scanning, not
        # with a regex anchored on ");". That regex failed whenever table
        # options followed the list.
        open_idx = sql.find('(', table_match.end())
        close_idx = -1
        if open_idx != -1:
            depth = 0
            quote = None
            for idx in range(open_idx, len(sql)):
                ch = sql[idx]
                if quote:
                    if ch == quote:
                        quote = None
                elif ch in ("'", '"', '`'):
                    quote = ch
                elif ch == '(':
                    depth += 1
                elif ch == ')':
                    depth -= 1
                    if depth == 0:
                        close_idx = idx
                        break
        if close_idx == -1:
            logger.error("无法提取表主体内容")
            return None

        body_text = sql[open_idx + 1:close_idx].strip()
        logger.debug(f"表定义主体部分: {body_text}")

        # Split column definitions on commas outside parentheses and quotes.
        field_defs = []
        start = 0
        depth = 0
        quote = None
        for idx, ch in enumerate(body_text):
            if quote:
                if ch == quote:
                    quote = None
            elif ch in ("'", '"', '`'):
                quote = ch
            elif ch == '(':
                depth += 1
            elif ch == ')':
                depth -= 1
            elif ch == ',' and depth == 0:
                field_defs.append(body_text[start:idx].strip())
                start = idx + 1
        if start < len(body_text):
            field_defs.append(body_text[start:].strip())

        logger.debug(f"解析出 {len(field_defs)} 个字段定义")

        field_pattern = (
            r'^\s*(?:"([^"]+)"|\'([^\']+)\'|`([^`]+)`|([a-zA-Z0-9_]+))\s+(.+?)'
            r'(?:\s+DEFAULT\s+|\s+NOT\s+NULL|\s+REFERENCES|\s*$)'
        )
        # Type name at the start of a column type. Known multi-word types are
        # kept whole; otherwise take the first token up to "(" or whitespace.
        # Splitting on whitespace alone broke "numeric(10, 2)" and truncated
        # "character varying".
        type_head_pattern = r'(?:character|char|bit)\s+varying|double\s+precision|[^\s(]+'

        fields = []
        for field_def in field_defs:
            if re.match(r'^\s*(?:PRIMARY|UNIQUE|FOREIGN|CHECK|CONSTRAINT)\s+', field_def, re.IGNORECASE):
                continue

            field_match = re.search(field_pattern, field_def, re.IGNORECASE)
            if not field_match:
                logger.warning(f"无法解析字段定义: {field_def}")
                continue

            field_name = (field_match.group(1) or field_match.group(2)
                          or field_match.group(3) or field_match.group(4))
            field_type = field_match.group(5).strip()
            type_head = re.match(type_head_pattern, field_type, re.IGNORECASE)
            type_base = ' '.join(type_head.group(0).split()) if type_head else field_type
            clean_type_value = clean_type(type_base)

            fields.append((field_name, clean_type_value))
            logger.debug(f"解析到字段: {field_name}, 类型: {clean_type_value}")

        # Comment statements may use schema-qualified names and '' escapes.
        name_part = r'(?:"[^"]+"|\'[^\']+\'|`[^`]+`|\w+)'
        qualified = rf'{name_part}(?:\s*\.\s*{name_part})*'
        comment_text = r"'((?:[^']|'')*)'"

        def split_name(qualified_name):
            # Dotted SQL name -> list of unquoted parts.
            return [part.strip('"\'`') for part in re.findall(name_part, qualified_name)]

        table_comment = ""
        table_comment_re = rf"COMMENT\s+ON\s+TABLE\s+({qualified})\s+IS\s+{comment_text}"
        for match in re.finditer(table_comment_re, sql, re.IGNORECASE):
            if split_name(match.group(1))[-1] == table_name:
                table_comment = match.group(2).replace("''", "'")
                logger.debug(f"找到表注释: {table_comment}")
                break

        comments = {}
        column_comment_re = rf"COMMENT\s+ON\s+COLUMN\s+({qualified})\s+IS\s+{comment_text}"
        for match in re.finditer(column_comment_re, sql, re.IGNORECASE):
            parts = split_name(match.group(1))
            if len(parts) >= 2 and parts[-2] == table_name:
                comment = match.group(2).replace("''", "'")
                comments[parts[-1]] = comment
                logger.debug(f"找到列注释: {parts[-1]} - {comment}")
            else:
                logger.debug(f"忽略列注释,表名不匹配: {'.'.join(parts)} vs {table_name}")

        logger.debug("========字段和注释匹配情况========")
        field_names = [f[0] for f in fields]
        logger.debug(f"字段列表 ({len(field_names)}): {field_names}")
        logger.debug(f"注释字段 ({len(comments)}): {list(comments.keys())}")

        meta_list = []
        for field_name, field_type in fields:
            chinese_name = comments.get(field_name, "")
            meta_list.append({
                "en_name": field_name,
                "data_type": field_type,
                "name": chinese_name if chinese_name else field_name
            })

        try:
            status = status_query([table_name])
        except Exception as e:
            logger.error(f"检查表存在状态失败: {str(e)}")
            status = [False]

        result = {
            table_name: {
                "exist": status[0] if status else False,
                "meta": meta_list
            }
        }

        logger.debug(f"解析结果: {json.dumps(result, ensure_ascii=False)}")
        return result

    except Exception as e:
        # logger.exception also records the traceback.
        logger.exception(f"解析表定义SQL失败: {str(e)}")
        return None
# Check whether English table names already exist in the graph.
def status_query(key_list):
    """Report, for each name, whether an ``en_name`` node of a known kind exists.

    The known kinds are ``data_model``, ``data_resource`` and ``data_metric``.

    Args:
        key_list: Iterable of English names to check.

    Returns:
        A one-element list wrapping the per-name booleans, in ``key_list``
        order, e.g. ``[[True, False]]``. The outer list keeps the shape the
        former ``result.value()`` call produced, which callers index with
        ``[0]``.
    """
    names = list(key_list)
    # One MATCH per label. The old query re-used `n` across three OPTIONAL
    # MATCH clauses. Once `n` was bound (or null), the later clauses could
    # only re-check that same node, so data_resource and data_metric nodes
    # were never detected. UNION also de-duplicates names that match
    # several nodes.
    query = """
    MATCH (n:data_model) WHERE n.en_name IN $Key_list RETURN n.en_name AS name
    UNION
    MATCH (n:data_resource) WHERE n.en_name IN $Key_list RETURN n.en_name AS name
    UNION
    MATCH (n:data_metric) WHERE n.en_name IN $Key_list RETURN n.en_name AS name
    """
    with neo4j_driver.get_session() as session:
        result = session.run(query, Key_list=names)
        found = {record["name"] for record in result}
    return [[name in found for name in names]]
def select_sql(sql_query):
    """Parse a SELECT statement and return the tables named in its FROM clause.

    Args:
        sql_query: SQL text expected to contain ``SELECT ... FROM ...``.

    Returns:
        A list of table names in the order they appear in the FROM clause,
        covering comma-separated tables and JOIN targets. Quoted identifiers
        (backticks, double or single quotes) are returned without their quotes, and
        dotted names such as ``db.table`` are returned as written. Aliases, JOIN
        conditions and parenthesised subqueries are not reported. Returns an empty
        list when no FROM clause can be isolated. Returns ``None`` when the text has
        no ``SELECT ... FROM`` or parsing raises.
    """
    try:
        if not re.search(r'SELECT\s+.*?\s+FROM', sql_query, re.IGNORECASE | re.DOTALL):
            return None

        # The FROM clause runs up to the first following clause keyword. \b stops a
        # table called e.g. "groups" or "orders_x" from being mistaken for GROUP/ORDER.
        from_match = re.search(
            r'FROM\s+(.*?)(?:\s+WHERE\b|\s+GROUP\b|\s+HAVING\b|\s+ORDER\b|\s+LIMIT\b|$)',
            sql_query,
            re.IGNORECASE | re.DOTALL,
        )
        if not from_match:
            return []

        # One segment per table reference: commas separate tables and each JOIN
        # keyword starts a new one. Join modifiers (LEFT, INNER, ...), aliases and
        # ON conditions stay behind in a segment, but only the segment's leading
        # identifier is read, so they are ignored.
        segments = re.split(r',|\bJOIN\b', from_match.group(1), flags=re.IGNORECASE)
        table_ref = re.compile(r'\s*(?:`([^`]+)`|"([^"]+)"|\'([^\']+)\'|(\w+(?:\.\w+)*))')

        tables = []
        for segment in segments:
            ref = table_ref.match(segment)
            if ref:
                tables.append(next(g for g in ref.groups() if g))
        return tables

    except Exception as e:
        logger.error(f"解析SELECT查询语句失败: {str(e)}")
        return None
def model_resource_list(page, page_size, name_filter=None):
    """Return one page of ``model_resource`` nodes, ordered by ``createTime`` descending.

    Args:
        page: 1-based page number (coerced to int).
        page_size: Number of nodes per page (coerced to int).
        name_filter: Optional text; when truthy, only nodes whose ``name`` contains
            it are counted and returned.

    Returns:
        ``(resources, total_count)``. ``resources`` is a list of node property dicts,
        each with the node's internal id stored under ``"id"``. ``total_count`` is
        the number of nodes matching the filter. On any error the failure is logged
        and ``([], 0)`` is returned.
    """
    try:
        page = int(page)
        page_size = int(page_size)

        # The filter goes in as a query parameter rather than being spliced into the
        # Cypher text, so quotes or Cypher fragments in user input cannot break or
        # rewrite the query.
        where_clause = " WHERE n.name CONTAINS $name_filter" if name_filter else ""
        match_where = f"MATCH (n:model_resource){where_clause}"
        filter_value = str(name_filter) if name_filter else None

        with neo4j_driver.get_session() as session:
            count_record = session.run(
                f"{match_where} RETURN count(n) as count",
                name_filter=filter_value,
            ).single()
            total_count = count_record["count"]

            page_query = f"""
            {match_where}
            RETURN n
            ORDER BY n.createTime DESC
            SKIP $skip LIMIT $limit
            """
            result = session.run(
                page_query,
                name_filter=filter_value,
                skip=(page - 1) * page_size,
                limit=page_size,
            )

            # Node properties plus the internal node id under "id".
            resources = [{**dict(record["n"]), "id": record["n"].id} for record in result]

        return resources, total_count
    except Exception as e:
        logger.error(f"获取模型资源列表失败: {str(e)}")
        return [], 0
def data_resource_edit(data):
    """Update a ``data_resource`` node's properties and optionally re-point its tag.

    Args:
        data: Payload dict.
            - ``"id"`` (required) is the internal id of the ``data_resource`` node.
            - ``"tag"``, when truthy, is the internal id of the ``data_label`` node
              the resource should be linked to via a ``label`` relationship.
            - Every other key is written as a node property, and ``updateTime`` is
              always set to ``get_formatted_time()``.

    Returns:
        The updated node's properties as a dict.

    Raises:
        ValueError: ``"id"`` is missing/falsy, not convertible to int, or no
            ``data_resource`` node has that id.
        Any database error is logged and re-raised unchanged.
    """
    try:
        resource_id = data.get("id")
        if not resource_id:
            raise ValueError("缺少资源ID")
        resource_id = int(resource_id)

        # Every key except id/tag becomes a property. The properties are sent as one
        # map parameter and applied with SET +=, instead of being interpolated into
        # the query text as `n.<key> = $<key>`. Payload keys therefore cannot inject
        # Cypher, and cannot collide with session.run()'s own keyword arguments
        # (e.g. a "resource_id" or "query" key).
        props = {key: value for key, value in data.items() if key not in ("id", "tag")}
        props["updateTime"] = get_formatted_time()

        with neo4j_driver.get_session() as session:
            updated_node = session.run(
                """
                MATCH (n:data_resource)
                WHERE id(n) = $resource_id
                SET n += $props
                RETURN n
                """,
                resource_id=resource_id,
                props=props,
            ).single()

            if not updated_node:
                raise ValueError("资源不存在")

            tag_id = data.get("tag")
            if tag_id:
                # Drop existing label links and create the new one in one statement
                # (one transaction). tag_id is converted before anything runs, so an
                # invalid tag id no longer deletes the old link before failing.
                # DELETE on the null produced by an unmatched OPTIONAL MATCH is a no-op.
                session.run(
                    """
                    MATCH (n:data_resource)
                    WHERE id(n) = $resource_id
                    OPTIONAL MATCH (n)-[old:label]->()
                    DELETE old
                    WITH DISTINCT n
                    MATCH (t:data_label)
                    WHERE id(t) = $tag_id
                    CREATE (n)-[r:label]->(t)
                    RETURN r
                    """,
                    resource_id=resource_id,
                    tag_id=int(tag_id),
                )

            return dict(updated_node["n"])
    except Exception as e:
        logger.error(f"编辑数据资源失败: {str(e)}")
        raise
def handle_data_source(data_source):
    """Check for an existing data source by ``en_name`` and return that name.

    ``data_source["name"]`` is filled in from ``en_name`` when it is missing or
    empty; this mutates the caller's dict. When any of ``type``, ``host``,
    ``port``, ``database`` or ``username`` is missing or empty, the call is treated
    as a lookup by ``en_name`` only: the matching ``data_source`` node's
    ``en_name`` is returned.

    NOTE(review): when all of those fields are present, no branch below handles the
    request and the function returns ``None``. Confirm whether a creation path is
    missing here.

    Args:
        data_source: Dict describing the data source; ``en_name`` is required.

    Returns:
        The ``en_name`` of the existing data source (simple lookup mode).

    Raises:
        RuntimeError: wraps any failure (missing ``en_name``, unknown data source,
            database errors). The original exception is preserved as ``__cause__``.
    """
    try:
        # 1. en_name is mandatory.
        ds_en_name = data_source.get("en_name")
        if not ds_en_name:
            raise ValueError("数据源信息不完整,缺少名称(en_name)")

        # 2. Fall back to en_name when no display name was given.
        if not data_source.get("name"):
            data_source["name"] = ds_en_name
            logger.debug(f"数据源name为空,使用en_name作为替代: {ds_en_name}")

        # 3. Without every connection field, only look up an existing data source.
        required_fields = ["type", "host", "port", "database", "username"]
        has_required_fields = all(data_source.get(field) for field in required_fields)

        with neo4j_driver.get_session() as session:
            if not has_required_fields:
                logger.info(f"简单数据源查询模式,查找en_name为: {ds_en_name}")
                check_name_cypher = """
                MATCH (ds:data_source {en_name: $en_name})
                RETURN ds
                """
                existing_record = session.run(check_name_cypher, en_name=ds_en_name).single()

                if not existing_record:
                    raise ValueError(f"未找到名称为 {ds_en_name} 的数据源,请先创建该数据源或提供完整的数据源信息")

                existing_data_source = dict(existing_record["ds"])
                logger.info(f"根据名称找到现有数据源: {existing_data_source.get('en_name')}")
                return existing_data_source.get("en_name")

    except Exception as e:
        logger.error(f"处理数据源失败: {str(e)}")
        # Chain the cause so the original traceback survives the re-wrap.
        raise RuntimeError(f"处理数据源失败: {str(e)}") from e
|