Designing Data Pipeline Architecture for AI
A Data Pipeline for AI is a system that transforms raw data from various sources into datasets and features ready for model training and inference. The architecture determines scalability, reliability, and how quickly the ML team can iterate.
Types of AI Data Pipelines
Batch Pipeline — processing data in large batches on a schedule. Suitable for model training, preparing training datasets, daily recommendations.
Streaming Pipeline — processing events in real-time. Suitable for real-time recommendations, fraud detection, dynamic pricing.
Lambda Architecture — combination of batch and streaming. The batch layer provides accuracy, speed layer provides freshness.
Kappa Architecture — everything through streaming, batch is simply a replay of historical events. Simpler operationally, requires a more powerful streaming engine.
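The Kappa idea — batch as a replay of the same stream — can be sketched in plain Python. The point is that live processing and backfill share one code path; the event shapes and names below are illustrative:

```python
from typing import Iterable

def count_events_per_user(events: Iterable[dict]) -> dict:
    """One processing function serves both live traffic and historical replay."""
    counts: dict = {}
    for e in events:
        counts[e["user_id"]] = counts.get(e["user_id"], 0) + 1
    return counts

# Live path: consume new events as they arrive.
live = [{"user_id": "a"}, {"user_id": "b"}, {"user_id": "a"}]
# Backfill path: replay the full historical log through the SAME function.
history = live + [{"user_id": "a"}]

assert count_events_per_user(live) == {"a": 2, "b": 1}
assert count_events_per_user(history) == {"a": 3, "b": 1}
```

In a real Kappa deployment the "replay" is a stream job re-reading the log from offset zero, not an in-memory list — but the single-code-path property is the same.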
Architecture Components
Data Sources:
├── Operational DBs (PostgreSQL, MySQL) → CDC (Debezium)
├── Event Streams (Kafka) → Direct consumption
├── File Storage (S3) → Batch ingestion
├── APIs (REST, GraphQL) → Connectors (Airbyte, Fivetran)
└── ML Feedback (predictions) → Kafka events
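CDC sources like Debezium deliver each row change as an envelope with `before`/`after` images and an operation code. A minimal sketch of applying such events as upserts/deletes (the envelope here is a simplified illustration of Debezium's payload, not its full schema):

```python
import json

def apply_change(state: dict, raw_event: str) -> dict:
    """Apply one simplified Debezium-style change event to an in-memory table."""
    payload = json.loads(raw_event)["payload"]
    op = payload["op"]              # "c" = create, "u" = update, "d" = delete
    if op in ("c", "u"):
        row = payload["after"]
        state[row["id"]] = row      # upsert by primary key
    elif op == "d":
        state.pop(payload["before"]["id"], None)
    return state

create = json.dumps({"payload": {"op": "c", "after": {"id": 1, "email": "a@x.io"}}})
update = json.dumps({"payload": {"op": "u", "after": {"id": 1, "email": "b@x.io"}}})
delete = json.dumps({"payload": {"op": "d", "before": {"id": 1}}})
```

The same logic is what a sink into the Raw/Curated zones performs at scale, usually via a merge/upsert in the table format rather than a Python dict.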
Processing Layer:
├── Batch: Apache Spark / dbt / pandas
├── Streaming: Apache Flink / Spark Structured Streaming
└── Feature computation: Feast / Tecton
Storage Layer:
├── Raw Zone (S3/GCS): source data, stored unchanged
├── Curated Zone (Delta Lake/Iceberg): cleaned data
├── Feature Store: ready-to-serve features for ML
└── ML Artifacts (S3 + DVC): training datasets
Orchestration:
└── Apache Airflow / Prefect / Dagster
Incremental Processing
from datetime import datetime, timedelta

import pandas as pd
from airflow.decorators import dag, task

# clickhouse, feast_store, get_watermark/update_watermark are assumed to be
# project-level helpers (DB client, Feast wrapper, watermark persistence).

@dag(
    schedule_interval='@hourly',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)}
)
def user_features_pipeline():

    @task
    def extract_events(execution_date=None):
        # Incremental load: only events newer than the stored watermark
        watermark = get_watermark('user_events')
        events = clickhouse.query(
            "SELECT * FROM user_events WHERE event_time > %(watermark)s",
            {'watermark': watermark}
        )
        update_watermark('user_events', events['event_time'].max())
        path = f"/tmp/user_events_{execution_date}.parquet"
        events.to_parquet(path)
        return path

    @task
    def compute_features(events_path: str):
        events = pd.read_parquet(events_path)
        # Aggregate per-user activity features
        features = events.groupby('user_id').agg({
            'event_time': 'max',
            'event_type': 'count',
            'session_duration': ['mean', 'sum'],
        }).reset_index()
        features.columns = [
            'user_id', 'last_activity', 'event_count',
            'avg_session_duration', 'total_session_time'
        ]
        path = events_path.replace('.parquet', '_features.parquet')
        features.to_parquet(path)
        return path

    @task
    def materialize_to_feature_store(features_path: str):
        features = pd.read_parquet(features_path)
        # Online store serves low-latency lookups; offline store feeds training
        feast_store.write_to_online_store('user_features', features)
        feast_store.write_to_offline_store('user_features', features)

    events = extract_events()
    features = compute_features(events)
    materialize_to_feature_store(features)

pipeline = user_features_pipeline()
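The `get_watermark` / `update_watermark` helpers above are assumed; one minimal way to back them, sketched here with stdlib sqlite3 (the table and column names are illustrative — in production this would live in a shared metadata DB):

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # in production: a shared, durable metadata DB
conn.execute(
    "CREATE TABLE IF NOT EXISTS watermarks (source TEXT PRIMARY KEY, ts TEXT)"
)

def get_watermark(source: str, default: str = "1970-01-01T00:00:00") -> str:
    """Return the last processed event_time for a source, or the epoch default."""
    row = conn.execute(
        "SELECT ts FROM watermarks WHERE source = ?", (source,)
    ).fetchone()
    return row[0] if row else default

def update_watermark(source: str, ts: str) -> None:
    # Upsert: the first run creates the row, later runs advance it
    conn.execute(
        "INSERT INTO watermarks (source, ts) VALUES (?, ?) "
        "ON CONFLICT(source) DO UPDATE SET ts = excluded.ts",
        (source, ts),
    )
    conn.commit()
```

The first run returns the default watermark (a full load); every later run reads only events newer than the last persisted `event_time`, which is what makes the hourly DAG incremental.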
Data Quality in Pipelines
import uuid

import great_expectations
import pandas as pd
from great_expectations.core.batch import RuntimeBatchRequest

class DataQualityError(Exception):
    pass

class DataQualityValidator:
    def __init__(self, suite_name: str):
        self.context = great_expectations.get_context()
        self.suite = self.context.get_expectation_suite(suite_name)

    def validate(self, df: pd.DataFrame):
        validator = self.context.get_validator(
            batch_request=RuntimeBatchRequest(
                datasource_name="pandas_datasource",
                data_connector_name="runtime",
                data_asset_name="ml_features",
                runtime_parameters={"batch_data": df},
                batch_identifiers={"run_id": str(uuid.uuid4())}
            ),
            expectation_suite=self.suite
        )
        results = validator.validate()
        if not results.success:
            # Fail the pipeline task so bad data never reaches the feature store
            failed = [r for r in results.results if not r.success]
            raise DataQualityError(f"Validation failed: {failed}")
        return results
Schema Evolution Handling
# Delta Lake supports additive schema evolution: writing a DataFrame whose
# schema contains new columns succeeds when mergeSchema is enabled.
# new_events_df is the incoming Spark DataFrame carrying the evolved schema.
new_events_df.write \
    .format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save("s3://bucket/user_features")
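What mergeSchema does can be illustrated with a small pure-Python sketch of additive merge semantics (a simplification — Delta also handles nested fields and some type widening):

```python
def merge_schema(current: dict, incoming: dict) -> dict:
    """Additively merge {column: type} schemas; reject type conflicts."""
    merged = dict(current)
    for col, dtype in incoming.items():
        if col in merged and merged[col] != dtype:
            raise TypeError(f"type conflict on '{col}': {merged[col]} vs {dtype}")
        merged[col] = dtype  # new columns are appended, existing ones kept
    return merged

current = {"user_id": "string", "event_count": "long"}
incoming = {"user_id": "string", "event_count": "long", "churn_score": "double"}
merged = merge_schema(current, incoming)
# existing rows are read with NULL for the new churn_score column
```

A write with a genuinely conflicting type still fails, which is the safety property you want: the schema can only grow, never silently change meaning.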
Pipeline Monitoring
Key metrics: freshness (data lag relative to the source), completeness (% of expected records received), latency (execution time of each stage), and error rate. Alerts: data not updated for more than N hours, completeness below threshold, record count deviating anomalously from the expected volume.
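The freshness and completeness alerts can be sketched with stdlib datetime; the 2-hour lag and 95% completeness thresholds here are illustrative defaults, not prescriptions:

```python
from datetime import datetime, timedelta

def freshness_alert(last_event_time: datetime, now: datetime,
                    max_lag: timedelta = timedelta(hours=2)) -> bool:
    """True when data has not been updated for longer than the allowed lag."""
    return now - last_event_time > max_lag

def completeness_alert(received: int, expected: int,
                       min_ratio: float = 0.95) -> bool:
    """True when fewer than min_ratio of the expected records arrived."""
    return expected > 0 and received / expected < min_ratio

now = datetime(2024, 1, 1, 12, 0)
assert freshness_alert(datetime(2024, 1, 1, 9, 0), now)        # 3h lag -> alert
assert not freshness_alert(datetime(2024, 1, 1, 11, 30), now)  # 30 min -> ok
assert completeness_alert(received=900, expected=1000)         # 90% -> alert
```

In practice these checks run as a final pipeline task (or a separate monitoring DAG) and feed an alerting system rather than raising inline.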
Typical design outcome: ML teams receive fresh features every 15-60 minutes instead of daily batch runs, and the time to prepare a new training dataset drops from hours to 10-15 minutes.