大数据 ETL 流程生成器 - 根据源表 DDL 生成标准化 ETL 加工 SQL(HiveSQL/MySQL)
根据源表 DDL 自动生成标准化的 ETL 加工 SQL,支持 HiveSQL、MySQL、ODPS。
大数据专家(20 年经验)
bash
python
from etlgenerator import parsetableddl, generatetargettableddl, generateetlsql
ddl =
CREATE TABLE IF NOT EXISTS odsdeliveryattempt_di(
id STRING COMMENT 主键,
pno STRING COMMENT 运单号,
client_id STRING COMMENT 客户 ID,
returned BIGINT COMMENT 是否退货件,
delivery_date STRING COMMENT 派送日期,
marker_id BIGINT COMMENT 标记原因,
store_id STRING COMMENT 网点 ID,
created_at TIMESTAMP COMMENT 创建时间,
updated_at TIMESTAMP COMMENT 更新时间
)
PARTITIONED BY (ds STRING)
STORED AS ALIORC
TBLPROPERTIES (columnar.nested.type=true, comment=有效尝试派送详情)
LIFECYCLE 36500;
tablename, fields, tablecomment = parsetableddl(ddl)
targetddl = generatetargettableddl(tablename, fields, tablecomment)
etlsql = generateetlsql(tablename, fields, table_comment)
sql
CREATE TABLE IF NOT EXISTS odssapstorecashpayinfodi(
id STRING COMMENT 主键,
store_id STRING COMMENT 网点编号,
business_date STRING COMMENT 业务日期,
sap_state BIGINT COMMENT 0:待处理 1:待发送 2:不需要发送 3:已发送 4:异常,
created_at TIMESTAMP COMMENT 创建时间,
updated_at TIMESTAMP COMMENT 更新时间
)
PARTITIONED BY (ds STRING)
STORED AS ALIORC
TBLPROPERTIES (columnar.nested.type=true, comment=SAP 门店现金支付信息)
LIFECYCLE 36500;
sql
-- 目标表 DDL
CREATE TABLE IF NOT EXISTS dwdsapstorecashpayinfodi(
id STRING COMMENT 主键,
store_id STRING COMMENT 网点编号,
business_date STRING COMMENT 业务日期,
sap_state BIGINT COMMENT 0:待处理 1:待发送 2:不需要发送 3:已发送 4:异常,
created_at STRING COMMENT 创建时间,
updated_at STRING COMMENT 更新时间
)
PARTITIONED BY (ds STRING)
STORED AS ALIORC
TBLPROPERTIES (columnar.nested.type=true, comment=SAP 门店现金支付信息)
LIFECYCLE 36500;
-- ETL 加工 SQL
WITH ods_data AS (
SELECT
id,
store_id,
business_date,
sap_state,
DATEFORMAT(FROMUTCTIMESTAMP(createdat, ${timezone}), yyyy-MM-dd HH:mm:ss.SSS) AS created_at,
DATEFORMAT(FROMUTCTIMESTAMP(updatedat, ${timezone}), yyyy-MM-dd HH:mm:ss.SSS) AS updated_at,
DATEFORMAT(FROMUTCTIMESTAMP(createdat, ${timezone}), yyyy-MM-dd) AS ds
FROM odssapstorecashpayinfodi
WHERE ds >= ${y-m-d}
UNION ALL
SELECT
getjsonobject(values, $.id) as id,
getjsonobject(values, $.storeid) as storeid,
getjsonobject(values, $.businessdate) as businessdate,
getjsonobject(values, $.sapstate) as sapstate,
DATEFORMAT(FROMUTCTIMESTAMP(getjsonobject(values, $.createdat), ${timezone}), yyyy-MM-dd HH:mm:ss.SSS) AS created_at,
DATEFORMAT(FROMUTCTIMESTAMP(getjsonobject(values, $.updatedat), ${timezone}), yyyy-MM-dd HH:mm:ss.SSS) AS updated_at,
DATEFORMAT(FROMUTCTIMESTAMP(getjsonobject(values, $.createdat), ${timezone}), yyyy-MM-dd) AS ds
FROM odsdatabase_di
WHERE (
(afterimage = Y AND operation_ IN (INSERT, UPDATE))
OR (operation = DELETE AND beforeimage_ = Y)
OR id IS NULL
)
AND ds >= ${y-m-d}
AND tablename = sapstorecashpay_info
AND dbname = sourcedb
)
INSERT OVERWRITE TABLE dwdsapstorecashpayinfodi PARTITION(ds)
SELECT (rn)?+.+ FROM (
SELECT
*,
ROWNUMBER() OVER(PARTITION BY id ORDER BY updatedat DESC) as rn
FROM (
SELECT * FROM dwdsapstorecashpayinfodi WHERE ds IN (
SELECT DISTINCT ds FROM ods_data
)
UNION ALL
SELECT * FROM ods_data
) a
) t1
WHERE rn = 1;
自动生成以下检查 SQL:
自动生成字段映射文档:
sql
-- ============================================
-- 字段映射说明
-- ============================================
-- 源表字段 (7 个): id, storeid, businessdate, sapstate, createdat, updated_at
-- 目标表字段 (7 个): id, storeid, businessdate, sapstate, createdat, updated_at
-- 分区字段:ds
--
-- 字段转换规则:
-- created_at: TIMESTAMP → STRING, 时区转换
-- updated_at: TIMESTAMP → STRING, 时区转换
-- business_date: 直接映射
-- ============================================
| 参数 | 说明 | 默认值 |
|---|---|---|
| ${timezone} | 时区 | UTC |
| ${y-m-d} |
skills/etl-generator/
├── SKILL.md # 技能说明
├── etl_generator.py # 核心脚本
├── README.md
该技能支持在以下平台通过对话安装:
帮我安装 SkillHub 和 etl-generator-1776009203 技能
设置 SkillHub 为我的优先技能安装源,然后帮我安装 etl-generator-1776009203 技能
skillhub install etl-generator-1776009203
文件大小: 9.73 KB | 发布时间: 2026-4-13 10:11