Compare commits
No commits in common. "338a06e583a1d7e71cdc4159f555f3276e987770" and "15b3b94d12e8934c274a50e042301c468e370eff" have entirely different histories.
338a06e583 ... 15b3b94d12

@@ -7,24 +7,24 @@ This manual guides data engineers & data analysts (DA/DE) through using Airflow,

## Table of Contents

- [Components](#components)
  - [Airflow](#airflow)
  - [Superset](#superset)
  - [Trino](#trino)
  - [Object Storage](#object-storage)
- [Workflow](#workflow)
- [Data Pipeline](#data-pipeline)
  - [1. Data Ingestion](#1-data-ingestion)
  - [2. Raw Data Storage](#2-raw-data-storage)
  - [3. Data Transformation / ETL](#3-data-transformation--etl)
  - [4. Processed Data Storage](#4-processed-data-storage)
  - [5. Data Visualization](#5-data-visualization)
- [Example](#example)
  - [sample_dag.py](#sample_dagpy)
  - [Dockerfile](#dockerfile)
  - [Build & publish container image](#build--publish-container-image)

---

## Components

### Airflow

@@ -46,33 +46,41 @@ An **S3-compatible storage provider** (e.g., MinIO) used to store and retrieve u

## Workflow

The two sides of this compare draw the workflow differently. 338a06e583 models the platform components and the numbered data flows:

```mermaid
flowchart TB

    subgraph src ["Data source"]
        direction LR
        ext_api[/"API<br>(HTTP, REST, Graph)"/]
        ext_s3@{ shape: cyl, label: "Object Storage<br>(S3, MinIO, GCS)" }
        ext_db@{ shape: cyl, label: "Database<br>(MySQL, PostgreSQL)" }
        ext_fs@{ shape: cyl, label: "Filesystem<br>(HDFS, NAS)" }
    end

    subgraph emgr ["Data Platform"]
        dag@{ shape: docs, label: "Python DAG" }
        af["Airflow"]
        tr["Trino"]
        ss("Superset")
    end

    s3@{ shape: cyl, label: "S3<br>(MinIO)" }

    dag -- (1a)<br>Fetch<br>raw data<br>(API, SDK) --> src
    dag -- (1b) --> tr
    tr -- (1b)<br>Fetch<br>raw data<br>(Trino connector) --> src
    af -- (2)<br>Execute<br>script --> dag
    dag -- (3)<br>Store<br>processed<br>data<br>(SQL) --> tr
    s3 <-- (4)<br>Read/write data<br>(Hive / Iceberg format) --> tr
    ss -- (5)<br>Query<br>processed<br>data<br>(SQL) --> tr
```

15b3b94d12 replaces it with a staged view of the same pipeline:

```mermaid
flowchart TD
    %% STAGE 1: DATA SOURCES
    A["Data Sources
    (S3 / MinIO, DBs, APIs)"] -->|Ingestion Jobs| B[Apache Airflow]

    %% STAGE 2: RAW STORAGE
    B -->|Store Raw Data| C["Raw Zone
    (S3 / MinIO)"]

    %% STAGE 3: TRANSFORMATION
    C -->|DAG / ETL / SQL Queries| D["Trino
    (Query Engine)"]
    B -->|Workflow Orchestration| D

    %% STAGE 4: PROCESSED STORAGE
    D -->|Write Processed Data| E["Processed / Curated Zone
    (S3 / MinIO)"]

    %% STAGE 5: QUERY LAYER
    E -->|Query Interface| F["Trino
    (SQL Access Layer)"]

    %% STAGE 6: VISUALIZATION
    F -->|Data Access| G["Apache Superset
    (Dashboarding & Analytics)"]

    %% LABELS
    classDef core fill:#4a90e2,stroke:#2c3e50,stroke-width:1px,color:white;
    classDef storage fill:#6dbf4b,stroke:#2c3e50,stroke-width:1px,color:white;
    classDef optional fill:#aaaaaa,stroke:#333,stroke-width:0.5px,color:white;

    class B,D,F,G core;
    class C,E storage;
    class H1,H2,H3,H4 optional;
```

## Data Pipeline

@@ -82,22 +90,18 @@ ss -- (5)<br>Query<br>processed<br>data<br>(SQL) --> tr

Collect raw data from multiple sources and bring it into the platform in a structured workflow.

Components:

- Apache Airflow (orchestrates ingestion pipelines)

Data sources:

- Object storage (S3 / MinIO)
  - files: csv, xlsx, txt, etc.
- APIs (REST/GraphQL endpoints)
- Databases (PostgreSQL, MySQL, etc.)

DA/DE tasks:

- Create DAGs in Airflow to pull data periodically (a minimal sketch follows below).

Sample Airflow DAG (Python):

- See [sample_dag.py](#sample_dagpy)
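
For orientation only (this is not the platform's sample_dag.py), a minimal ingestion DAG could look like the sketch below. `S3_REGION` and `S3_BUCKET` appear in the example later in this manual; the endpoint and credential variable names, the DAG id, and the API URL are illustrative assumptions:

```python
# Hypothetical ingestion DAG -- a sketch, not the platform's sample_dag.py.
import os
from datetime import datetime

import boto3
import requests
from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="ingest_orders_api",        # hypothetical DAG id
    schedule="@hourly",
    start_date=datetime(2025, 1, 1),
    catchup=False,
):

    @task
    def fetch_and_store_raw() -> str:
        # Pull raw data from a (hypothetical) REST endpoint.
        resp = requests.get("https://api.example.net/orders", timeout=30)
        resp.raise_for_status()

        # Write the untouched payload to the raw zone in object storage.
        s3 = boto3.client(
            "s3",
            endpoint_url=os.getenv("S3_ENDPOINT"),           # assumed variable names
            aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
            aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
            region_name=os.getenv("S3_REGION"),
        )
        key = f"airflow/raw/orders_{datetime.now():%Y%m%d%H%M}.json"
        s3.put_object(Bucket=os.getenv("S3_BUCKET", "emgr"), Key=key, Body=resp.content)
        return key  # small value only, so it is safe to pass through XCom

    fetch_and_store_raw()
```
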
### 2. Raw Data Storage

@@ -105,15 +109,12 @@ Sample Airflow DAG (Python):

Store the raw, unprocessed data in a centralized location for auditing and reprocessing.

Components:

- Object storage (S3 / MinIO)

DA/DE tasks:

- Organize data using bucket/folder structures (one possible layout is sketched below).

Sample S3 Folder Structure:

- See [Data source files](#data-source-files)
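
One possible bucket/folder convention (an assumption, not a platform requirement) is to key raw objects by source and ingestion date, mirroring the example paths later in this manual:

```python
# Sketch of a date-based raw-zone layout using boto3; the S3_* variables other
# than S3_REGION/S3_BUCKET are assumed names, as in the ingestion sketch above.
import os
from datetime import date

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url=os.getenv("S3_ENDPOINT"),
    aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
    region_name=os.getenv("S3_REGION"),
)

bucket = os.getenv("S3_BUCKET", "emgr")
# e.g. raw/db/2025-11-11/orders.csv -- one prefix per source and ingestion date
key = f"raw/db/{date.today():%Y-%m-%d}/orders.csv"

s3.upload_file("orders.csv", bucket, key)   # local file assumed to exist
print(f"s3://{bucket}/{key}")
```
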
### 3. Data Transformation / ETL

@@ -121,20 +122,16 @@ Sample S3 Folder Structure:

Clean, enrich, and transform raw data into structured, query-ready form.

Components:

- Apache Airflow (orchestration)
- Trino (SQL engine for transformations)

DA/DE tasks:

- Schedule transformation jobs in Airflow DAGs (a Trino-based step is sketched below).

Airflow DAG Snippet for ETL:

- See [sample_dag.py](#sample_dagpy)

S3 Folder Structure:

- See [Python DAG files](#python-dag-files)
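
As a sketch of the kind of transformation such a DAG task could run, the `trino` Python client can submit SQL to the query engine. The host, schema, and table/column names below are illustrative assumptions, not the platform's actual objects:

```python
# Sketch of an ETL step driven from an Airflow task via the `trino` client.
import trino

conn = trino.dbapi.connect(
    host="trino.example.net",   # hypothetical host
    port=8080,
    user="airflow",
    catalog="iceberg",          # one of the catalogs exposed to Superset
    schema="analytics",         # hypothetical schema
)
cur = conn.cursor()

# Clean and aggregate hypothetical raw sales rows into a query-ready table.
cur.execute("""
    CREATE TABLE IF NOT EXISTS monthly_sales AS
    SELECT
        date_trunc('month', CAST(sale_date AS date)) AS sale_month,
        part,
        SUM(quantity_sold) AS qty,
        SUM(total_sale)    AS revenue
    FROM raw_sales
    WHERE total_sale IS NOT NULL
    GROUP BY 1, 2
""")
cur.fetchall()  # fetching the result waits for the statement to finish
```
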
### 4. Processed Data Storage

@@ -142,12 +139,10 @@ S3 Folder Structure:

Store the transformed and curated datasets in a queryable format for analytics and dashboarding.

Components:

- Trino (query engine / SQL layer)
- S3 (object storage for processed datasets)

DA/DE tasks:

- Partition tables by date, region, or other dimensions for fast queries (see the sketch below).
- Grant read access to Superset.
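
A sketch of one way to do that with Trino's Iceberg connector, which accepts a `partitioning` table property; the connection settings and names are the same illustrative assumptions as in the ETL sketch above:

```python
# Sketch: create a processed-zone table partitioned by month and region.
import trino

conn = trino.dbapi.connect(
    host="trino.example.net", port=8080, user="airflow",
    catalog="iceberg", schema="analytics",
)
cur = conn.cursor()

cur.execute("""
    CREATE TABLE IF NOT EXISTS sales_curated (
        sale_date date,
        region    varchar,
        part      varchar,
        qty       bigint,
        revenue   double
    )
    WITH (partitioning = ARRAY['month(sale_date)', 'region'])
""")
cur.fetchall()
```
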

@@ -156,20 +151,17 @@ DA/DE tasks:

Provide dashboards and reports to enable insights and business decision-making.

Components:

- Apache Superset (dashboarding / BI tool)

DA/DE tasks:

- Create datasets and charts (bar, line, heatmaps).
- Build dashboards combining multiple metrics.
- Apply filters and access controls for different users.

Data source connections:

- The 'Data Platform' service has already configured these database connections in Superset (a quick check is sketched below):
  - iceberg
  - hive
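
An illustrative way to confirm those connections have something to point at is to list the catalogs Trino exposes (hypothetical host and user, as above):

```python
# Sketch: list Trino catalogs; 'iceberg' and 'hive' should appear in the output.
import trino

conn = trino.dbapi.connect(host="trino.example.net", port=8080, user="analyst")
cur = conn.cursor()
cur.execute("SHOW CATALOGS")
print([row[0] for row in cur.fetchall()])
```
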
## Example

@@ -217,7 +209,7 @@ with DAG(
     region_name=os.getenv("S3_REGION"),
 )
 
-bucket_name = os.getenv("S3_BUCKET")
+bucket_name = 'emgr'
 key = 'airflow/excel/computer-parts-sales.xlsx'
 sheet_name = 'Sheet1'
 columns = ['Date', 'Part', 'Quantity_Sold', 'Unit_Price', 'Total_Sale']

@@ -258,7 +250,7 @@ with DAG(
     region_name=os.getenv("S3_REGION"),
 )
 
-bucket_name = os.getenv("S3_BUCKET")
+bucket_name = 'emgr'
 key = data.get('key')
 
 # read csv file from s3

@@ -310,7 +302,6 @@ with DAG(

The image `azwan082/python:3.11-airflow-dag-3` used in the example above contains these Python libraries:

- boto3 - to connect to S3-compatible object storage
- pandas - to process data using DataFrames
- requests - to perform HTTP requests to a REST API or webpage
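
For reference, a minimal sketch of how those libraries typically combine inside a task: reading the Excel object from the example into a DataFrame. The endpoint and credential variable names are assumptions, and reading `.xlsx` additionally needs `openpyxl`, which is assumed to be present in the image:

```python
# Sketch: load an .xlsx object from S3 into pandas with boto3.
import io
import os

import boto3
import pandas as pd

s3 = boto3.client(
    "s3",
    endpoint_url=os.getenv("S3_ENDPOINT"),
    aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
    region_name=os.getenv("S3_REGION"),
)

obj = s3.get_object(Bucket="emgr", Key="airflow/excel/computer-parts-sales.xlsx")
df = pd.read_excel(
    io.BytesIO(obj["Body"].read()),
    sheet_name="Sheet1",
    usecols=["Date", "Part", "Quantity_Sold", "Unit_Price", "Total_Sale"],
)
print(df.head())
```
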

@@ -324,11 +315,11 @@ If you need more libraries, or want to customize the image, refer to [Dockerfile

Notes:

- XCom means cross-communication, where one task can return values to be consumed by another task:
  - The sample code above has two tasks to demonstrate how XCom works; for a simple DAG, one task is enough.
  - Do not return large data between tasks through XCom, or the pod may fail to start. Store the resulting data in object storage instead (see the sketch after this list).
- Since Airflow is configured to use the KubernetesExecutor, each task in a DAG is executed in a new pod. To reduce the impact of pod startup overhead:
  - Design your DAGs with fewer tasks.
  - Avoid scheduling DAGs too frequently; keep runs at least 5 minutes apart.
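
A minimal sketch of that XCom pattern (plain `@task` is used here for brevity, while the platform's sample uses `@task.kubernetes`; the DAG id and key are hypothetical): task one writes the real data to object storage and returns only the key, and task two receives that key via XCom:

```python
# Sketch: pass only a small S3 key through XCom, never the data itself.
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="xcom_demo",             # hypothetical DAG id
    schedule=None,
    start_date=datetime(2025, 1, 1),
    catchup=False,
):

    @task
    def extract() -> dict:
        # ...write the actual payload to object storage here...
        return {"key": "airflow/raw/sample.csv"}   # small metadata only

    @task
    def transform(data: dict):
        key = data.get("key")       # value arrives via XCom
        print(f"processing s3 object: {key}")

    transform(extract())
```
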
### Dockerfile

@@ -352,24 +343,18 @@ CMD ["python3"]

- Requirements: Docker installed and a Docker Hub account.
- Build the image (run in the folder containing the Dockerfile):

```bash
docker build -t <username>/python:3.11-airflow-dag .
```

- Push the image to Docker Hub:

```bash
docker login
docker push <username>/python:3.11-airflow-dag
```

- Update the DAG file to use this new image:

```python
@task.kubernetes(image="<username>/python:3.11-airflow-dag")
```

- Note: update the image tag every time you build a new image, e.g. `python:3.11-airflow-dag-1.1`.

## Object Storage Folder Structure

@@ -387,7 +372,6 @@ Assuming the 'Data Platform' service is deployed with 'Object Storage' configura

- **MUST** be stored in the `airflow/dags` folder of the target bucket in order to be automatically synced to Airflow.
- Example object paths:

```
s3://s3.example.net/emgr/airflow/dags/sample_dag.py
s3://s3.example.net/emgr/airflow/dags/monthly_sales.py
```
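
Publishing a DAG file is then just an object upload to that prefix; a sketch using boto3, with the endpoint scheme and credential variable names assumed:

```python
# Sketch: upload a DAG file so Airflow picks it up from airflow/dags/.
import os

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="https://s3.example.net",            # assumed scheme for the example host
    aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
)

# Objects outside airflow/dags/ are not synced to Airflow.
s3.upload_file("sample_dag.py", "emgr", "airflow/dags/sample_dag.py")
```
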

@@ -397,14 +381,13 @@ s3://s3.example.net/emgr/airflow/dags/monthly_sales.py

- Example data source files are xlsx, csv, or txt files, for both raw and processed data.
- They can be stored in any location within the target bucket, **EXCEPT** the locations from the sections above:
  - `warehouses`
  - `airflow/dags` (specifically)
- However, you may, and are encouraged to, store the data source files inside the `airflow` folder.
- Example object paths:

```
s3://s3.example.net/emgr/airflow/raw/sample.csv
s3://s3.example.net/emgr/airflow/output/voters.csv
s3://s3.example.net/emgr/2025-11-11/data.json
s3://s3.example.net/emgr/raw/db/orders_20251111.csv
```