fix sample dag py

This commit is contained in:
Azwan b. Amit 2025-11-19 10:29:08 +08:00
parent 15b3b94d12
commit 176a28e1a1


@@ -7,23 +7,23 @@ This manual guides data engineers & data analysts (DA/DE) through using Airflow,
## Table of Contents
- [Components](#components)
- [Airflow](#airflow)
- [Superset](#superset)
- [Trino](#trino)
- [Object Storage](#object-storage)
- [Workflow](#workflow)
- [Data Pipeline](#data-pipeline)
- [1. Data Ingestion](#1-data-ingestion)
- [2. Raw Data Storage](#2-raw-data-storage)
- [3. Data Transformation / ETL](#3-data-transformation--etl)
- [4. Processed Data Storage](#4-processed-data-storage)
- [5. Data Visualization](#5-data-visualization)
- [Example](#example)
- [sample_dag.py](#sample_dagpy)
- [Dockerfile](#dockerfile)
- [Build & publish container image](#build--publish-container-image)
---
## Components
@@ -90,18 +90,22 @@ class H1,H2,H3,H4 optional;
Collect raw data from multiple sources and bring it into the platform in a structured workflow.
Components:
- Apache Airflow (orchestrates ingestion pipelines)
Data sources:
- Object storage (S3 / MinIO)
- files: csv, xlsx, txt, etc.
- APIs (REST/GraphQL endpoints)
- Databases (PostgreSQL, MySQL, etc.)
DA/DE tasks:
- Create DAGs in Airflow to pull data periodically.
Sample Airflow DAG (Python):
- See [sample_dag.py](#sample_dagpy), or the minimal sketch below
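A minimal ingestion sketch, assuming Airflow 2.4+ and the same `S3_REGION`/`S3_BUCKET` environment variables as the full example; the endpoint and credential variable names, the source URL and the DAG/task names are illustrative placeholders, not part of the platform configuration:
```python
# Minimal periodic-ingestion sketch; sample_dag.py below is the authoritative example.
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="ingest_example",              # hypothetical DAG id
    schedule="@daily",                    # pull data once per day
    start_date=datetime(2025, 1, 1),
    catchup=False,
):

    @task.kubernetes(image="azwan082/python:3.11-airflow-dag-3")
    def fetch_to_raw():
        """Download a file from a source system and land it unchanged in object storage."""
        import os

        import boto3
        import requests

        resp = requests.get("https://example.com/export.csv", timeout=60)  # placeholder source URL
        resp.raise_for_status()

        s3 = boto3.client(
            "s3",
            endpoint_url=os.getenv("S3_ENDPOINT"),              # assumed variable names
            aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
            aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
            region_name=os.getenv("S3_REGION"),
        )
        s3.put_object(
            Bucket=os.getenv("S3_BUCKET"),
            Key="airflow/raw/export.csv",                       # raw zone, see folder structure below
            Body=resp.content,
        )

    fetch_to_raw()
```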
### 2. Raw Data Storage
@@ -109,12 +113,15 @@ Sample Airflow DAG (Python):
Store the raw, unprocessed data in a centralized location for auditing and reprocessing.
Components:
- Object storage (S3 / MinIO)
DA/DE tasks:
- Organize data using bucket/folder structures.
Sample S3 Folder Structure:
- See [Data source files](#data-source-files), or the upload sketch below
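A small sketch of landing a raw file under a dated prefix with boto3, assuming the same `S3_*` environment variables as above; the bucket layout shown is only an illustration of the rules in [Data source files](#data-source-files):
```python
# Land a local raw file under a dated prefix inside the `airflow` folder.
import os
from datetime import date

import boto3

s3 = boto3.client(
    "s3",
    endpoint_url=os.getenv("S3_ENDPOINT"),              # assumed variable names
    aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
    region_name=os.getenv("S3_REGION"),
)

# e.g. airflow/raw/2025-11-19/orders.csv
key = f"airflow/raw/{date.today().isoformat()}/orders.csv"
s3.upload_file("orders.csv", os.getenv("S3_BUCKET"), key)
```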
### 3. Data Transformation / ETL
@@ -122,16 +129,20 @@ Sample S3 Folder Structure:
Clean, enrich, and transform raw data into structured, query-ready form.
Components:
- Apache Airflow (orchestration)
- Trino (SQL engine for transformations)
DA/DE tasks:
- Schedule transformation jobs in Airflow DAGs.
Airflow DAG Snippet for ETL:
- See [sample_dag.py](#sample_dagpy), or the Trino sketch below
S3 Folder Structure:
- See [Python DAG files](#python-dag-files)
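A sketch of running a transformation through Trino from inside a task function, assuming the `trino` Python client is installed in the task image (it is not in the library list further below, so it would have to be added via the [Dockerfile](#dockerfile)); the host/port variables, schema and source table names are placeholders, while the `iceberg` catalog and the sales columns come from elsewhere in this manual:
```python
# Run a CREATE TABLE ... AS SELECT transformation in Trino from a task function.
import os

import trino


def transform_sales():
    conn = trino.dbapi.connect(
        host=os.getenv("TRINO_HOST", "trino"),        # assumed env var / service name
        port=int(os.getenv("TRINO_PORT", "8080")),
        user="airflow",
        catalog="iceberg",                            # catalog listed in the Superset section
        schema="analytics",                           # placeholder schema
    )
    cur = conn.cursor()
    cur.execute("""
        CREATE TABLE IF NOT EXISTS daily_sales AS
        SELECT Date AS sale_date, Part AS part, SUM(Total_Sale) AS total_sale
        FROM raw_sales                                -- placeholder source table
        GROUP BY Date, Part
    """)
    cur.fetchall()   # fetch to make sure the CTAS actually ran to completion
    conn.close()
```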
### 4. Processed Data Storage
@@ -139,10 +150,12 @@ S3 Folder Structure:
Store the transformed and curated datasets in a queryable format for analytics and dashboarding.
Components:
- Trino (query engine / SQL layer)
- S3 (object storage for processed datasets)
DA/DE tasks:
- Partition tables by date, region, or other dimensions for fast queries (see the partitioning sketch below).
- Grant read access to Superset.
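A sketch of a date-partitioned table definition issued through Trino, assuming the Iceberg connector's `partitioning` table property; the connection details, schema, table and column names are placeholders:
```python
# Create a month-partitioned Iceberg table for processed data via Trino.
import trino

conn = trino.dbapi.connect(host="trino", port=8080, user="airflow",
                           catalog="iceberg", schema="analytics")   # placeholder connection details
cur = conn.cursor()
cur.execute("""
    CREATE TABLE IF NOT EXISTS sales_curated (
        sale_date   DATE,
        part        VARCHAR,
        total_sale  DOUBLE
    )
    WITH (partitioning = ARRAY['month(sale_date)'])
""")
cur.fetchall()   # DDL returns no rows; fetching just confirms completion
conn.close()
```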
@@ -151,17 +164,20 @@ DA/DE tasks:
Provide dashboards and reports to enable insights and business decision-making.
Components:
- Apache Superset (dashboarding / BI tool)
DA/DE tasks:
- Create datasets and charts (bar, line, heatmaps).
- Build dashboards combining multiple metrics.
- Apply filters and access controls for different users.
Data source connections:
- The 'Data Platform' service has already configured these database connections in Superset:
- iceberg
- hive
## Example
@@ -209,7 +225,7 @@ with DAG(
region_name=os.getenv("S3_REGION"),
)
bucket_name = os.getenv("S3_BUCKET")
key = 'airflow/excel/computer-parts-sales.xlsx'
sheet_name = 'Sheet1'
columns = ['Date', 'Part', 'Quantity_Sold', 'Unit_Price', 'Total_Sale']
@@ -250,7 +266,7 @@ with DAG(
region_name=os.getenv("S3_REGION"),
)
bucket_name = os.getenv("S3_BUCKET")
key = data.get('key')
# read csv file from s3
@@ -302,6 +318,7 @@ with DAG(
```
The image `azwan082/python:3.11-airflow-dag-3` used in the example above contains these Python libraries (a combined usage sketch follows the list):
- boto3 - to connect to S3-compatible object storage
- pandas - to process data using DataFrame
- requests - to perform HTTP requests to a REST API or webpage
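A brief sketch of the three libraries working together inside a task; the API URL and the output key are placeholders:
```python
# Pull JSON from a REST API, shape it with pandas, write the result to object storage.
import io
import os

import boto3
import pandas as pd
import requests

rows = requests.get("https://api.example.com/sales", timeout=60).json()   # placeholder API
df = pd.DataFrame(rows)

buf = io.BytesIO()
df.to_csv(buf, index=False)
boto3.client(
    "s3",
    endpoint_url=os.getenv("S3_ENDPOINT"),              # assumed variable names
    aws_access_key_id=os.getenv("S3_ACCESS_KEY"),
    aws_secret_access_key=os.getenv("S3_SECRET_KEY"),
    region_name=os.getenv("S3_REGION"),
).put_object(Bucket=os.getenv("S3_BUCKET"), Key="airflow/output/sales.csv", Body=buf.getvalue())
```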
@@ -315,11 +332,11 @@ If you need more libraries, or want to customize the image, refer to [Dockerfile
Notes:
- XCom means cross-communication, where one task can return values to be consumed by another task:
- The sample code above has two tasks to demo how XCom works. For a simple DAG, one task is enough.
- Do not return large data between tasks through XCom, or the pod may fail to start. Store resulting data in object storage and pass only its key, as in the sketch after these notes.
- Since Airflow is configured to use KubernetesExecutor, each task in a DAG is executed on a new pod. To reduce the impact of pod startup overhead:
- Design your DAGs with fewer tasks.
- Avoid scheduling DAGs too frequently; keep schedules at least 5 minutes apart.
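A sketch of that XCom pattern, passing only a small reference (the object key) between tasks; the DAG, task and key names here are illustrative, and the full version is in [sample_dag.py](#sample_dagpy):
```python
# Pass only a tiny dict through XCom; the actual data stays in object storage.
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="xcom_pattern_demo", schedule=None, start_date=datetime(2025, 1, 1), catchup=False):

    @task.kubernetes(image="azwan082/python:3.11-airflow-dag-3")
    def extract():
        # ... fetch source data and write it to object storage here ...
        return {"key": "airflow/raw/sample.csv"}    # small reference only -> safe for XCom

    @task.kubernetes(image="azwan082/python:3.11-airflow-dag-3")
    def transform(data: dict):
        key = data.get("key")                       # read s3://<bucket>/<key>, process, write back
        print(f"processing {key}")

    transform(extract())                            # wiring the call creates the XCom hand-off
```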
### Dockerfile
@@ -343,18 +360,24 @@ CMD ["python3"]
- Requirements: Docker installed & a Docker Hub account
- Build the image (run in the folder containing the Dockerfile):
```bash
docker build -t <username>/python:3.11-airflow-dag .
```
- Push the image to Docker Hub:
```bash
docker login
docker push <username>/python:3.11-airflow-dag
```
- Update the DAG file to use this new image:
```python
@task.kubernetes(image="<username>/python:3.11-airflow-dag")
```
- Note: update the image tag every time you build a new image, e.g. `python:3.11-airflow-dag-1.1`
## Object Storage Folder Structure
@@ -372,6 +395,7 @@ Assuming the 'Data Platform' service is deployed with 'Object Storage' configura
- **MUST** be stored in the `airflow/dags` folder of the target bucket in order to be automatically synced to Airflow.
- Example object paths:
```
s3://s3.example.net/emgr/airflow/dags/sample_dag.py
s3://s3.example.net/emgr/airflow/dags/monthly_sales.py
@@ -381,13 +405,14 @@ s3://s3.example.net/emgr/airflow/dags/monthly_sales.py
- Example data source files are xlsx, csv or txt files, for both raw & processed data.
- They can be stored in any location within the target bucket, **EXCEPT** the locations from the sections above:
- `warehouses`
- `airflow/dags` (specifically)
- However, you may (and are encouraged to) store the data source files inside the `airflow` folder.
- Example object paths:
```
s3://s3.example.net/emgr/airflow/raw/sample.csv
s3://s3.example.net/emgr/airflow/output/voters.csv
s3://s3.example.net/emgr/2025-11-11/data.json
s3://s3.example.net/emgr/raw/db/orders_20251111.csv
```