Developing a Workflow Engine with Apache Airflow
Apache Airflow is a platform for orchestrating data pipelines and ETL processes. Workflows are defined as Python code in the form of DAGs (Directed Acyclic Graphs). Airflow stores execution history, monitors task state, supports backfill of past periods, and runs tasks in parallel.
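A minimal sketch of what "a workflow as Python code" looks like, using the TaskFlow API (the DAG id and tasks here are purely illustrative; the full ETL example follows below):
from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def hello_etl():
    @task
    def extract():
        # pretend this pulls rows from a source system
        return [1, 2, 3]

    @task
    def load(rows):
        print(f'loaded {len(rows)} rows')

    # calling one task with the result of another defines the dependency
    load(extract())

hello_etl()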
When to Use Airflow vs Temporal/Camunda
Airflow is optimized for batch data processing:
- ETL/ELT pipelines (PostgreSQL → transformation → Data Warehouse)
- Daily reports and exports
- ML pipelines (data preparation → training → model deployment)
- Periodic aggregations and synchronizations
For event-driven business processes that involve human tasks, Temporal or Camunda are a better fit.
Installation via Helm (Kubernetes)
helm repo add apache-airflow https://airflow.apache.org
helm upgrade --install airflow apache-airflow/airflow \
--namespace airflow \
--create-namespace \
--set executor=KubernetesExecutor \
--set postgresql.enabled=true \
--set redis.enabled=true \
--values airflow-values.yaml
# airflow-values.yaml (structure of the official apache-airflow/airflow chart)
images:
  airflow:
    repository: apache/airflow
    tag: "2.8.0"
config:
  core:
    # dags_folder is derived by the chart from the gitSync settings below
    max_active_runs_per_dag: 3
  scheduler:
    min_file_process_interval: 30
dags:
  gitSync:
    enabled: true
    repo: https://github.com/company/airflow-dags.git
    branch: main
    subPath: dags/
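The DAG examples below reference two Airflow connections, production_db and datawarehouse. These are names assumed for this article, not defaults; they can be created in the web UI, with the airflow connections add CLI command, or as AIRFLOW_CONN_* environment variables. A quick sketch to verify they resolve:
from airflow.hooks.base import BaseHook

# Sanity check that the connection IDs used by the example DAGs are registered.
# 'production_db' and 'datawarehouse' are example names, not Airflow defaults.
for conn_id in ('production_db', 'datawarehouse'):
    conn = BaseHook.get_connection(conn_id)
    print(conn_id, conn.host, conn.port, conn.schema)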
Example DAG: Daily Orders ETL Pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
import pandas as pd
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2026, 1, 1),
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
'email': ['[email protected]'],
}
with DAG(
'daily_orders_etl',
default_args=default_args,
schedule_interval='0 2 * * *', # every day at 02:00 UTC
catchup=False,
tags=['etl', 'orders'],
description='Load and transform orders to DWH',
) as dag:
# Step 1: Extract data from production DB
def extract_orders(**context):
hook = PostgresHook(postgres_conn_id='production_db')
ds = context['ds']  # logical (execution) date as a string, e.g. '2026-03-28'
df = hook.get_pandas_df(f"""
SELECT o.id, o.customer_id, o.total, o.status,
o.created_at, c.email, c.country
FROM orders o
JOIN customers c ON c.id = o.customer_id
WHERE o.created_at::date = '{ds}'
AND o.status IN ('paid', 'shipped', 'delivered')
""")
# Save to XCom for next step
context['ti'].xcom_push(key='orders_count', value=len(df))
df.to_parquet(f'/tmp/orders_{ds}.parquet')
return len(df)
# Step 2: Transform
def transform_orders(**context):
ds = context['ds']
df = pd.read_parquet(f'/tmp/orders_{ds}.parquet')
# Transformations
df['order_date'] = pd.to_datetime(df['created_at']).dt.date
df['revenue_usd'] = df['total'] / 100 # cents → dollars
df['is_international'] = df['country'] != 'RU'
df['customer_tier'] = df['revenue_usd'].apply(
lambda x: 'vip' if x >= 500 else 'regular'
)
df.to_parquet(f'/tmp/orders_transformed_{ds}.parquet')
# Step 3: Load to DWH
def load_to_dwh(**context):
ds = context['ds']
df = pd.read_parquet(f'/tmp/orders_transformed_{ds}.parquet')
hook = PostgresHook(postgres_conn_id='datawarehouse')
engine = hook.get_sqlalchemy_engine()
# Append into the DWH fact table (not a true upsert; see the idempotency note below)
df.to_sql('fact_orders', engine, schema='dwh',
if_exists='append', index=False,
method='multi', chunksize=1000)
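# Idempotency note (an assumption-level sketch, not part of the original pipeline):
# to_sql(if_exists='append') is not idempotent - re-running this task, e.g. during
# a backfill, inserts the same day's rows twice. A common fix is to delete the
# logical date's partition first, using the same hook:
#     hook.run("DELETE FROM dwh.fact_orders WHERE order_date = %s", parameters=(ds,))
# and only then call df.to_sql(...).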
# Step 4: Aggregations for dashboard
aggregate_metrics = PostgresOperator(
task_id='aggregate_metrics',
postgres_conn_id='datawarehouse',
sql="""
INSERT INTO dwh.daily_metrics (date, total_revenue, orders_count, avg_order)
SELECT
'{{ ds }}'::date,
SUM(revenue_usd),
COUNT(*),
AVG(revenue_usd)
FROM dwh.fact_orders
WHERE order_date = '{{ ds }}'
ON CONFLICT (date) DO UPDATE SET
total_revenue = EXCLUDED.total_revenue,
orders_count = EXCLUDED.orders_count,
avg_order = EXCLUDED.avg_order;
""",
)
extract = PythonOperator(task_id='extract_orders', python_callable=extract_orders)
transform = PythonOperator(task_id='transform_orders', python_callable=transform_orders)
load = PythonOperator(task_id='load_to_dwh', python_callable=load_to_dwh)
# Dependencies
extract >> transform >> load >> aggregate_metrics
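The orders_count value pushed to XCom in step 1 can also gate the rest of the run. A sketch of an optional check task (check_orders_count is a name introduced here, not part of the pipeline above) that would live inside the same with DAG block, between extract and transform:
def check_orders_count(**context):
    # Pull the count pushed by extract_orders and fail early if nothing came in
    count = context['ti'].xcom_pull(task_ids='extract_orders', key='orders_count')
    if not count:
        raise ValueError(f"no orders extracted for {context['ds']}")

validate = PythonOperator(task_id='check_orders_count', python_callable=check_orders_count)

# With the check in place the chain becomes:
# extract >> validate >> transform >> load >> aggregate_metrics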
Parallel Execution
from airflow.utils.task_group import TaskGroup
with TaskGroup('process_regions') as process_regions:
for region in ['EU', 'US', 'APAC']:
PythonOperator(
task_id=f'process_{region.lower()}',
python_callable=process_region_data,
op_kwargs={'region': region}
)
# Process all regions in parallel, then aggregate
extract >> process_regions >> aggregate_all
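This snippet assumes process_region_data and aggregate_all are already defined elsewhere in the DAG file; a minimal sketch of both (purely illustrative stand-ins):
def process_region_data(region: str, **context):
    # Placeholder: region-specific extraction and transformation would go here
    print(f"processing {region} orders for {context['ds']}")

aggregate_all = PythonOperator(
    task_id='aggregate_all',
    python_callable=lambda: print('aggregating all regions'),
)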
Sensors: Waiting for Conditions
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
# Wait until file appears
wait_for_file = FileSensor(
task_id='wait_for_export',
filepath='/data/exports/daily_export_{{ ds }}.csv',
timeout=3600,
poke_interval=60,
)
# Wait until API returns success
wait_for_api = HttpSensor(
task_id='wait_for_processing',
http_conn_id='data_api',
endpoint='/status/{{ ds }}',
response_check=lambda response: response.json()['status'] == 'ready',
timeout=1800,
poke_interval=120,
)
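Both sensors above use the default mode='poke', which occupies a worker slot for the entire wait. For long waits, mode='reschedule' releases the slot between checks; a variant of the file sensor with that setting:
wait_for_export_long = FileSensor(
    task_id='wait_for_export_long',
    filepath='/data/exports/daily_export_{{ ds }}.csv',
    mode='reschedule',   # free the worker slot between pokes
    timeout=6 * 3600,
    poke_interval=300,
)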
KubernetesExecutor
With KubernetesExecutor, each task runs in its own Pod:
# Pod configuration for a specific task (Airflow 2.x expects the pod_override format)
from kubernetes.client import models as k8s

executor_config = {
    'pod_override': k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name='base',  # must be 'base' to override the main task container
                    image='custom-airflow:2.8.0-pandas',  # custom image with dependencies
                    resources=k8s.V1ResourceRequirements(
                        requests={'memory': '2Gi', 'cpu': '500m'},
                        limits={'memory': '4Gi'},
                    ),
                )
            ]
        )
    )
}
heavy_transform = PythonOperator(
task_id='heavy_transform',
python_callable=transform_large_dataset,
executor_config=executor_config
)
Backfill
Backfill re-runs the DAG over a past date range and recomputes historical data. This matters here because the example DAG sets catchup=False, so missed intervals are never scheduled automatically and must be backfilled by hand:
airflow dags backfill daily_orders_etl \
--start-date 2026-01-01 \
--end-date 2026-03-27 \
--reset-dagruns
Implementation Timeframe
- Airflow deployment (Helm/Docker) + first DAG — 3–5 days
- ETL pipeline with 5–8 tasks, transformations, and DWH load — 1–2 weeks
- Complex pipeline with parallelism, sensors, and backfill — 2–4 weeks