Extract, clean, and organize legacy construction data from archives. Migrate historical project data, cost records, and schedules into modern formats.
管理来自档案、旧系统和历史记录中的遗留施工数据。提取、清洗、规范化并将数据迁移至现代格式,用于分析和基准测试。
施工公司积累了几十年的项目数据,格式多样,例如旧版 Excel 工作簿、CSV 导出文件以及 Access 等遗留数据库的导出文件。
该技能有助于从历史数据中提取价值,用于数据分析和基准测试。
python
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
from datetime import datetime
from pathlib import Path
import pandas as pd
import re
import json
@dataclass
class HistoricalRecord:
    """One dataset extracted from a single legacy source file.

    Attributes:
        project_id: Identifier of the project the data belongs to.
        project_name: Human-readable project name.
        year: Year the data originates from.
        data_type: One of "cost", "schedule", "labor", "material"
            (the detector may also produce "unknown").
        original_format: Source format label, e.g. "excel" or "csv".
        extracted_data: Row-oriented table contents, one dict per row, as
            produced by ``DataFrame.to_dict('records')``.
        quality_score: Data quality score in the range 0-1.
        notes: Free-form remarks collected during extraction.
    """

    project_id: str
    project_name: str
    year: int
    data_type: str  # cost, schedule, labor, material
    original_format: str
    # Annotated as a list of row dicts: that is what the extractors store
    # here and what the cost normalizer iterates over.
    extracted_data: List[Dict[str, Any]]
    quality_score: float
    notes: List[str] = field(default_factory=list)
class HistoricalDataManager:
管理历史施工数据的提取和规范化。
def __init__(self, archive_path: str):
    """Bind the manager to an archive directory.

    Args:
        archive_path: Root directory of the legacy archive to work on.
    """
    self.archive_path = Path(archive_path)
    # Records accumulated by the extraction methods.
    self.records: List["HistoricalRecord"] = []
    self.normalization_rules = self._load_normalization_rules()
def scan_archive(self) -> Dict[str, int]:
    """Scan the archive recursively and count files per extension.

    Returns:
        Mapping of lower-cased file suffix (including the leading dot, or
        an empty string for files without a suffix) to the number of files
        carrying it. Directories are not counted.
    """
    file_types: Dict[str, int] = {}
    for file_path in self.archive_path.rglob("*"):
        if file_path.is_file():
            ext = file_path.suffix.lower()
            file_types[ext] = file_types.get(ext, 0) + 1
    return file_types
def extract_from_legacy_excel(self, file_path: str, year: int) -> List["HistoricalRecord"]:
    """Extract one record from a legacy Excel workbook.

    The workbook is read with the ``openpyxl`` engine first and with
    ``xlrd`` as a fallback for older formats. Any failure is reported on
    stdout and results in an empty list (best-effort extraction).

    Args:
        file_path: Path of the workbook to read.
        year: Year the workbook's data belongs to.

    Returns:
        A list holding the extracted record, or an empty list when the
        file could not be processed.
    """
    # Local import keeps this method self-contained.
    import zlib

    records = []

    try:
        # Try different engines to cope with old formats.
        try:
            df = pd.read_excel(file_path, engine="openpyxl")
        except Exception:
            df = pd.read_excel(file_path, engine="xlrd")

        # Detect the data type from the content (on the raw column names).
        data_type = self._detect_data_type(df)

        # Normalize column names.
        df = self._normalize_columns(df)

        # Extract project information.
        project_info = self._extract_project_info(df, file_path)

        # Built-in hash() of a str is salted per process, which made the
        # fallback id differ between runs; CRC32 of the path is stable.
        path_digest = zlib.crc32(str(file_path).encode("utf-8")) % 10000

        record = HistoricalRecord(
            project_id=project_info.get("id", f"LEGACY-{year}-{path_digest}"),
            project_name=project_info.get("name", Path(file_path).stem),
            year=year,
            data_type=data_type,
            original_format="excel",
            extracted_data=df.to_dict("records"),
            quality_score=self._assess_quality(df),
        )
        records.append(record)

    except Exception as e:
        print(f"提取 {file_path} 时出错: {e}")

    return records
def extract_from_csv(self, file_path: str, year: int) -> "HistoricalRecord":
    """Extract one record from a CSV file, trying several encodings.

    Args:
        file_path: Path of the CSV file to read.
        year: Year the file's data belongs to.

    Returns:
        The extracted record.

    Raises:
        ValueError: If the file could not be decoded with any candidate
            encoding.
        Exception: Errors other than decoding failures (missing file,
            malformed CSV, ...) propagate from ``pandas.read_csv``
            unchanged instead of being swallowed.
    """
    # Local import keeps this method self-contained.
    import zlib

    # latin-1 accepts every byte sequence, so it has to come last;
    # anything listed after it would never be tried.
    encodings = ("utf-8", "cp1252", "latin-1")

    df = None
    for encoding in encodings:
        try:
            df = pd.read_csv(file_path, encoding=encoding)
            break
        except UnicodeDecodeError:
            # Only a decoding failure justifies trying the next encoding.
            continue

    if df is None:
        raise ValueError(
            f"Could not decode {file_path} with any of: {', '.join(encodings)}"
        )

    df = self._normalize_columns(df)
    data_type = self._detect_data_type(df)

    # Built-in hash() of a str is salted per process; CRC32 keeps the
    # generated id stable between runs.
    path_digest = zlib.crc32(str(file_path).encode("utf-8")) % 10000

    return HistoricalRecord(
        project_id=f"CSV-{year}-{path_digest}",
        project_name=Path(file_path).stem,
        year=year,
        data_type=data_type,
        original_format="csv",
        extracted_data=df.to_dict("records"),
        quality_score=self._assess_quality(df),
    )
def extract_from_database_export(self, file_path: str, db_type: str) -> List["HistoricalRecord"]:
    """Extract data from a legacy database export.

    Only ``db_type == "access"`` is handled; any other value yields an
    empty list. Every user table of the Access file is read, but the
    per-table conversion into records is not implemented in this method,
    so the returned list is currently always empty.

    Args:
        file_path: Path of the MDB/ACCDB file.
        db_type: Kind of database export, e.g. "access".

    Returns:
        The list of extracted records.
    """
    records = []

    if db_type == "access":
        # Read Access MDB/ACCDB files.
        import pyodbc

        conn_str = f"DRIVER={{Microsoft Access Driver (*.mdb, *.accdb)}};DBQ={file_path}"
        conn = pyodbc.connect(conn_str)
        try:
            # Collect all tables.
            cursor = conn.cursor()
            tables = [row.table_name for row in cursor.tables(tableType="TABLE")]

            for table in tables:
                df = pd.read_sql(f"SELECT * FROM [{table}]", conn)
                # Process each table...
        finally:
            # Release the connection even when reading a table fails.
            conn.close()

    return records
def normalize_cost_data(self, records: List["HistoricalRecord"], base_year: int = 2026) -> pd.DataFrame:
    """Escalate historical cost rows to the price level of ``base_year``.

    Only records whose ``data_type`` is "cost" are considered, and within
    them only rows that carry an "amount" or a "cost" key. "amount" takes
    precedence over "cost" whenever it holds a value, including zero.

    Args:
        records: Records to normalize.
        base_year: Year whose cost index is the target level. A year
            missing from the table falls back to an index of 1.18.

    Returns:
        A DataFrame with one row per qualifying item: the original columns
        plus ``original_cost``, ``normalized_cost``, ``escalation_factor``,
        ``original_year`` and ``project_id``. Record years missing from
        the index table are treated as index 1.0.
    """
    # RSMeans historical cost indices (sample values).
    cost_indices = {
        2015: 0.82, 2016: 0.84, 2017: 0.87, 2018: 0.90,
        2019: 0.93, 2020: 0.95, 2021: 0.98, 2022: 1.02,
        2023: 1.06, 2024: 1.10, 2025: 1.14, 2026: 1.18,
    }
    # The target index does not depend on the record, so look it up once.
    base_index = cost_indices.get(base_year, 1.18)

    normalized_data = []

    for record in records:
        if record.data_type != "cost":
            continue

        year_index = cost_indices.get(record.year, 1.0)
        escalation_factor = base_index / year_index

        for item in record.extracted_data:
            if "amount" not in item and "cost" not in item:
                continue

            # Compare against None rather than relying on truthiness, so
            # a genuine zero amount is not replaced by the "cost" value.
            original_cost = item.get("amount")
            if original_cost is None:
                original_cost = item.get("cost")
            if original_cost is None:
                original_cost = 0

            normalized_item = item.copy()
            normalized_item["original_cost"] = original_cost
            normalized_item["normalized_cost"] = original_cost * escalation_factor
            normalized_item["escalation_factor"] = escalation_factor
            normalized_item["original_year"] = record.year
            normalized_item["project_id"] = record.project_id
            normalized_data.append(normalized_item)

    return pd.DataFrame(normalized_data)
def _detect_data_type(self, df: pd.DataFrame) -> str:
    """Classify a table as cost, schedule, labor or material data.

    The decision is based on column names only: a category is chosen when
    one of its keywords equals a whole column name (case-insensitive,
    surrounding whitespace ignored). Categories are checked in the order
    cost, schedule, labor, material.

    Args:
        df: Table to classify.

    Returns:
        "cost", "schedule", "labor", "material", or "unknown" when no
        keyword matches.
    """
    # str() keeps non-string labels (e.g. integer headers) from raising.
    columns_lower = {str(c).strip().lower() for c in df.columns}

    keyword_groups = (
        ("cost", ("cost", "amount", "price", "total", "budget")),
        ("schedule", ("start", "finish", "duration", "task", "activity")),
        ("labor", ("hours", "labor", "worker", "crew")),
        ("material", ("material", "quantity", "unit", "supplier")),
    )
    for data_type, keywords in keyword_groups:
        if any(keyword in columns_lower for keyword in keywords):
            return data_type
    return "unknown"
def _normalize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
    """Rename recognised columns to the standard column names.

    Each column name is lower-cased and stripped, then tested against the
    patterns below in order; patterns are anchored at the start of the
    name and the first matching pattern decides the new name. Columns that
    match nothing keep their name.

    Args:
        df: Table whose columns should be normalized. It is not modified.

    Returns:
        A renamed copy of ``df``.
    """
    column_mapping = {
        r"proj.*id": "project_id",
        r"proj.*name": "project_name",
        r"desc.*": "description",
        r"qty|quantity": "quantity",
        r"unit.*cost|unit.*price": "unit_cost",
        r"total|amount": "amount",
        r"start.*date": "start_date",
        r"end.*date|finish.*date": "end_date",
        r"dur.*": "duration",
    }

    # Names that are already canonical stay where they are; reserving them
    # first keeps another column from being renamed on top of them.
    canonical = set(column_mapping.values())
    taken = {col for col in df.columns if isinstance(col, str) and col in canonical}

    new_columns = {}
    for col in df.columns:
        # Non-string labels (e.g. integer headers) cannot be matched.
        if not isinstance(col, str) or col in taken:
            continue
        col_lower = col.lower().strip()
        for pattern, new_name in column_mapping.items():
            if re.match(pattern, col_lower):
                # Duplicate column names would make to_dict('records')
                # drop data, so only the first claimant gets the name.
                if new_name not in taken:
                    new_columns[col] = new_name
                    taken.add(new_name)
                break

    return df.rename(columns=new_columns)
def assessquality(self, df: pd.DataFrame) -> float:
评估数据质量得分(0-1)。
if df.empty:
return 0.0
scores = []
# 完整性:非空值百分比
completeness = 1 - (df.isnull().sum().sum() / df.size)
scores.append(completeness)
# 列质量:有意义的列名
meaningful_cols = sum(1 for c in df.columns if len(c) > 2 and not c.startswith(Unnamed))
colquality = meaningfulcols / len(df.columns)
scores.append(col_quality)
# 行数:数据越多越好(上限1.0)
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 historical-data-manager-1776345131 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 historical-data-manager-1776345131 技能
skillhub install historical-data-manager-1776345131
文件大小: 6.58 KB | 发布时间: 2026-4-17 14:19