返回顶部
e

etl-generatorETL生成器

大数据 ETL 流程生成器 - 根据源表 DDL 生成标准化 ETL 加工 SQL(HiveSQL/MySQL)

作者: admin | 来源: ClawHub
源自
ClawHub
版本
V 1.0.0
安全检测
已通过
131
下载量
免费
免费
0
收藏
概述
安装方式
版本历史

etl-generator

ETL 流程生成器 - 大数据专家版

根据源表 DDL 自动生成标准化的 ETL 加工 SQL,支持 HiveSQL、MySQL、ODPS。

🎯 角色定位

大数据专家(20 年经验)

  • - 精通 HiveSQL、MySQL、Shell、Python
  • 严格遵守大数据 ETL 加工规范
  • 注意字段类型转换、时区处理、数据质量



🔧 核心功能

1️⃣ 表名规范

  • - 源表: ods[表名]di
  • 目标表: dwd[表名]di

2️⃣ 字段类型转换

  • - at 或 time 结尾的 TIMESTAMP 字段 → STRING(时区转换)
  • _date 结尾的字段 → STRING(不转换)
  • 其他字段 → 保持原类型

3️⃣ 时区转换

sql DATEFORMAT(FROMUTCTIMESTAMP(createdat, ${timezone}), yyyy-MM-dd HH:mm:ss.SSS) AS created_at DATEFORMAT(FROMUTCTIMESTAMP(updatedat, ${timezone}), yyyy-MM-dd HH:mm:ss.SSS) AS updated_at

4️⃣ 分区字段

sql DATEFORMAT(FROMUTCTIMESTAMP(createdat, ${timezone}), yyyy-MM-dd) AS ds

5️⃣ 增量数据处理

  • - 使用 odsdatabasedi 表
  • 支持 INSERT/UPDATE/DELETE 操作
  • 通过 operationafterimage 识别

6️⃣ 去重逻辑

sql ROWNUMBER() OVER(PARTITION BY id ORDER BY updatedat DESC) as rn WHERE rn = 1

7️⃣ 字段排除 rn

sql SELECT (rn)?+.+ FROM (...)

📋 使用方式

方式 1:命令行

bash

从文件读取 DDL


python3 skills/etl-generator/etlgenerator.py sourcetable.ddl > etl_sql.sql

从标准输入读取

cat sourcetable.ddl | python3 skills/etl-generator/etlgenerator.py > etl_sql.sql

方式 2:直接调用

python
from etlgenerator import parsetableddl, generatetargettableddl, generateetlsql

ddl =
CREATE TABLE IF NOT EXISTS odsdeliveryattempt_di(
id STRING COMMENT 主键,
pno STRING COMMENT 运单号,
client_id STRING COMMENT 客户 ID,
returned BIGINT COMMENT 是否退货件,
delivery_date STRING COMMENT 派送日期,
marker_id BIGINT COMMENT 标记原因,
store_id STRING COMMENT 网点 ID,
created_at TIMESTAMP COMMENT 创建时间,
updated_at TIMESTAMP COMMENT 更新时间
)
PARTITIONED BY (ds STRING)
STORED AS ALIORC
TBLPROPERTIES (columnar.nested.type=true, comment=有效尝试派送详情)
LIFECYCLE 36500;

tablename, fields, tablecomment = parsetableddl(ddl)
targetddl = generatetargettableddl(tablename, fields, tablecomment)
etlsql = generateetlsql(tablename, fields, table_comment)



📝 输出示例

输入(源表 DDL)

sql
CREATE TABLE IF NOT EXISTS odssapstorecashpayinfodi(
id STRING COMMENT 主键,
store_id STRING COMMENT 网点编号,
business_date STRING COMMENT 业务日期,
sap_state BIGINT COMMENT 0:待处理 1:待发送 2:不需要发送 3:已发送 4:异常,
created_at TIMESTAMP COMMENT 创建时间,
updated_at TIMESTAMP COMMENT 更新时间
)
PARTITIONED BY (ds STRING)
STORED AS ALIORC
TBLPROPERTIES (columnar.nested.type=true, comment=SAP 门店现金支付信息)
LIFECYCLE 36500;

输出(目标表 DDL + ETL SQL)

sql
-- 目标表 DDL
CREATE TABLE IF NOT EXISTS dwdsapstorecashpayinfodi(
id STRING COMMENT 主键,
store_id STRING COMMENT 网点编号,
business_date STRING COMMENT 业务日期,
sap_state BIGINT COMMENT 0:待处理 1:待发送 2:不需要发送 3:已发送 4:异常,
created_at STRING COMMENT 创建时间,
updated_at STRING COMMENT 更新时间
)
PARTITIONED BY (ds STRING)
STORED AS ALIORC
TBLPROPERTIES (columnar.nested.type=true, comment=SAP 门店现金支付信息)
LIFECYCLE 36500;

-- ETL 加工 SQL
WITH ods_data AS (
SELECT
id,
store_id,
business_date,
sap_state,
DATEFORMAT(FROMUTCTIMESTAMP(createdat, ${timezone}), yyyy-MM-dd HH:mm:ss.SSS) AS created_at,
DATEFORMAT(FROMUTCTIMESTAMP(updatedat, ${timezone}), yyyy-MM-dd HH:mm:ss.SSS) AS updated_at,
DATEFORMAT(FROMUTCTIMESTAMP(createdat, ${timezone}), yyyy-MM-dd) AS ds
FROM odssapstorecashpayinfodi
WHERE ds >= ${y-m-d}
UNION ALL
SELECT
getjsonobject(values, $.id) as id,
getjsonobject(values, $.storeid) as storeid,
getjsonobject(values, $.businessdate) as businessdate,
getjsonobject(values, $.sapstate) as sapstate,
DATEFORMAT(FROMUTCTIMESTAMP(getjsonobject(values, $.createdat), ${timezone}), yyyy-MM-dd HH:mm:ss.SSS) AS created_at,
DATEFORMAT(FROMUTCTIMESTAMP(getjsonobject(values, $.updatedat), ${timezone}), yyyy-MM-dd HH:mm:ss.SSS) AS updated_at,
DATEFORMAT(FROMUTCTIMESTAMP(getjsonobject(values, $.createdat), ${timezone}), yyyy-MM-dd) AS ds
FROM odsdatabase_di
WHERE (
(afterimage = Y AND operation_ IN (INSERT, UPDATE))
OR (operation = DELETE AND beforeimage_ = Y)
OR id IS NULL
)
AND ds >= ${y-m-d}
AND tablename = sapstorecashpay_info
AND dbname = sourcedb
)
INSERT OVERWRITE TABLE dwdsapstorecashpayinfodi PARTITION(ds)
SELECT (rn)?+.+ FROM (
SELECT
*,
ROWNUMBER() OVER(PARTITION BY id ORDER BY updatedat DESC) as rn
FROM (
SELECT * FROM dwdsapstorecashpayinfodi WHERE ds IN (
SELECT DISTINCT ds FROM ods_data
)
UNION ALL
SELECT * FROM ods_data
) a
) t1
WHERE rn = 1;



🧪 数据质量检查

自动生成以下检查 SQL:

  1. 1. 主键空值检查
  2. 退货件比例检查(如果有 returned 字段)
  3. 数据量对比(源表 vs 目标表)

📋 字段映射说明

自动生成字段映射文档:

sql
-- ============================================
-- 字段映射说明
-- ============================================
-- 源表字段 (7 个): id, storeid, businessdate, sapstate, createdat, updated_at
-- 目标表字段 (7 个): id, storeid, businessdate, sapstate, createdat, updated_at
-- 分区字段:ds
--
-- 字段转换规则:
-- created_at: TIMESTAMP → STRING, 时区转换
-- updated_at: TIMESTAMP → STRING, 时区转换
-- business_date: 直接映射
-- ============================================



⚙️ 配置参数


参数说明默认值
${timezone}时区UTC
${y-m-d}
业务日期 | ${yyyymmdd-1} |
| ${bizdate} | 业务日期(质量检查) | ${yyyymmdd-1} |


📁 文件结构

skills/etl-generator/
├── SKILL.md # 技能说明
├── etl_generator.py # 核心脚本
├── README.md

标签

skill ai

通过对话安装

该技能支持在以下平台通过对话安装:

OpenClaw WorkBuddy QClaw Kimi Claude

方式一:安装 SkillHub 和技能

帮我安装 SkillHub 和 etl-generator-1776009203 技能

方式二:设置 SkillHub 为优先技能安装源

设置 SkillHub 为我的优先技能安装源,然后帮我安装 etl-generator-1776009203 技能

通过命令行安装

skillhub install etl-generator-1776009203

下载

⬇ 下载 etl-generator v1.0.0(免费)

文件大小: 9.73 KB | 发布时间: 2026-4-13 10:11

v1.0.0 最新 2026-4-13 10:11
- Initial release of etl-generator skill.
- Generates standardized ETL SQL from source table DDL, supporting HiveSQL, MySQL, ODPS.
- Implements field type conversion, time zone handling, partition generation, and de-duplication logic.
- Supports command-line and direct Python usage.
- Generates target table DDL, ETL processing SQL, data quality check SQL, and field mapping documentation.
- Provides usage instructions, configuration parameters, and guidelines for batch and advanced scenarios.

Archiver·手机版·闲社网·闲社论坛·羊毛社区· 多链控股集团有限公司 · 苏ICP备2025199260号-1

Powered by Discuz! X5.0   © 2024-2025 闲社网·线报更新论坛·羊毛分享社区·http://xianshe.com

p2p_official_large
返回顶部