Track data origin, transformations, and flow through construction systems. Essential for audit trails, compliance, and debugging data issues.
追踪建筑数据在系统中的来源、转换和流向。为合规性提供审计追踪，帮助调试数据问题，并确保数据治理。
建筑项目需要数据问责制:
python
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Set
from datetime import datetime
from enum import Enum
import json
import hashlib
import uuid
class TransformationType(Enum):
    """Kinds of operation that can be recorded in the lineage graph.

    Member values are lowercase strings; they are what trace results expose
    through ``transformation_type.value``.
    """

    EXTRACT = "extract"
    TRANSFORM = "transform"
    LOAD = "load"
    AGGREGATE = "aggregate"
    JOIN = "join"
    FILTER = "filter"
    CALCULATE = "calculate"
    MANUAL_EDIT = "manual_edit"
    IMPORT = "import"
    EXPORT = "export"
@dataclass
class DataSource:
    """A registered origin of project data.

    The tracker's ``register_source`` creates these with a generated
    ``SRC-`` id and the registration time; the remaining fields are stored
    exactly as supplied.
    """

    id: str          # key under which the tracker stores this source
    name: str
    system: str      # free-text label of the originating system
    location: str    # free-text location
    owner: str       # free-text owner
    created_at: datetime  # registration timestamp
@dataclass
class TransformationStep:
    """One recorded operation that turned input entities into output entities."""

    id: str
    transformation_type: TransformationType
    description: str
    input_entities: List[str]    # ids of the entities the step read
    output_entities: List[str]   # ids of the entities the step produced
    logic: str                   # SQL, Python, or description
    performed_by: str            # user or system
    performed_at: datetime
    # default_factory gives every step its own dict instead of a shared one.
    parameters: Dict[str, Any] = field(default_factory=dict)
@dataclass
class DataEntity:
    """A tracked piece of data belonging to a registered source."""

    id: str
    name: str
    source_id: str       # id of the DataSource the entity was registered under
    entity_type: str     # table, file, field, record
    created_at: datetime
    version: int = 1
    checksum: Optional[str] = None
    # Ids of parent entities as supplied at registration. The keyword must be
    # ``default_factory``; each instance gets its own list.
    parent_entities: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class LineageRecord:
    """Links one output entity to the transformation and inputs that produced it."""

    id: str
    entity_id: str                  # entity produced by the transformation
    transformation_id: str          # id of the producing TransformationStep
    upstream_entities: List[str]    # input entity ids of that transformation
    downstream_entities: List[str]  # ids of entities later derived from entity_id
    recorded_at: datetime
class ConstructionDataLineageTracker:
追踪建筑数据流的数据血缘关系。
def init(self, project_id: str):
self.projectid = projectid
self.sources: Dict[str, DataSource] = {}
self.entities: Dict[str, DataEntity] = {}
self.transformations: Dict[str, TransformationStep] = {}
self.lineage_records: List[LineageRecord] = []
def register_source(self, name: str, system: str, location: str, owner: str) -> DataSource:
    """Register a new data source and return it.

    Args:
        name: Display name, stored verbatim.
        system: Originating system label, stored verbatim.
        location: Location text, stored verbatim.
        owner: Owner text, stored verbatim.

    Returns:
        The new DataSource, also stored in ``self.sources`` under its
        generated ``SRC-<8 hex>`` id.
    """
    source_id = f"SRC-{uuid.uuid4().hex[:8]}"
    # The id carries only 32 random bits; retry on a clash instead of
    # silently replacing a source that is already registered.
    while source_id in self.sources:
        source_id = f"SRC-{uuid.uuid4().hex[:8]}"

    source = DataSource(
        id=source_id,
        name=name,
        system=system,
        location=location,
        owner=owner,
        created_at=datetime.now(),
    )
    self.sources[source.id] = source
    return source
def register_entity(self, name: str, source_id: str, entity_type: str,
                    parent_entities: Optional[List[str]] = None,
                    metadata: Optional[Dict] = None) -> DataEntity:
    """Register a data entity (table, file, field, ...) and return it.

    Args:
        name: Display name of the entity.
        source_id: Id of the owning source. It is stored as given and is
            not checked against ``self.sources``.
        entity_type: Free-text kind, e.g. table, file, field, record.
        parent_entities: Optional parent entity ids; copied, defaults to [].
        metadata: Optional extra attributes; shallow-copied, defaults to {}.

    Returns:
        The new DataEntity, also stored in ``self.entities`` under its
        generated ``ENT-<8 hex>`` id.
    """
    entity_id = f"ENT-{uuid.uuid4().hex[:8]}"
    # Only 32 random bits: retry rather than overwrite an existing entity.
    while entity_id in self.entities:
        entity_id = f"ENT-{uuid.uuid4().hex[:8]}"

    entity = DataEntity(
        id=entity_id,
        name=name,
        source_id=source_id,
        entity_type=entity_type,
        created_at=datetime.now(),
        # Copy caller-owned containers so later edits by the caller cannot
        # silently rewrite what was registered.
        parent_entities=list(parent_entities or []),
        metadata=dict(metadata or {}),
    )
    self.entities[entity.id] = entity
    return entity
def calculate_checksum(self, data: Any) -> str:
    """Return a 16-character SHA-256 fingerprint of ``data``.

    Strings are hashed as they are. Any other value is first serialised to
    JSON with sorted keys, with non-JSON values rendered through ``str``. As a
    result, dicts that differ only in key order produce the same checksum.
    """
    payload = data if isinstance(data, str) else json.dumps(data, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode())
    return digest.hexdigest()[:16]
def record_transformation(self,
                          transformation_type: TransformationType,
                          description: str,
                          input_entities: List[str],
                          output_entities: List[str],
                          logic: str,
                          performed_by: str,
                          parameters: Optional[Dict] = None) -> TransformationStep:
    """Record a data transformation and the lineage links it creates.

    Stores a TransformationStep in ``self.transformations`` and appends one
    LineageRecord per output entity to ``self.lineage_records``. Each output
    id is also appended, once, to ``downstream_entities`` of every record
    that existed before this call and whose entity is one of the inputs.

    Args:
        transformation_type: Kind of operation performed.
        description: Human-readable description of the step.
        input_entities: Ids of the entities the step read.
        output_entities: Ids of the entities the step produced.
        logic: SQL, Python, or a prose description of the step.
        performed_by: User or system that performed the step.
        parameters: Optional step parameters; shallow-copied, defaults to {}.

    Returns:
        The stored TransformationStep.
    """
    transformation_id = f"TRF-{uuid.uuid4().hex[:8]}"
    # Only 32 random bits: retry rather than overwrite an earlier step that
    # existing lineage records still point to.
    while transformation_id in self.transformations:
        transformation_id = f"TRF-{uuid.uuid4().hex[:8]}"

    # Snapshot caller-owned lists. Otherwise the step, every record and the
    # caller would share one list object, and a later mutation would rewrite
    # recorded history.
    inputs = list(input_entities)
    outputs = list(output_entities)

    transformation = TransformationStep(
        id=transformation_id,
        transformation_type=transformation_type,
        description=description,
        input_entities=inputs,
        output_entities=outputs,
        logic=logic,
        performed_by=performed_by,
        performed_at=datetime.now(),
        parameters=dict(parameters or {}),
    )
    self.transformations[transformation.id] = transformation

    # Find the records describing the inputs in a single pass. This replaces
    # a rescan per (output, input) pair and also ignores duplicate input ids.
    # The scan runs before the new records are appended, so a step whose
    # output is also one of its inputs does not list itself as its own
    # downstream.
    input_set = set(inputs)
    feeding_records = [r for r in self.lineage_records if r.entity_id in input_set]

    for output_id in outputs:
        self.lineage_records.append(LineageRecord(
            id=f"LIN-{uuid.uuid4().hex[:8]}",
            entity_id=output_id,
            transformation_id=transformation.id,
            upstream_entities=list(inputs),
            downstream_entities=[],
            recorded_at=datetime.now(),
        ))
        for existing in feeding_records:
            existing.downstream_entities.append(output_id)

    return transformation
def trace_upstream(self, entity_id: str, depth: Optional[int] = None) -> List[Dict]:
    """Trace all upstream origins of an entity.

    The lineage graph is walked level by level (breadth-first) from
    ``entity_id`` towards its inputs. Each entity is therefore reported at
    its shortest distance from the start, and a ``depth`` limit keeps every
    entity within that distance.

    By contrast, a depth-first walk with a shared visited set can reach an
    entity first through a longer path. It then reports the wrong depth and
    prunes ancestors that are actually in range.

    Args:
        entity_id: Id of the entity whose origins are traced.
        depth: Deepest level to expand. 0 reports only how ``entity_id``
            itself was produced; ``None`` means no limit.

    Returns:
        One dict per lineage record found, ordered by ``depth``, with keys
        ``entity``, ``entity_id``, ``depth``, ``transformation``,
        ``transformation_type``, ``performed_at`` (ISO string),
        ``performed_by`` and ``upstream``. Entities missing from
        ``self.entities``, and records whose transformation is unknown, are
        skipped and not expanded further.
    """
    # Index records by the entity they produced once, instead of rescanning
    # the whole list for every visited entity.
    produced_by: Dict[str, List] = {}
    for record in self.lineage_records:
        produced_by.setdefault(record.entity_id, []).append(record)

    lineage = []
    visited = {entity_id}
    frontier = [entity_id]
    level = 0
    while frontier and (depth is None or level <= depth):
        next_frontier = []
        for eid in frontier:
            entity = self.entities.get(eid)
            if not entity:
                continue
            for record in produced_by.get(eid, []):
                transformation = self.transformations.get(record.transformation_id)
                if not transformation:
                    continue
                lineage.append({
                    "entity": entity.name,
                    "entity_id": eid,
                    "depth": level,
                    "transformation": transformation.description,
                    "transformation_type": transformation.transformation_type.value,
                    "performed_at": transformation.performed_at.isoformat(),
                    "performed_by": transformation.performed_by,
                    "upstream": list(record.upstream_entities),
                })
                for upstream_id in record.upstream_entities:
                    if upstream_id not in visited:
                        visited.add(upstream_id)
                        next_frontier.append(upstream_id)
        frontier = next_frontier
        level += 1
    # Level-order traversal already yields entries in depth order.
    return lineage
def trace_downstream(self, entity_id: str, depth: Optional[int] = None) -> List[Dict]:
    """Trace everything derived, directly or indirectly, from an entity.

    The lineage graph is walked level by level (breadth-first) from
    ``entity_id`` towards the entities that consume it. As a result, depths
    are shortest distances and a ``depth`` limit keeps everything within
    range.

    Args:
        entity_id: Id of the entity whose dependents are traced.
        depth: Deepest level to expand; ``None`` means no limit.

    Returns:
        One dict per lineage record found, ordered by ``depth``, with keys
        ``entity``, ``entity_id``, ``depth``, ``transformation`` and
        ``transformation_type``. ``depth`` is the level of the entity being
        consumed, so direct dependents have depth 0. Each record is
        reported once, even when it consumes several traced entities.
        Unregistered entities are not expanded, and records whose
        transformation is unknown are skipped.
    """
    # Index records by each input they consume, once per call.
    consumers: Dict[str, List] = {}
    for record in self.lineage_records:
        for upstream_id in record.upstream_entities:
            consumers.setdefault(upstream_id, []).append(record)

    dependencies = []
    reported = set()  # identities of records already emitted
    visited = {entity_id}
    frontier = [entity_id]
    level = 0
    while frontier and (depth is None or level <= depth):
        next_frontier = []
        for eid in frontier:
            if not self.entities.get(eid):
                continue
            for record in consumers.get(eid, []):
                if id(record) in reported:
                    continue
                transformation = self.transformations.get(record.transformation_id)
                if not transformation:
                    continue
                reported.add(id(record))
                child_id = record.entity_id
                dependencies.append({
                    "entity": self.entities[child_id].name if child_id in self.entities else child_id,
                    "entity_id": child_id,
                    "depth": level,
                    "transformation": transformation.description,
                    "transformation_type": transformation.transformation_type.value,
                })
                if child_id not in visited:
                    visited.add(child_id)
                    next_frontier.append(child_id)
        frontier = next_frontier
        level += 1
    # Level-order traversal already yields entries in depth order.
    return dependencies
def get_entity_history(self, entity_id: str) -> List[Dict]:
    """Return the transformations that produced ``entity_id``, oldest first.

    Only lineage records whose output is ``entity_id`` are included.
    Transformations that merely read the entity do not appear. Records
    whose transformation is unknown are skipped.

    Returns:
        Dicts with keys ``timestamp`` (ISO string), ``action``,
        ``description``, ``performed_by`` and ``inputs``. ``inputs`` holds
        entity names where registered, otherwise the raw ids.
    """
    events = []
    for record in self.lineage_records:
        if record.entity_id != entity_id:
            continue
        transformation = self.transformations.get(record.transformation_id)
        if not transformation:
            continue
        events.append((transformation.performed_at, {
            "timestamp": transformation.performed_at.isoformat(),
            "action": transformation.transformation_type.value,
            "description": transformation.description,
            "performed_by": transformation.performed_by,
            "inputs": [
                self.entities[eid].name if eid in self.entities else eid
                for eid in record.upstream_entities
            ],
        }))
    # Sort on the datetime itself. ISO strings of datetimes with differing
    # UTC offsets do not sort chronologically. The sort is stable, so ties
    # keep their record order.
    events.sort(key=lambda event: event[0])
    return [entry for _, entry in events]
def impactanalysis(self, entityid: str) -> Dict:
分析实体变更的影响。
downstream = self.tracedownstream(entityid)
impact = {
entity: self.entities[entityid].name if entityid in self.entities else entity_id,
total_affected: len(downstream),
affectedbydepth:
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 data-lineage-tracker-1776344588 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 data-lineage-tracker-1776344588 技能
skillhub install data-lineage-tracker-1776344588
文件大小: 6.2 KB | 发布时间: 2026-4-17 14:58