Workflow Engine Development Based on Apache Airflow

Developing Workflow Engine with Apache Airflow

Apache Airflow is a platform for orchestrating data pipelines and ETL processes. Workflows are defined in Python code as DAGs (Directed Acyclic Graphs). Airflow stores execution history, tracks task state, runs independent tasks in parallel, and supports backfilling past periods.

When to Use Airflow vs Temporal/Camunda

Airflow is optimized for batch data processing:

  • ETL/ELT pipelines (PostgreSQL → transformation → Data Warehouse)
  • Daily reports and exports
  • ML pipelines (data preparation → training → model deployment)
  • Periodic aggregations and synchronizations

For event-driven business processes with human tasks, use Temporal or Camunda instead.

Installation via Helm (Kubernetes)

# Redis is only used by the Celery executors, so it is disabled for KubernetesExecutor
helm repo add apache-airflow https://airflow.apache.org
helm repo update
helm upgrade --install airflow apache-airflow/airflow \
  --namespace airflow \
  --create-namespace \
  --set executor=KubernetesExecutor \
  --set postgresql.enabled=true \
  --set redis.enabled=false \
  --values airflow-values.yaml

# airflow-values.yaml (keys follow the official apache-airflow/airflow chart)
defaultAirflowRepository: apache/airflow
defaultAirflowTag: "2.8.0"
airflowVersion: "2.8.0"

# The chart sets the DAGs folder itself when gitSync is enabled
env:
  - name: AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG
    value: "3"
  - name: AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL
    value: "30"

dags:
  gitSync:
    enabled: true
    repo: https://github.com/company/airflow-dags.git
    branch: main
    subPath: dags

DAG — Example ETL Pipeline

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime, timedelta
import pandas as pd

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['data-team@example.com'],  # placeholder: the original address was obfuscated in the source
}

with DAG(
    'daily_orders_etl',
    default_args=default_args,
    schedule='0 2 * * *',  # every day at 02:00 UTC (schedule_interval is deprecated since Airflow 2.4)
    catchup=False,
    tags=['etl', 'orders'],
    description='Load and transform orders to DWH',
) as dag:

    # Step 1: Extract data from production DB
    def extract_orders(**context):
        hook = PostgresHook(postgres_conn_id='production_db')
        ds = context['ds']  # logical date as YYYY-MM-DD, e.g. 2026-03-28

        # Pass ds as a bound parameter instead of formatting it into the SQL
        df = hook.get_pandas_df(
            """
            SELECT o.id, o.customer_id, o.total, o.status,
                   o.created_at, c.email, c.country
            FROM orders o
            JOIN customers c ON c.id = o.customer_id
            WHERE o.created_at::date = %(ds)s
              AND o.status IN ('paid', 'shipped', 'delivered')
            """,
            parameters={'ds': ds},
        )

        # Push only the row count to XCom; the data itself goes to a file.
        # Note: /tmp is local to the worker, so with KubernetesExecutor
        # (one Pod per task) use a shared volume or object storage instead.
        context['ti'].xcom_push(key='orders_count', value=len(df))
        df.to_parquet(f'/tmp/orders_{ds}.parquet')
        return len(df)

    # Step 2: Transform
    def transform_orders(**context):
        ds = context['ds']
        df = pd.read_parquet(f'/tmp/orders_{ds}.parquet')

        # Transformations
        df['order_date'] = pd.to_datetime(df['created_at']).dt.date
        df['revenue_usd'] = df['total'] / 100  # cents → dollars
        df['is_international'] = df['country'] != 'RU'
        df['customer_tier'] = df['revenue_usd'].apply(
            lambda x: 'vip' if x >= 500 else 'regular'
        )

        df.to_parquet(f'/tmp/orders_transformed_{ds}.parquet')

    # Step 3: Load to DWH
    def load_to_dwh(**context):
        ds = context['ds']
        df = pd.read_parquet(f'/tmp/orders_transformed_{ds}.parquet')

        hook = PostgresHook(postgres_conn_id='datawarehouse')
        engine = hook.get_sqlalchemy_engine()

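        # Append to DWH. Note: this is a plain append, not an upsert, so
        # rerunning a date duplicates rows unless they are deleted first.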
        df.to_sql('fact_orders', engine, schema='dwh',
                  if_exists='append', index=False,
                  method='multi', chunksize=1000)

    # Step 4: Aggregations for dashboard
    aggregate_metrics = PostgresOperator(
        task_id='aggregate_metrics',
        postgres_conn_id='datawarehouse',
        sql="""
        INSERT INTO dwh.daily_metrics (date, total_revenue, orders_count, avg_order)
        SELECT
          '{{ ds }}'::date,
          SUM(revenue_usd),
          COUNT(*),
          AVG(revenue_usd)
        FROM dwh.fact_orders
        WHERE order_date = '{{ ds }}'
        ON CONFLICT (date) DO UPDATE SET
          total_revenue = EXCLUDED.total_revenue,
          orders_count = EXCLUDED.orders_count,
          avg_order = EXCLUDED.avg_order;
        """,
    )

    extract = PythonOperator(task_id='extract_orders', python_callable=extract_orders)
    transform = PythonOperator(task_id='transform_orders', python_callable=transform_orders)
    load = PythonOperator(task_id='load_to_dwh', python_callable=load_to_dwh)

    # Dependencies
    extract >> transform >> load >> aggregate_metrics
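
The `ds` value used in the queries above is the start of the run's data interval, not the moment the task executes. The following is a minimal standard-library sketch of that relationship for a daily 02:00 UTC cron schedule as handled by Airflow 2.x. The helper name `daily_run_for` is hypothetical, and the code only mirrors the arithmetic rather than calling Airflow.

```python
from datetime import datetime, timedelta, timezone


def daily_run_for(trigger_time: datetime) -> dict:
    """Describe the daily run that is triggered at trigger_time.

    Assumption: one run per day, and the run covers the 24 hours
    that END at the moment it is triggered.
    """
    interval_start = trigger_time - timedelta(days=1)
    return {
        "data_interval_start": interval_start,
        "data_interval_end": trigger_time,
        # ds is the date part of the interval start (the logical date)
        "ds": interval_start.strftime("%Y-%m-%d"),
    }


# The run that starts on 29 March at 02:00 UTC processes orders of 28 March
run = daily_run_for(datetime(2026, 3, 29, 2, 0, tzinfo=timezone.utc))
print(run["ds"])  # 2026-03-28
```

In practice this means the report for a given day becomes available on the following night, which is worth keeping in mind when reading dashboards fed by this pipeline.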

Parallel Execution

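from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

# Goes inside the `with DAG(...)` block. process_region_data, extract and
# aggregate_all are assumed to be defined elsewhere in the DAG file.
with TaskGroup('process_regions') as process_regions:
    for region in ['EU', 'US', 'APAC']:
        PythonOperator(
            task_id=f'process_{region.lower()}',
            python_callable=process_region_data,
            op_kwargs={'region': region},
        )

# Tasks in the group do not depend on each other, so they run in parallel;
# aggregate_all starts only after all of them have succeeded
extract >> process_regions >> aggregate_all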
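
To see why the region tasks can run side by side, it may help to look at the dependency graph on its own. The sketch below uses Python's standard `graphlib` module, not Airflow, to group the same task names into "waves" of tasks whose upstream work is finished. The function name `execution_waves` is hypothetical, and the real scheduler also takes pools, concurrency limits, and retries into account.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks)
dependencies = {
    "process_eu": {"extract"},
    "process_us": {"extract"},
    "process_apac": {"extract"},
    "aggregate_all": {"process_eu", "process_us", "process_apac"},
}


def execution_waves(deps):
    """Return lists of tasks that could run concurrently, in order."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises CycleError if the graph is not acyclic
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # all tasks whose upstream is done
        waves.append(ready)
        sorter.done(*ready)
    return waves


waves = execution_waves(dependencies)
for wave in waves:
    print(wave)
# ['extract']
# ['process_apac', 'process_eu', 'process_us']
# ['aggregate_all']
```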

Sensors — Waiting for Conditions

from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor

# Wait until the file appears (checked every 60 s, fails after 1 hour)
wait_for_file = FileSensor(
    task_id='wait_for_export',
    filepath='/data/exports/daily_export_{{ ds }}.csv',
    timeout=3600,
    poke_interval=60,
    mode='reschedule',  # release the worker slot between checks
)

# Wait until the API reports success (checked every 120 s, fails after 30 min)
wait_for_api = HttpSensor(
    task_id='wait_for_processing',
    http_conn_id='data_api',
    endpoint='/status/{{ ds }}',
    response_check=lambda response: response.json()['status'] == 'ready',
    timeout=1800,
    poke_interval=120,
    mode='reschedule',
)
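
The roles of `timeout` and `poke_interval` can be illustrated with a simplified polling loop. This is a sketch of the general idea only, not Airflow's sensor implementation. `poke_until` and `FakeClock` are hypothetical names, and the clock is simulated so the example finishes instantly.

```python
import time


def poke_until(condition, timeout, poke_interval,
               clock=time.monotonic, sleep=time.sleep):
    """Call condition() every poke_interval seconds until it returns True.

    Returns the number of checks made; raises TimeoutError once more
    than `timeout` seconds have passed without success.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - started > timeout:
            raise TimeoutError(f"condition not met after {timeout} seconds")
        sleep(poke_interval)


class FakeClock:
    """Simulated clock: sleeping just advances the current time."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
answers = iter([False, False, True])  # the file "appears" on the third check
pokes = poke_until(lambda: next(answers), timeout=3600, poke_interval=60,
                   clock=clock.time, sleep=clock.sleep)
print(pokes, clock.now)  # 3 120.0
```

A short `poke_interval` reacts faster but generates more checks against the file system or API, while `timeout` bounds how long a missing upstream export can block the pipeline.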

KubernetesExecutor

With KubernetesExecutor each task runs in a separate Pod:

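from kubernetes.client import models as k8s

# Pod configuration for a specific task. Airflow 2.x expects a pod_override
# object; the flat 'KubernetesExecutor' dict is the legacy Airflow 1.10 format.
executor_config = {
    'pod_override': k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name='base',  # 'base' targets the main task container
                    image='custom-airflow:2.8.0-pandas',  # custom image with dependencies
                    resources=k8s.V1ResourceRequirements(
                        requests={'memory': '2Gi', 'cpu': '500m'},
                        limits={'memory': '4Gi'},
                    ),
                )
            ]
        )
    )
}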

heavy_transform = PythonOperator(
    task_id='heavy_transform',
    python_callable=transform_large_dataset,
    executor_config=executor_config
)

Backfill

Recalculate historical data for a past period:

airflow dags backfill daily_orders_etl \
  --start-date 2026-01-01 \
  --end-date 2026-03-27 \
  --reset-dagruns
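
Backfills and reruns are only safe when each run can be repeated without duplicating data. One common way to get there is to replace a date's rows inside a single transaction. The sketch below demonstrates the pattern with the standard `sqlite3` module as a stand-in for the PostgreSQL warehouse. The `load_day` helper and the reduced `fact_orders` schema are assumptions made for illustration.

```python
import sqlite3


def load_day(conn, ds, rows):
    """Replace all fact rows for one logical date in a single transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("DELETE FROM fact_orders WHERE order_date = ?", (ds,))
        conn.executemany(
            "INSERT INTO fact_orders (id, order_date, revenue_usd) VALUES (?, ?, ?)",
            [(order_id, ds, revenue) for order_id, revenue in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fact_orders (id INTEGER, order_date TEXT, revenue_usd REAL)"
)

rows = [(1, 120.0), (2, 640.5)]
load_day(conn, "2026-01-01", rows)
load_day(conn, "2026-01-01", rows)  # simulated rerun of the same date

count = conn.execute("SELECT COUNT(*) FROM fact_orders").fetchone()[0]
total = conn.execute("SELECT SUM(revenue_usd) FROM fact_orders").fetchone()[0]
print(count, total)  # 2 760.5
```

With a load step written this way, repeating a date range leaves the warehouse in the same state as a single run.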

Implementation Timeframe

  • Airflow deployment (Helm/Docker) + first DAG — 3–5 days
  • ETL pipeline with 5–8 tasks, transformations, and DWH load — 1–2 weeks
  • Complex pipeline with parallelism, sensors, and backfill — 2–4 weeks