Merge pandas DataFrames from multiple construction sources. Handle different schemas, keys, and data quality issues."
技能名称: df-merger
详细描述:
python
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
from difflib import SequenceMatcher
class MergeStrategy(Enum):
DataFrame合并策略。
INNER = inner # 仅匹配行
LEFT = left # 保留左侧所有行,匹配右侧
RIGHT = right # 匹配左侧,保留右侧所有行
OUTER = outer # 保留两侧所有行
CROSS = cross # 笛卡尔积
@dataclass
class MergeResult:
合并操作的结果。
merged_df: pd.DataFrame
matched_rows: int
left_only: int
right_only: int
merge_quality: float # 0-1评分
class ConstructionDFMerger:
合并来自施工数据源的DataFrame。
# 常见施工列名映射
COLUMN_MAPPINGS = {
elementid: [elementid, elemid, id, guid, globalid],
typename: [typename, type, elementtype, category],
level: [level, floor, storey, building_storey],
material: [material, mat, material_name],
volume: [volume, vol, volumem3, qtyvolume],
area: [area, surfacearea, qtyarea, area_m2],
cost: [cost, price, total_cost, amount],
taskid: [taskid, activity_id, wbs, activity],
startdate: [start, startdate, planned_start, begin],
enddate: [end, enddate, planned_finish, finish]
}
def init(self):
self.column_cache: Dict[str, str] = {}
def findcommonkey(self, df1: pd.DataFrame,
df2: pd.DataFrame) -> Optional[str]:
查找DataFrame之间的公共键列。
# 首先检查精确匹配
common = set(df1.columns) & set(df2.columns)
if common:
# 优先选择类似ID的列
for col in common:
if id in col.lower() or code in col.lower():
return col
return list(common)[0]
# 尝试语义匹配
for col1 in df1.columns:
for col2 in df2.columns:
if self.columnsmatch(col1, col2):
return col1
return None
def columnsmatch(self, col1: str, col2: str) -> bool:
检查列名是否语义相似。
col1lower = col1.lower().replace(, ).replace(-, )
col2lower = col2.lower().replace(, ).replace(-, )
# 标准化后精确匹配
if col1lower == col2lower:
return True
# 检查映射关系
for standard, variants in self.COLUMN_MAPPINGS.items():
if col1lower in variants and col2lower in variants:
return True
# 相似度检查
similarity = SequenceMatcher(None, col1lower, col2lower).ratio()
return similarity > 0.8
def harmonize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
标准化列名。
df = df.copy()
rename_map = {}
for col in df.columns:
collower = col.lower().replace(, ).replace(-, )
for standard, variants in self.COLUMN_MAPPINGS.items():
if col_lower in variants:
rename_map[col] = standard
break
return df.rename(columns=rename_map)
def merge(self, left: pd.DataFrame,
right: pd.DataFrame,
on: Optional[str] = None,
left_on: Optional[str] = None,
right_on: Optional[str] = None,
how: MergeStrategy = MergeStrategy.LEFT,
harmonize: bool = True) -> MergeResult:
合并两个DataFrame。
if harmonize:
left = self.harmonize_columns(left)
right = self.harmonize_columns(right)
# 确定合并键
if on is None and lefton is None and righton is None:
commonkey = self.findcommon_key(left, right)
if common_key is None:
raise ValueError(未找到公共键。请手动指定合并键。)
on = common_key
# 执行合并
merged = pd.merge(
left, right,
on=on,
lefton=lefton,
righton=righton,
how=how.value,
indicator=True,
suffixes=(left, right)
)
# 计算统计信息
matched = len(merged[merged[_merge] == both])
leftonly = len(merged[merged[merge] == left_only])
rightonly = len(merged[merged[merge] == right_only])
# 质量评分
total = len(left) + len(right)
quality = (matched * 2) / total if total > 0 else 0
# 清理
merged = merged.drop(_merge, axis=1)
return MergeResult(
merged_df=merged,
matched_rows=matched,
leftonly=leftonly,
rightonly=rightonly,
merge_quality=round(quality, 2)
)
def merge_multiple(self, dfs: List[pd.DataFrame],
on: Optional[str] = None,
how: MergeStrategy = MergeStrategy.OUTER) -> pd.DataFrame:
顺序合并多个DataFrame。
if not dfs:
return pd.DataFrame()
result = dfs[0].copy()
for i, df in enumerate(dfs[1:], 1):
result_obj = self.merge(result, df, on=on, how=how)
result = resultobj.mergeddf
return result
def fuzzy_merge(self, left: pd.DataFrame,
right: pd.DataFrame,
left_on: str,
right_on: str,
threshold: float = 0.8) -> pd.DataFrame:
使用模糊字符串匹配进行合并。
matches = []
leftvalues = left[lefton].dropna().unique()
rightvalues = right[righton].dropna().unique()
for lval in left_values:
best_match = None
best_score = 0
for rval in right_values:
score = SequenceMatcher(None, str(lval).lower(),
str(rval).lower()).ratio()
if score > best_score and score >= threshold:
best_score = score
best_match = rval
if best_match:
matches.append({
left_key: lval,
rightkey: bestmatch,
matchscore: bestscore
})
match_df = pd.DataFrame(matches)
# 使用匹配映射进行连接
leftwithkey = left.merge(matchdf, lefton=lefton, righton=left_key, how=left)
result = leftwithkey.merge(right, lefton=rightkey, righton=righton, how=left)
return result
class BIMScheduleMerger(ConstructionDFMerger):
专门用于BIM和进度数据的合并器。
def mergebimschedule(self, bim_df: pd.DataFrame,
schedule_df: pd.DataFrame,
bimtypecol: str = Type Name,
schedulewbscol: str = WBS) -> pd.DataFrame:
合并BIM元素与进度活动。
# 这通常需要一个映射表
# 目前使用描述信息的模糊匹配
bimdf = self.harmonizecolumns(bim_df)
scheduledf = self.harmonizecolumns(schedule_df)
# 尝试将类型名称与WBS描述进行匹配
result = self.fuzzy_merge(
bimdf, scheduledf,
lefton=bimtype_col,
righton=schedulewbs_col,
threshold=0.6
)
return result
class CostQTOMerger(ConstructionDFMerger):
合并成本数据与工程量清单。
def mergecostqto(self, cost_df: pd.DataFrame,
qto_df: pd.DataFrame) -> pd.DataFrame:
合并成本费率与QTO工程量。
costdf = self.harmonizecolumns(cost_df)
qtodf = self.harmonizecolumns(qto_df)
#
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 df-merger-1776344845 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 df-merger-1776344845 技能
skillhub install df-merger-1776344845
文件大小: 4.93 KB | 发布时间: 2026-4-17 13:57