Przeglądaj źródła

修改默认建表语句,保留created_at字段,删除insert_dt字段

wangxq 1 miesiąc temu
rodzic
commit
5787d5a45c

+ 11 - 11
app/core/data_interface/interface.py

@@ -202,23 +202,23 @@ def standard_all_graph(nodeid):
     # 查询语句
     cql = """
     MATCH(da:data_standard)
-    WHERE id(da)=$nodeId
+    WHERE elementId(da)=$nodeId
     OPTIONAL MATCH(a:data_resource)-[:clean_resource]-(da)
     OPTIONAL MATCH(b:data_model)-[:clean_model]-(da)
     OPTIONAL MATCH(da)-[:clean_model]-(m1:meta_node)
     OPTIONAL MATCH(da)-[:clean_model]-(m2:meta_node)
     WITH 
-        collect({id:toString(id(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
-        collect({id:toString(id(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
-        collect({id:toString(id(da)),text:da.name,type:split(labels(da)[0],'_')[1]})+
-        collect({id:toString(id(m1)),text:m1.name})+
-        collect({id:toString(id(m2)),text:m2.name})as nodes,da,
-        collect({from:toString(id(a)),to:toString(id(da)),text:'标准'})+
-        collect({from:toString(id(b)),to:toString(id(da)),text:'标准'})+
-        collect({from:toString(id(da)),to:toString(id(m1)),text:'标准清洗'})+
-        collect({from:toString(id(da)),to:toString(id(m2)),text:'标准清洗'})as lines
+        collect({id:toString(elementId(a)),text:a.name,type:split(labels(a)[0],'_')[1]})+
+        collect({id:toString(elementId(b)),text:b.name,type:split(labels(b)[0],'_')[1]})+
+        collect({id:toString(elementId(da)),text:da.name,type:split(labels(da)[0],'_')[1]})+
+        collect({id:toString(elementId(m1)),text:m1.name})+
+        collect({id:toString(elementId(m2)),text:m2.name})as nodes,da,
+        collect({from:toString(elementId(a)),to:toString(elementId(da)),text:'标准'})+
+        collect({from:toString(elementId(b)),to:toString(elementId(da)),text:'标准'})+
+        collect({from:toString(elementId(da)),to:toString(elementId(m1)),text:'标准清洗'})+
+        collect({from:toString(elementId(da)),to:toString(elementId(m2)),text:'标准清洗'})as lines
     WITH  
-        toString(id(da)) as rootId,
+        toString(elementId(da)) as rootId,
         apoc.coll.toSet(lines) as lines,
         apoc.coll.toSet(nodes) as nodes
     RETURN nodes,lines,rootId

+ 17 - 17
app/core/data_resource/resource.py

@@ -17,7 +17,7 @@ def get_node_by_id(label, id):
     """根据ID获取指定标签的节点"""
     try:
         with neo4j_driver.get_session() as session:
-            cypher = f"MATCH (n:{label}) WHERE id(n) = $id RETURN n"
+            cypher = f"MATCH (n:{label}) WHERE elementId(n) = $id RETURN n"
             result = session.run(cypher, id=int(id))
             record = result.single()
             return record["n"] if record else None
@@ -29,7 +29,7 @@ def get_node_by_id_no_label(id):
     """根据ID获取节点,不限制标签"""
     try:
         with neo4j_driver.get_session() as session:
-            cypher = "MATCH (n) WHERE id(n) = $id RETURN n"
+            cypher = "MATCH (n) WHERE elementId(n) = $id RETURN n"
             result = session.run(cypher, id=int(id))
             record = result.single()
             return record["n"] if record else None
@@ -42,15 +42,15 @@ def delete_relationships(start_node, rel_type=None, end_node=None):
     try:
         with neo4j_driver.get_session() as session:
             if rel_type and end_node:
-                cypher = "MATCH (a)-[r:`{rel_type}`]->(b) WHERE id(a) = $start_id AND id(b) = $end_id DELETE r"
+                cypher = "MATCH (a)-[r:`{rel_type}`]->(b) WHERE elementId(a) = $start_id AND elementId(b) = $end_id DELETE r"
                 cypher = cypher.replace("{rel_type}", rel_type)
                 session.run(cypher, start_id=start_node.id, end_id=end_node.id)
             elif rel_type:
-                cypher = "MATCH (a)-[r:`{rel_type}`]->() WHERE id(a) = $start_id DELETE r"
+                cypher = "MATCH (a)-[r:`{rel_type}`]->() WHERE elementId(a) = $start_id DELETE r"
                 cypher = cypher.replace("{rel_type}", rel_type)
                 session.run(cypher, start_id=start_node.id)
             else:
-                cypher = "MATCH (a)-[r]->() WHERE id(a) = $start_id DELETE r"
+                cypher = "MATCH (a)-[r]->() WHERE elementId(a) = $start_id DELETE r"
                 session.run(cypher, start_id=start_node.id)
         return True
     except Exception as e:
@@ -65,7 +65,7 @@ def update_or_create_node(label, **properties):
             if node_id:
                 # 更新现有节点
                 set_clause = ", ".join([f"n.{k} = ${k}" for k in properties.keys()])
-                cypher = f"MATCH (n:{label}) WHERE id(n) = $id SET {set_clause} RETURN n"
+                cypher = f"MATCH (n:{label}) WHERE elementId(n) = $id SET {set_clause} RETURN n"
                 result = session.run(cypher, id=int(node_id), **properties)
             else:
                 # 创建新节点
@@ -114,7 +114,7 @@ def handle_node(receiver, head_data, data_resource):
                     # 检查关系是否存在
                     rel_check = """
                     MATCH (a:data_resource)-[r:label]->(b:data_label) 
-                    WHERE id(a) = $resource_id AND id(b) = $tag_id
+                    WHERE elementId(a) = $resource_id AND elementId(b) = $tag_id
                     RETURN r
                     """
                     rel_result = session.run(rel_check, resource_id=resource_id, tag_id=tag_node.id)
@@ -123,7 +123,7 @@ def handle_node(receiver, head_data, data_resource):
                     if not rel_result.single():
                         rel_create = """
                         MATCH (a:data_resource), (b:data_label)
-                        WHERE id(a) = $resource_id AND id(b) = $tag_id
+                        WHERE elementId(a) = $resource_id AND elementId(b) = $tag_id
                         CREATE (a)-[r:label]->(b)
                         RETURN r
                         """
@@ -155,7 +155,7 @@ def handle_node(receiver, head_data, data_resource):
                     # 创建关系
                     rel_cypher = """
                     MATCH (a:data_resource), (m:Metadata)
-                    WHERE id(a) = $resource_id AND id(m) = $meta_id
+                    WHERE elementId(a) = $resource_id AND elementId(m) = $meta_id
                     MERGE (a)-[r:contain]->(m)
                     RETURN r
                     """
@@ -178,7 +178,7 @@ def handle_id_resource(resource_id):
             # 查询数据资源节点
             cypher = """
             MATCH (n:data_resource)
-            WHERE id(n) = $resource_id
+            WHERE elementId(n) = $resource_id
             RETURN n
             """
             result = session.run(cypher, resource_id=int(resource_id))
@@ -348,7 +348,7 @@ def id_data_search_list(resource_id, page, page_size, en_name_filter=None,
             # 基本匹配语句
             match_clause = """
             MATCH (n:data_resource)-[:contain]->(m:Metadata)
-            WHERE id(n) = $resource_id
+            WHERE elementId(n) = $resource_id
             """
             where_conditions = []
             
@@ -411,7 +411,7 @@ def resource_kinship_graph(resource_id, include_meta=True):
         with neo4j_driver.get_session() as session:
             # 基本查询
             cypher_parts = [
-                "MATCH (n:data_resource) WHERE id(n) = $resource_id",
+                "MATCH (n:data_resource) WHERE elementId(n) = $resource_id",
                 "OPTIONAL MATCH (n)-[:label]->(l:data_label)",
             ]
             
@@ -487,13 +487,13 @@ def resource_impact_all_graph(resource_id, include_meta=True):
             if include_meta:
                 cypher = """
                 MATCH path = (n:data_resource)-[*1..3]-(m)
-                WHERE id(n) = $resource_id
+                WHERE elementId(n) = $resource_id
                 RETURN path
                 """
             else:
                 cypher = """
                 MATCH path = (n:data_resource)-[*1..2]-(m)
-                WHERE id(n) = $resource_id
+                WHERE elementId(n) = $resource_id
                 AND NOT (m:Metadata)
                 RETURN path
                 """
@@ -1012,7 +1012,7 @@ def data_resource_edit(data):
             set_clause = ", ".join([f"n.{k} = ${k}" for k in update_fields.keys()])
             cypher = f"""
             MATCH (n:data_resource)
-            WHERE id(n) = $resource_id
+            WHERE elementId(n) = $resource_id
             SET {set_clause}
             RETURN n
             """
@@ -1029,7 +1029,7 @@ def data_resource_edit(data):
                 # 删除旧的标签关系
                 delete_rel_cypher = """
                 MATCH (n:data_resource)-[r:label]->()
-                WHERE id(n) = $resource_id
+                WHERE elementId(n) = $resource_id
                 DELETE r
                 """
                 session.run(delete_rel_cypher, resource_id=int(resource_id))
@@ -1037,7 +1037,7 @@ def data_resource_edit(data):
                 # 创建新的标签关系
                 create_rel_cypher = """
                 MATCH (n:data_resource), (t:data_label)
-                WHERE id(n) = $resource_id AND id(t) = $tag_id
+                WHERE elementId(n) = $resource_id AND elementId(t) = $tag_id
                 CREATE (n)-[r:label]->(t)
                 RETURN r
                 """

+ 32 - 23
app/core/production_line/production_line.py

@@ -293,7 +293,6 @@ def check_and_create_table(table_name, metadata_list):
             CREATE TABLE ods.{table_name} (
                 id SERIAL PRIMARY KEY,
                 {", ".join(columns)},
-                insert_dt TIMESTAMP,
                 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
             )
             """
@@ -305,7 +304,7 @@ def check_and_create_table(table_name, metadata_list):
         else:
             logger.info(f"表 ods.{table_name} 已存在")
             
-            # 检查是否存在insert_dt列
+            # 检查是否存在insert_dt列,如果存在,移除它(因为我们只使用created_at)
             cur.execute(f"""
                 SELECT EXISTS (
                     SELECT FROM information_schema.columns 
@@ -316,10 +315,25 @@ def check_and_create_table(table_name, metadata_list):
             """)
             insert_dt_exists = cur.fetchone()[0]
             
-            # 如果insert_dt列不存在,添加它
-            if not insert_dt_exists:
-                alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN insert_dt TIMESTAMP;"
-                logger.info(f"添加insert_dt列: {alter_sql}")
+            # 如果insert_dt列存在,记录警告但不进行删除(删除列可能导致数据丢失)
+            if insert_dt_exists:
+                logger.warning(f"表 ods.{table_name} 存在冗余的insert_dt列,请考虑后续手动删除")
+            
+            # 检查是否存在created_at列,如果不存在,添加它
+            cur.execute(f"""
+                SELECT EXISTS (
+                    SELECT FROM information_schema.columns 
+                    WHERE table_schema = 'ods' 
+                    AND table_name = '{table_name}'
+                    AND column_name = 'created_at'
+                );
+            """)
+            created_at_exists = cur.fetchone()[0]
+            
+            # 如果created_at列不存在,添加它
+            if not created_at_exists:
+                alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;"
+                logger.info(f"添加created_at列: {alter_sql}")
                 cur.execute(alter_sql)
                 conn.commit()
             
@@ -422,12 +436,11 @@ def load_excel_to_postgresql(file_path, table_name, metadata_list):
                         
                     columns.append(f"{sql_col_name} {sql_type}")
                 
-                # 创建表,包含insert_dt时间戳字段
+                # 创建表,只包含created_at时间戳字段
                 create_sql = f"""
                 CREATE TABLE ods.{table_name} (
                     id SERIAL PRIMARY KEY,
                     {', '.join(columns)},
-                    insert_dt TIMESTAMP,
                     created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                 );
                 """
@@ -436,21 +449,21 @@ def load_excel_to_postgresql(file_path, table_name, metadata_list):
                 conn.commit()
                 logger.info(f"表 ods.{table_name} 自动创建成功")
             else:
-                # 检查是否存在insert_dt列
+                # 检查是否存在created_at列
                 cur.execute(f"""
                     SELECT EXISTS (
                         SELECT FROM information_schema.columns 
                         WHERE table_schema = 'ods' 
                         AND table_name = '{table_name}'
-                        AND column_name = 'insert_dt'
+                        AND column_name = 'created_at'
                     );
                 """)
-                insert_dt_exists = cur.fetchone()[0]
+                created_at_exists = cur.fetchone()[0]
                 
-                # 如果insert_dt列不存在,添加它
-                if not insert_dt_exists:
-                    alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN insert_dt TIMESTAMP;"
-                    logger.info(f"添加insert_dt列: {alter_sql}")
+                # 如果created_at列不存在,添加它
+                if not created_at_exists:
+                    alter_sql = f"ALTER TABLE ods.{table_name} ADD COLUMN created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP;"
+                    logger.info(f"添加created_at列: {alter_sql}")
                     cur.execute(alter_sql)
                     conn.commit()
             
@@ -491,27 +504,23 @@ def load_excel_to_postgresql(file_path, table_name, metadata_list):
             logger.warning("没有有效记录可插入")
             return 0
         
-        # 获取列名列表,包括所有元数据列和insert_dt
+        # 获取列名列表,只包括元数据列(不再包括insert_dt)
         columns = [meta['en_name'] for meta in metadata_list if 'en_name' in meta]
         if not columns:
             logger.warning("没有有效列名")
             return 0
             
-        # 添加insert_dt列名
-        columns.append('insert_dt')
-        
         # 正确使用execute_values的方式
         insert_sql = f"""
         INSERT INTO ods.{table_name} ({", ".join(columns)})
         VALUES %s
         """
         
-        # 准备要插入的数据元组,包括当前时间戳
-        current_timestamp = datetime.now()
+        # 准备要插入的数据元组
         values = []
         for record in records:
-            # 为每条记录添加当前时间戳
-            row_values = tuple(list(record.get(col, None) for col in columns[:-1]) + [current_timestamp])
+            # 只包含数据列的值,不再需要添加时间戳
+            row_values = tuple(record.get(col, None) for col in columns)
             values.append(row_values)
         
         # 执行批量插入