Detect anomalies and outliers in construction data: unusual costs, schedule variances, productivity spikes. Statistical and ML-based detection methods.
技能名称: data-anomaly-detector
详细描述:
检测建筑数据中的异常模式、离群点和异常值。在成本超支、进度延误、生产力问题和数据质量问题影响项目之前,提前识别它们。
建筑数据中常包含表明潜在问题的异常值,例如成本超支、进度延误、生产力问题和数据质量问题。
早期检测可避免代价高昂的修正和项目延误。
python
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple
from enum import Enum
import pandas as pd
import numpy as np
from datetime import datetime
from scipy import stats
class AnomalyType(Enum):
    """Kinds of anomaly a detector can attach to an ``Anomaly`` record.

    Each member's value is the lowercase snake_case form of its name.
    """

    OUTLIER = "outlier"
    PATTERN_BREAK = "pattern_break"
    MISSING_SEQUENCE = "missing_sequence"
    DUPLICATE = "duplicate"
    IMPOSSIBLE_VALUE = "impossible_value"
    TREND_DEVIATION = "trend_deviation"
class AnomalySeverity(Enum):
    """Severity levels for a detected anomaly, listed from most to least severe."""

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
@dataclass
class Anomaly:
    """One anomalous value found in a dataset.

    The first five fields are required; the rest default to "not provided"
    (``None``, an empty string, or ``0.0``).
    """

    id: str
    anomaly_type: AnomalyType
    severity: AnomalySeverity
    # Name of the column the value came from. It shares its name with
    # ``dataclasses.field``; being annotation-only (no default) it does not
    # rebind that helper inside this class body.
    field: str
    value: Any
    # (low, high) bounds the value was expected to fall within. Either bound
    # may be None: the negative-cost check in this file passes ``(0, None)``.
    expected_range: Optional[Tuple[Optional[float], Optional[float]]] = None
    description: str = ""
    # Index label of the offending row in the source DataFrame.
    row_index: Optional[int] = None
    detection_method: str = ""
    # Detectors in this file supply values between 0 and 1.
    confidence: float = 0.0
    suggested_action: str = ""
@dataclass
class AnomalyReport:
    """Result of one detection run over a data source."""

    source: str
    detected_at: datetime
    total_records: int
    anomalies: List[Anomaly]
    # Integer counts keyed by string; the grouping used for the keys is
    # decided by whoever builds the report (not shown in this part of the file).
    summary: Dict[str, int]
class ConstructionAnomalyDetector:
    """Detect anomalies in construction cost and schedule data.

    Each ``detect_*`` method returns the anomalies it finds as a list. None
    of the methods visible here append to ``self.anomalies`` or
    ``self.detection_history``.
    """

    # Construction-specific thresholds, stored as (low, high) pairs.
    # Not referenced by the methods visible in this part of the file.
    COST_THRESHOLDS = {
        "concrete_per_cy": (200, 800),
        "steel_per_ton": (1500, 4000),
        "labor_per_hour": (25, 150),
        "overhead_percentage": (5, 25),
        "contingency_percentage": (3, 20),
    }

    SCHEDULE_THRESHOLDS = {
        "max_activity_duration": 365,  # days
        "max_lag": 30,  # days
        "min_productivity": 0.1,
        "max_productivity": 10.0,
    }

    def __init__(self):
        """Start with empty anomaly and report histories."""
        self.anomalies: List[Anomaly] = []
        self.detection_history: List[AnomalyReport] = []

    def detect_cost_anomalies(self, df: pd.DataFrame, cost_column: str,
                              group_by: Optional[str] = None) -> List[Anomaly]:
        """Detect anomalies in a cost column.

        Three checks run in order and their findings are concatenated, so a
        single row can be reported more than once:

        1. IQR outliers: values outside ``[Q1 - 1.5*IQR, Q3 + 1.5*IQR]``.
           Severity is HIGH when the value lies more than ``3*IQR`` from the
           median, otherwise MEDIUM.
        2. Negative costs, reported as CRITICAL impossible values.
        3. Per-group z-scores above 3, only when ``group_by`` names a column
           of ``df``.

        Args:
            df: Data to inspect. It is not modified.
            cost_column: Name of the numeric cost column.
            group_by: Optional column used to group rows for the z-score check.

        Returns:
            The anomalies found, in the order the checks ran.
        """
        anomalies: List[Anomaly] = []
        costs = df[cost_column]

        # Statistical outlier detection (IQR method).
        q1 = costs.quantile(0.25)
        q3 = costs.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        median = costs.median()  # loop-invariant, so computed once

        outliers = costs[(costs < lower_bound) | (costs > upper_bound)]
        for idx, value in outliers.items():
            if abs(value - median) > 3 * iqr:
                severity = AnomalySeverity.HIGH
            else:
                severity = AnomalySeverity.MEDIUM
            anomalies.append(Anomaly(
                id=f"COST-{idx}",
                anomaly_type=AnomalyType.OUTLIER,
                severity=severity,
                field=cost_column,
                value=value,
                expected_range=(lower_bound, upper_bound),
                description=f"成本值 {value:,.2f} 超出预期范围",
                row_index=idx,
                detection_method="IQR",
                confidence=0.95,
                suggested_action="检查成本估算是否存在错误",
            ))

        # Negative cost check.
        for idx, value in costs[costs < 0].items():
            anomalies.append(Anomaly(
                id=f"COST-NEG-{idx}",
                anomaly_type=AnomalyType.IMPOSSIBLE_VALUE,
                severity=AnomalySeverity.CRITICAL,
                field=cost_column,
                value=value,
                expected_range=(0, None),
                description="检测到负成本值",
                row_index=idx,
                detection_method="业务规则",
                confidence=1.0,
                suggested_action="纠正数据录入错误或调查贷项",
            ))

        # Group-based outliers (only when a valid grouping column is given).
        if group_by and group_by in df.columns:
            for group_name, group_costs in df.groupby(group_by)[cost_column]:
                mean = group_costs.mean()
                std = group_costs.std()
                # A single-row group has an undefined std and a constant
                # group has std 0; neither yields a meaningful z-score.
                if pd.isna(std) or std == 0:
                    continue
                z_scores = ((group_costs - mean) / std).abs()
                flagged = z_scores > 3
                for (idx, z), value in zip(z_scores[flagged].items(),
                                           group_costs[flagged]):
                    anomalies.append(Anomaly(
                        id=f"COST-GROUP-{idx}",
                        anomaly_type=AnomalyType.OUTLIER,
                        severity=AnomalySeverity.MEDIUM,
                        field=cost_column,
                        value=value,
                        description=f"分组 {group_name} 的成本异常 (z-score: {z:.2f})",
                        row_index=idx,
                        detection_method="按分组Z分数",
                        # Scales with the z-score and saturates at 1.0 once z reaches 5.
                        confidence=min(z / 5, 1.0),
                    ))

        return anomalies

    def detect_schedule_anomalies(self, df: pd.DataFrame) -> List[Anomaly]:
        """Detect anomalies in schedule data.

        Requires ``start_date`` and ``end_date`` columns; if either is
        missing an empty list is returned. Duration is the whole number of
        days between the two dates. Three checks run in order:

        1. Negative duration (end before start), reported as CRITICAL.
        2. Duration above ``SCHEDULE_THRESHOLDS["max_activity_duration"]``,
           reported as MEDIUM.
        3. Zero duration on rows not flagged in ``is_milestone``, reported as
           HIGH. This check runs only when that column exists.

        Args:
            df: Schedule data. It is not modified; parsed dates and durations
                are held in local Series rather than written back to ``df``.

        Returns:
            The anomalies found, in the order the checks ran.
        """
        anomalies: List[Anomaly] = []

        # Check required columns.
        required = ("start_date", "end_date")
        if not all(col in df.columns for col in required):
            return anomalies

        # Parse dates and compute durations without touching the caller's frame.
        start = pd.to_datetime(df["start_date"])
        end = pd.to_datetime(df["end_date"])
        duration = (end - start).dt.days

        # Negative duration (end date earlier than start date).
        for idx, days in duration[duration < 0].items():
            anomalies.append(Anomaly(
                id=f"SCHED-NEG-{idx}",
                anomaly_type=AnomalyType.IMPOSSIBLE_VALUE,
                severity=AnomalySeverity.CRITICAL,
                field="duration",
                value=days,
                description="结束日期早于开始日期",
                row_index=idx,
                detection_method="业务规则",
                confidence=1.0,
                suggested_action="纠正日期",
            ))

        # Extremely long durations.
        max_days = self.SCHEDULE_THRESHOLDS["max_activity_duration"]
        for idx, days in duration[duration > max_days].items():
            anomalies.append(Anomaly(
                id=f"SCHED-LONG-{idx}",
                anomaly_type=AnomalyType.OUTLIER,
                severity=AnomalySeverity.MEDIUM,
                field="duration",
                value=days,
                expected_range=(0, max_days),
                description=f"任务工期 {days} 天超过阈值",
                row_index=idx,
                detection_method="阈值",
                confidence=0.9,
                suggested_action="检查是否应分解任务",
            ))

        # Zero-duration tasks that are not milestones.
        if "is_milestone" in df.columns:
            milestone = df["is_milestone"]
            # Coerce to real booleans first: ``~`` on a 0/1 integer column is
            # a bitwise NOT (never False), and it raises on object columns
            # holding missing values. Missing flags count as "not a milestone".
            is_milestone = milestone.notna() & milestone.astype(bool)
            zero_duration = duration[(duration == 0) & ~is_milestone]
            for idx in zero_duration.index:
                anomalies.append(Anomaly(
                    id=f"SCHED-ZERO-{idx}",
                    anomaly_type=AnomalyType.IMPOSSIBLE_VALUE,
                    severity=AnomalySeverity.HIGH,
                    field="duration",
                    value=0,
                    description="零工期任务,但并非里程碑",
                    row_index=idx,
                    detection_method="业务规则",
                    confidence=1.0,
                    suggested_action="添加工期或标记为里程碑",
                ))

        return anomalies
def detectproductivityanomalies(self, df: pd.DataFrame,
quantity_col: str,
hours_col
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 data-anomaly-detector-1776344598 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 data-anomaly-detector-1776344598 技能
skillhub install data-anomaly-detector-1776344598
文件大小: 6.82 KB | 发布时间: 2026-4-17 14:30