Extract structured data from unstructured sources. Parse JSON, CSV, logs, and mixed formats into clean, usable data. Handle malformed data, nested structures, and large files efficiently. Use when extracting data from messy inputs, parsing logs, or cleaning datasets. Triggers on "extract data", "parse json", "parse csv", "clean data", "log parser".
从非结构化或混乱的源中提取结构化、干净的数据。将混乱转化为可用的数据。
输入: 响应是 {"status": "ok", "data": [1, 2, 3]} 然后...
输出: {"status": "ok", "data": [1, 2, 3]}
python
import re
import json
def extract_json(text):
    """Return the first JSON object embedded in ``text``, or ``None``.

    The text is scanned for ``{`` characters and a real JSON decoder is
    tried at each one, so arbitrarily nested objects and braces inside
    string values are handled (a brace-matching regex cannot do either).

    Args:
        text: Arbitrary text that may contain a JSON object.

    Returns:
        The decoded object (a ``dict``) for the first position that parses,
        or ``None`` when no valid JSON object is found.
    """
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode tolerates trailing garbage after the JSON value.
            parsed, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            # Not valid JSON from this brace; try the next candidate.
            start = text.find("{", start + 1)
        else:
            return parsed
    return None
python
import csv
from io import StringIO
def parsemessycsv(text):
    """Parse delimiter-separated text whose delimiter is not known up front.

    The delimiter is chosen from ``,``, ``;``, tab and ``|`` as the one that
    occurs most often in the first line (``,`` wins ties). Each field is
    trimmed of surrounding whitespace and of stray double or single quotes.

    Args:
        text: The raw delimited text.

    Returns:
        A list of rows, each a list of cleaned string fields. Blank lines
        are skipped; empty or whitespace-only input yields ``[]``.
    """
    stripped = text.strip()
    if not stripped:
        return []

    # Detect the delimiter from the header line only.
    first_line = stripped.split("\n", 1)[0]
    candidates = [",", ";", "\t", "|"]
    # max() returns the first maximal element, so "," is kept on ties.
    delimiter = max(candidates, key=first_line.count)

    rows = []
    for row in csv.reader(StringIO(stripped), delimiter=delimiter):
        if not row:
            # csv.reader yields [] for blank lines; they carry no data.
            continue
        rows.append([field.strip().strip('"').strip("'") for field in row])
    return rows
输入: name: John, age: 30, city: New York
输出: {"name": "John", "age": "30", "city": "New York"}
python
import re
def extractkeyvalue(text):
    """Extract ``key: value`` / ``key=value`` pairs from free-form text.

    Pairs are separated by commas or newlines. Keys are runs of word
    characters; keys and values may optionally be wrapped in single or
    double quotes, which are removed. When a key occurs more than once the
    last occurrence wins.

    Args:
        text: Text such as ``"name: John, age: 30"`` or ``"a=1\\nb=2"``.

    Returns:
        A dict mapping each key to its value as a stripped string.
    """
    # One pattern covers "key: value", "key=value" and quoted variants; the
    # optional quote after the key lets '"key": value' match as well.
    pair_re = re.compile(r"""(\w+)["']?\s*[:=]\s*([^,\n]+)""")
    result = {}
    for key, value in pair_re.findall(text):
        result[key] = value.strip().strip("\"'").strip()
    return result
python
import re
from datetime import datetime
def parselogline(line):
    """Parse one log line by trying several common formats in turn.

    Formats are tried in this order:

    1. Apache/Nginx access log (common log format).
    2. A JSON object on a single line.
    3. ``key=value`` pairs (delegated to ``extractkeyvalue``).

    Args:
        line: A single log line.

    Returns:
        A dict. For access logs the keys are ``ip``, ``timestamp``,
        ``method``, ``path``, ``status`` (int) and ``size`` (int; a ``-``
        size field is reported as ``0``). For JSON logs the decoded value
        is returned. If nothing matches, ``{"raw": line}`` is returned.
    """
    # Apache/Nginx access log. The size field may be "-" when no body was
    # sent, and the protocol may lack a minor version (e.g. "HTTP/2").
    access_pattern = (
        r'(\S+) \S+ \S+ \[([^\]]+)\] '
        r'"(\S+) ([^"]+) HTTP/\d(?:\.\d)?" (\d+) (\d+|-)'
    )
    match = re.match(access_pattern, line)
    if match:
        raw_size = match.group(6)
        return {
            "ip": match.group(1),
            "timestamp": match.group(2),
            "method": match.group(3),
            "path": match.group(4),
            "status": int(match.group(5)),
            "size": 0 if raw_size == "-" else int(raw_size),
        }

    # JSON log line.
    candidate = line.strip()
    if candidate.startswith("{"):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            # Looked like JSON but is not; fall through to other formats.
            pass

    # key=value log line.
    if "=" in line:
        return extractkeyvalue(line)

    return {"raw": line}
python
def fix_json(text):
    """Apply best-effort textual repairs to almost-JSON text.

    Repairs, applied in order:

    1. Single-quoted strings become double-quoted.
    2. Unquoted object keys are quoted.
    3. Trailing commas before ``}`` or ``]`` are removed.
    4. Bare word values are quoted, except ``true``/``false``/``null``.

    These are regex heuristics, not a parser: text that merely looks like
    one of these patterns inside a string value (for example an
    apostrophe) may be altered too.

    Args:
        text: The malformed JSON text.

    Returns:
        The repaired text; it is not guaranteed to be valid JSON.
    """
    # 1. Single quotes -> double quotes.
    text = re.sub(r"'([^']*)'", r'"\1"', text)
    # 2. Unquoted keys. Requiring a preceding "{" or "," keeps this from
    #    re-quoting keys that are already quoted and from touching colons
    #    inside values such as "10:30".
    text = re.sub(r'([{,]\s*)([A-Za-z_]\w*)\s*:', r'\1"\2":', text)
    # 3. Trailing commas.
    text = re.sub(r',\s*([}\]])', r'\1', text)
    # 4. Bare word values; JSON literals must stay unquoted.
    text = re.sub(
        r':\s*(?!(?:true|false|null)\b)([A-Za-z_]\w*)(?=\s*[,}\]])',
        r': "\1"',
        text,
    )
    return text
python
def streamjsonl(filepath):
    """Stream records from a JSON Lines (JSONL) file one at a time.

    The file is read line by line, so memory use does not grow with file
    size. Blank lines and lines that are not valid JSON are skipped.

    Args:
        filepath: Path of the JSONL file to read.

    Yields:
        The decoded JSON value of each valid line, in file order.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                # Best-effort streaming: drop malformed lines and keep going.
                continue
def streamcsv(filepath, chunk_size=1000):
    """Stream a CSV file as chunks of row dicts.

    The first row is used as the header. Every later row is zipped with the
    header into a dict; cells beyond the shorter of the two are dropped.

    Args:
        filepath: Path of the CSV file to read.
        chunk_size: Maximum number of rows per yielded chunk.

    Yields:
        Lists of at most ``chunk_size`` dicts, followed by a final shorter
        list if rows remain. An empty file yields nothing.
    """
    # newline="" lets the csv module handle line endings inside quoted cells.
    with open(filepath, "r", newline="", encoding="utf-8") as f:
        reader = csv.reader(f)
        headers = next(reader, None)
        if headers is None:
            # Empty file: a bare next() would raise StopIteration, which
            # becomes RuntimeError inside a generator.
            return
        chunk = []
        for row in reader:
            chunk.append(dict(zip(headers, row)))
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk
python
def detectandparse(content):
    """Detect the format of ``content`` and parse it accordingly.

    Formats are tried in this order; the first that applies wins:

    1. JSON document (text starts with ``{`` or ``[``).
    2. JSON Lines (a line starting with ``{`` after the first).
    3. CSV (contains both a comma and a newline), via ``parsemessycsv``.
    4. Key/value pairs (contains ``=`` or ``:``), via ``extractkeyvalue``.
    5. Plain lines.

    Args:
        content: The raw text to parse.

    Returns:
        The decoded JSON value, a list of decoded JSONL records, a list of
        CSV rows, a dict of key/value pairs, or a list of lines.
    """
    content = content.strip()

    # JSON
    if content.startswith(("{", "[")):
        try:
            return json.loads(content)
        except json.JSONDecodeError:
            # Not a single JSON document; it may still be JSONL.
            pass

    # JSONL
    if "\n{" in content:
        try:
            return [json.loads(line) for line in content.split("\n") if line.strip()]
        except json.JSONDecodeError:
            # At least one line is not JSON; try the remaining formats.
            pass

    # CSV: needs both a delimiter and more than one line.
    if "," in content and "\n" in content:
        return parsemessycsv(content)

    # Key/value pairs
    if "=" in content or ":" in content:
        return extractkeyvalue(content)

    # Plain lines
    return content.split("\n")
python
def deduplicate(data, key=None):
    """Remove duplicates from a list, keeping the first occurrence of each.

    Order is preserved. Unhashable items (dicts, lists) are supported via a
    slower equality-based comparison.

    Args:
        data: The value to deduplicate. Anything that is not a ``list`` is
            returned unchanged.
        key: Optional dict key. When given, dict items are compared by
            ``item.get(key)``; non-dict items are compared by themselves.

    Returns:
        A new list without duplicates, or ``data`` itself if not a list.
    """
    if not isinstance(data, list):
        return data

    seen_hashable = set()
    seen_unhashable = []  # Fallback store for markers that cannot be hashed.
    result = []
    for item in data:
        if key is not None and isinstance(item, dict):
            marker = item.get(key)
        else:
            marker = item
        try:
            is_new = marker not in seen_hashable
            if is_new:
                seen_hashable.add(marker)
        except TypeError:
            # Unhashable marker: fall back to a linear equality scan.
            is_new = marker not in seen_unhashable
            if is_new:
                seen_unhashable.append(marker)
        if is_new:
            result.append(item)
    return result
python
def normalize(data):
    """Recursively normalize strings inside nested dicts and lists.

    Each string is lower-cased, trimmed, and has runs of whitespace
    collapsed to one space. It is then converted where possible:

    * ``true``/``yes``/``on`` -> ``True``
    * ``false``/``no``/``off`` -> ``False``
    * ``null``/``none``/``n/a``/empty -> ``None``
    * integer or finite float text -> ``int`` / ``float``

    Dict keys are left untouched; only values are normalized. Non-string
    scalars are returned unchanged.

    Args:
        data: A dict, list, string, or any other value.

    Returns:
        The normalized value, with the same nesting as the input.
    """
    if isinstance(data, dict):
        return {k: normalize(v) for k, v in data.items()}
    if isinstance(data, list):
        return [normalize(item) for item in data]
    if not isinstance(data, str):
        return data

    # Lower-case, trim, and collapse whitespace.
    text = re.sub(r"\s+", " ", data.lower().strip())

    # Convert common literal values.
    if text in ("true", "yes", "on"):
        return True
    if text in ("false", "no", "off"):
        return False
    if text in ("null", "none", "n/a", ""):
        return None

    # Try numeric conversion, integers first so "42" stays an int.
    try:
        return int(text)
    except ValueError:
        pass
    try:
        number = float(text)
    except ValueError:
        return text
    # float() also accepts "nan"/"inf"; those are words, not measurements,
    # and NaN breaks equality, so keep such strings as text.
    if number != number or abs(number) == float("inf"):
        return text
    return number
python
def validate(data, schema):
    """Validate a record against a simple schema.

    Args:
        data: The record to check (a mapping of field name to value).
        schema: A dict with any of these optional keys:
            ``"required"``: list of field names that must be present;
            ``"types"``: dict of field name -> type (or tuple of types);
            ``"ranges"``: dict of field name -> ``(min, max)`` inclusive.

    Returns:
        A tuple ``(is_valid, errors)`` where ``errors`` is a list of
        messages and ``is_valid`` is ``True`` only when the list is empty.
    """
    errors = []

    # Required fields
    for field in schema.get("required", []):
        if field not in data:
            errors.append(f"缺少必填字段: {field}")

    # Type checks
    for field, expected_type in schema.get("types", {}).items():
        if field in data and not isinstance(data[field], expected_type):
            # isinstance accepts a tuple of types, which has no __name__.
            if isinstance(expected_type, tuple):
                type_name = " | ".join(t.__name__ for t in expected_type)
            else:
                type_name = expected_type.__name__
            errors.append(f"字段 {field} 应为 {type_name} 类型")

    # Value ranges
    for field, (minval, maxval) in schema.get("ranges", {}).items():
        if field in data:
            try:
                in_range = minval <= data[field] <= maxval
            except TypeError:
                # An incomparable value (e.g. str vs int) cannot be in range;
                # report it instead of aborting the whole validation.
                in_range = False
            if not in_range:
                errors.append(f"字段 {field} 超出范围: {data[field]}")

    return len(errors) == 0, errors
python
import json
def to_json(data, pretty=True):
    """Serialize ``data`` to a JSON string, keeping non-ASCII text readable.

    Args:
        data: Any JSON-serializable value.
        pretty: When true, indent the output by two spaces; otherwise emit
            the compact single-line form.

    Returns:
        The JSON text. Non-ASCII characters are written as-is rather than
        as ``\\uXXXX`` escapes.
    """
    return json.dumps(data, indent=2 if pretty else None, ensure_ascii=False)
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 data-extractor-1776373502 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 data-extractor-1776373502 技能
skillhub install data-extractor-1776373502
文件大小: 4.38 KB | 发布时间: 2026-4-17 14:35