返回顶部
d

data-lineage-tracker 数据血缘追踪

Track data origin, transformations, and flow through construction systems. Essential for audit trails, compliance, and debugging data issues.

作者: admin | 来源: ClawHub
源自
ClawHub
版本
V 2.1.0
安全检测
已通过
3,210
下载量
免费
免费
0
收藏
概述
安装方式
版本历史

data-lineage-tracker

建筑行业数据血缘追踪器

概述

追踪建筑数据在系统中的来源、转换和流向。为合规性提供审计追踪,帮助调试数据问题,并确保数据治理。

商业案例

建筑项目需要数据问责制:

  • 审计合规:了解每个数字的来源
  • 问题解决:将数据问题追溯到源头
  • 变更影响:了解哪些下游系统受到影响
  • 监管要求:为法律/保险维护数据来源

技术实现

python
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Set
from datetime import datetime
from enum import Enum
import json
import hashlib
import uuid

class TransformationType(Enum):
    """Kinds of transformation step that can be recorded in the lineage.

    Each member's value is the lowercase string form of its name; the
    tracker emits that value in the dictionaries it returns.
    """

    EXTRACT = "extract"
    TRANSFORM = "transform"
    LOAD = "load"
    AGGREGATE = "aggregate"
    JOIN = "join"
    FILTER = "filter"
    CALCULATE = "calculate"
    MANUAL_EDIT = "manual_edit"
    # ``import`` is a reserved word, so the value has to be a string literal.
    IMPORT = "import"
    EXPORT = "export"

@dataclass
class DataSource:
    """A registered origin of data (one record per source)."""

    id: str  # unique identifier of the source
    name: str
    system: str
    location: str
    owner: str
    created_at: datetime  # when the source record was created

@dataclass
class TransformationStep:
    """One recorded transformation from input entities to output entities."""

    id: str
    # Quoted so the annotation does not have to be resolved when the class
    # body is executed.
    transformation_type: "TransformationType"
    description: str
    input_entities: List[str]  # ids of the entities consumed
    output_entities: List[str]  # ids of the entities produced
    logic: str  # SQL, Python, or description
    performed_by: str  # user or system
    performed_at: datetime
    # default_factory gives every instance its own dict.
    parameters: Dict[str, Any] = field(default_factory=dict)

@dataclass
class DataEntity:
    """A tracked piece of data belonging to a data source."""

    id: str
    name: str
    source_id: str  # id of the source this entity is registered under
    entity_type: str  # table, file, field, record
    created_at: datetime
    version: int = 1
    checksum: Optional[str] = None
    # default_factory gives every instance its own list / dict.
    parent_entities: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class LineageRecord:
    """Link between one entity and the transformation that produced it."""

    id: str
    entity_id: str  # id of the entity this record describes
    transformation_id: str  # id of the producing transformation step
    upstream_entities: List[str]  # ids of the transformation's inputs
    downstream_entities: List[str]  # ids of entities later derived from this one
    recorded_at: datetime

class ConstructionDataLineageTracker:
    """Track data lineage for construction data flows.

    Holds in-memory registries of data sources, data entities and
    transformation steps for one project, plus one lineage record per
    transformation output that links the output to the transformation's
    inputs.
    """

    def __init__(self, project_id: str):
        """Create an empty tracker for ``project_id``."""
        self.project_id = project_id
        self.sources: Dict[str, DataSource] = {}
        self.entities: Dict[str, DataEntity] = {}
        self.transformations: Dict[str, TransformationStep] = {}
        self.lineage_records: List[LineageRecord] = []

    def register_source(self, name: str, system: str, location: str,
                        owner: str) -> "DataSource":
        """Register a new data source and return it.

        The source receives a generated id of the form ``SRC-<8 hex chars>``
        and is stored in ``self.sources`` under that id.
        """
        source = DataSource(
            id=f"SRC-{uuid.uuid4().hex[:8]}",
            name=name,
            system=system,
            location=location,
            owner=owner,
            created_at=datetime.now(),
        )
        self.sources[source.id] = source
        return source

    def register_entity(self, name: str, source_id: str, entity_type: str,
                        parent_entities: Optional[List[str]] = None,
                        metadata: Optional[Dict[str, Any]] = None) -> "DataEntity":
        """Register a data entity (table, file, field) and return it.

        The entity receives a generated id of the form ``ENT-<8 hex chars>``
        and is stored in ``self.entities`` under that id. ``parent_entities``
        and ``metadata`` default to an empty list / dict when omitted.
        """
        entity = DataEntity(
            id=f"ENT-{uuid.uuid4().hex[:8]}",
            name=name,
            source_id=source_id,
            entity_type=entity_type,
            created_at=datetime.now(),
            parent_entities=parent_entities or [],
            metadata=metadata or {},
        )
        self.entities[entity.id] = entity
        return entity

    def calculate_checksum(self, data: Any) -> str:
        """Return a 16-hex-character checksum for ``data``.

        Strings are hashed as-is; anything else is first serialised with
        ``json.dumps(sort_keys=True, default=str)`` so that key order does
        not affect the result. The checksum is the first 16 characters of
        the SHA-256 hex digest.
        """
        if isinstance(data, str):
            content = data
        else:
            content = json.dumps(data, sort_keys=True, default=str)
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def record_transformation(self,
                              transformation_type: "TransformationType",
                              description: str,
                              input_entities: List[str],
                              output_entities: List[str],
                              logic: str,
                              performed_by: str,
                              parameters: Optional[Dict[str, Any]] = None) -> "TransformationStep":
        """Record a data transformation and its lineage, and return the step.

        The step is stored in ``self.transformations`` under a generated
        ``TRF-<8 hex chars>`` id. One ``LineageRecord`` is appended per
        output entity, and each output id is added to the
        ``downstream_entities`` of every existing record that describes one
        of the input entities.
        """
        transformation = TransformationStep(
            id=f"TRF-{uuid.uuid4().hex[:8]}",
            transformation_type=transformation_type,
            description=description,
            input_entities=input_entities,
            output_entities=output_entities,
            logic=logic,
            performed_by=performed_by,
            performed_at=datetime.now(),
            parameters=parameters or {},
        )
        self.transformations[transformation.id] = transformation

        # Create lineage records
        for output_id in output_entities:
            record = LineageRecord(
                id=f"LIN-{uuid.uuid4().hex[:8]}",
                entity_id=output_id,
                transformation_id=transformation.id,
                upstream_entities=input_entities,
                downstream_entities=[],
                recorded_at=datetime.now(),
            )
            self.lineage_records.append(record)

            # Update downstream references on the input entities. This has
            # to run once per output so that ``output_id`` is defined.
            for input_id in input_entities:
                for existing_record in self.lineage_records:
                    if existing_record.entity_id == input_id:
                        existing_record.downstream_entities.append(output_id)

        return transformation

    def trace_upstream(self, entity_id: str, depth: Optional[int] = None) -> List[Dict]:
        """Trace every upstream source of an entity.

        Args:
            entity_id: id of the entity to start from (reported at depth 0).
            depth: maximum depth to follow; ``None`` means no limit.

        Returns:
            One dict per lineage record reached, sorted by ``"depth"``, with
            keys ``entity``, ``entity_id``, ``depth``, ``transformation``,
            ``transformation_type``, ``performed_at``, ``performed_by`` and
            ``upstream``. Ids missing from ``self.entities`` are skipped.
        """
        visited = set()
        lineage = []

        def trace(eid: str, current_depth: int):
            if eid in visited:
                return
            if depth is not None and current_depth > depth:
                return

            visited.add(eid)

            entity = self.entities.get(eid)
            if not entity:
                return

            # Find the transformations that produced this entity
            for record in self.lineage_records:
                if record.entity_id != eid:
                    continue
                transformation = self.transformations.get(record.transformation_id)
                if transformation:
                    lineage.append({
                        "entity": entity.name,
                        "entity_id": eid,
                        "depth": current_depth,
                        "transformation": transformation.description,
                        "transformation_type": transformation.transformation_type.value,
                        "performed_at": transformation.performed_at.isoformat(),
                        "performed_by": transformation.performed_by,
                        "upstream": record.upstream_entities,
                    })

                for upstream_id in record.upstream_entities:
                    trace(upstream_id, current_depth + 1)

        trace(entity_id, 0)
        return sorted(lineage, key=lambda x: x["depth"])

    def trace_downstream(self, entity_id: str, depth: Optional[int] = None) -> List[Dict]:
        """Trace every downstream dependency of an entity.

        Args:
            entity_id: id of the entity to start from.
            depth: maximum depth to follow; ``None`` means no limit.

        Returns:
            One dict per dependent lineage record, sorted by ``"depth"``,
            with keys ``entity``, ``entity_id``, ``depth``,
            ``transformation`` and ``transformation_type``. Direct
            dependents of ``entity_id`` are reported at depth 0. ``entity``
            falls back to the raw id when the dependent is not registered.
        """
        visited = set()
        dependencies = []

        def trace(eid: str, current_depth: int):
            if eid in visited:
                return
            if depth is not None and current_depth > depth:
                return

            visited.add(eid)

            entity = self.entities.get(eid)
            if not entity:
                return

            # Find the entities that were derived from this one
            for record in self.lineage_records:
                if eid not in record.upstream_entities:
                    continue
                transformation = self.transformations.get(record.transformation_id)
                if transformation:
                    dependent = self.entities.get(record.entity_id)
                    dependencies.append({
                        "entity": dependent.name if dependent is not None else record.entity_id,
                        "entity_id": record.entity_id,
                        "depth": current_depth,
                        "transformation": transformation.description,
                        "transformation_type": transformation.transformation_type.value,
                    })

                trace(record.entity_id, current_depth + 1)

        trace(entity_id, 0)
        return sorted(dependencies, key=lambda x: x["depth"])

    def get_entity_history(self, entity_id: str) -> List[Dict]:
        """Return the change history of an entity, oldest first.

        Each entry describes one transformation that produced ``entity_id``
        and has the keys ``timestamp`` (ISO format), ``action``,
        ``description``, ``performed_by`` and ``inputs``. Inputs are entity
        names, falling back to the raw id for unregistered entities.
        """
        history = []

        for record in self.lineage_records:
            if record.entity_id != entity_id:
                continue
            transformation = self.transformations.get(record.transformation_id)
            if transformation:
                history.append({
                    "timestamp": transformation.performed_at.isoformat(),
                    "action": transformation.transformation_type.value,
                    "description": transformation.description,
                    "performed_by": transformation.performed_by,
                    "inputs": [
                        self.entities[eid].name if eid in self.entities else eid
                        for eid in record.upstream_entities
                    ],
                })

        return sorted(history, key=lambda x: x["timestamp"])

def impactanalysis(self, entityid: str) -> Dict:
分析实体变更的影响。
downstream = self.tracedownstream(entityid)

impact = {
entity: self.entities[entityid].name if entityid in self.entities else entity_id,
total_affected: len(downstream),
affectedbydepth:

标签

skill ai

通过对话安装

该技能支持在以下平台通过对话安装:

OpenClaw WorkBuddy QClaw Kimi Claude

方式一:安装 SkillHub 和技能

帮我安装 SkillHub 和 data-lineage-tracker-1776344588 技能

方式二:设置 SkillHub 为优先技能安装源

设置 SkillHub 为我的优先技能安装源,然后帮我安装 data-lineage-tracker-1776344588 技能

通过命令行安装

skillhub install data-lineage-tracker-1776344588

下载

⬇ 下载 data-lineage-tracker v2.1.0(免费)

文件大小: 6.2 KB | 发布时间: 2026-4-17 14:58

v2.1.0 最新 2026-4-17 14:58
Version 2.1.0 of data-lineage-tracker

- Introduces detailed documentation in SKILL.md, including business case, technical overview, and code snippets.
- Extensively describes the skill’s purpose, focusing on tracking data origin, transformations, and flow for improved compliance and debugging in construction.
- Outlines main use cases: audit trails, issue resolution, change impact analysis, and meeting regulatory requirements.
- Provides example Python classes and methods for implementing data lineage tracking, covering data sources, transformations, and lineage record-keeping.

Archiver·手机版·闲社网·闲社论坛·羊毛社区· 多链控股集团有限公司 · 苏ICP备2025199260号-1

Powered by Discuz! X5.0   © 2024-2025 闲社网·线报更新论坛·羊毛分享社区·http://xianshe.com

p2p_official_large
返回顶部