Developing an Automatic Model Retraining System
ML models for trading degrade over time: market regimes change, relationships between variables shift. An automatic retraining system detects degradation and triggers new training without manual intervention.
Retraining Triggers
Performance-based trigger: the model's directional accuracy falls below a threshold over a rolling window:
class RetrainingTrigger:
    """Decide when a trading model should be retrained.

    Three independent triggers are provided:

    * ``should_retrain`` -- directional accuracy of recent predictions is
      below ``performance_threshold``;
    * ``check_data_drift`` -- feature distributions drifted away from the
      training data, measured with the Population Stability Index (PSI);
    * ``check_scheduled`` -- enough days have passed since the last training.
    """

    def __init__(self, performance_threshold=0.52, window_days=14,
                 min_predictions=100):
        """
        Args:
            performance_threshold: minimum acceptable directional accuracy
                (fraction of predictions whose sign matches the actual sign).
            window_days: intended length of the evaluation window. It is only
                stored; no method of this class reads it, so the caller must
                pass predictions from that window to ``should_retrain``.
            min_predictions: minimum number of predictions required before
                the accuracy trigger is evaluated.
        """
        self.threshold = performance_threshold
        self.window = window_days
        self.min_predictions = min_predictions

    def should_retrain(self, recent_predictions, recent_actuals):
        """Check whether recent directional accuracy is below the threshold.

        Args:
            recent_predictions: predicted values; only their sign is used.
            recent_actuals: realised values, paired positionally with
                ``recent_predictions``.

        Returns:
            ``(flag, reason)``: ``(False, 'insufficient_data')`` when fewer
            than ``min_predictions`` predictions are given,
            ``(True, 'accuracy_<acc>_below_<threshold>')`` when accuracy is
            below the threshold, otherwise ``(False, 'performance_ok')``.

        Raises:
            ValueError: if predictions and actuals have different shapes.
        """
        predictions = np.asarray(recent_predictions, dtype=float)
        actuals = np.asarray(recent_actuals, dtype=float)
        if len(predictions) < self.min_predictions:
            return False, 'insufficient_data'
        # Reject mismatched inputs explicitly: a length-1 array would
        # otherwise broadcast silently and yield a meaningless accuracy.
        if predictions.shape != actuals.shape:
            raise ValueError(
                f'predictions {predictions.shape} and actuals '
                f'{actuals.shape} must have the same shape'
            )
        accuracy = np.mean(np.sign(predictions) == np.sign(actuals))
        if accuracy < self.threshold:
            return True, f'accuracy_{accuracy:.3f}_below_{self.threshold}'
        return False, 'performance_ok'

    def check_data_drift(self, train_features, current_features, n_bins=10,
                         feature_psi_threshold=0.2, max_psi_threshold=0.25,
                         max_drifted_features=3):
        """Measure per-feature drift with the Population Stability Index (PSI).

        For every column of ``train_features`` the training data is split
        into ``n_bins`` quantile bins. The same column of ``current_features``
        is binned with those edges, and
        ``PSI = sum((cur% - train%) * ln(cur% / train%))`` is computed.
        The outermost bins are open-ended. Current values outside the
        training range are therefore counted in the first or last bin instead
        of being discarded, which would hide exactly the drift being measured.

        Args:
            train_features: DataFrame the model was trained on.
            current_features: DataFrame of recent data; must contain every
                column of ``train_features``.
            n_bins: number of quantile bins per feature.
            feature_psi_threshold: PSI above which a feature counts as drifted.
            max_psi_threshold: retrain when any feature's PSI exceeds this.
            max_drifted_features: retrain when more than this many features
                have drifted.

        Returns:
            dict with these keys:

            * ``max_psi`` -- 0.0 when no feature could be scored;
            * ``n_drifted_features``;
            * ``should_retrain``;
            * ``psi_by_feature``.

            A feature with no non-missing values in either frame is omitted
            from ``psi_by_feature``.
        """
        eps = 1e-8  # floor for empty-bin proportions, avoids log(0) and x/0
        quantiles = np.linspace(0, 100, n_bins + 1)
        psi_values = {}
        for col in train_features.columns:
            train_values = train_features[col].dropna().to_numpy(dtype=float)
            current_values = current_features[col].dropna().to_numpy(dtype=float)
            if train_values.size == 0 or current_values.size == 0:
                continue
            # Interior quantile edges only (outer bins are open-ended);
            # np.unique drops duplicate edges from discrete or tied features.
            cuts = np.unique(np.percentile(train_values, quantiles)[1:-1])
            n_slots = cuts.size + 1
            # side='right' gives [left, right) bins, like np.histogram.
            train_counts = np.bincount(
                np.searchsorted(cuts, train_values, side='right'),
                minlength=n_slots,
            )
            current_counts = np.bincount(
                np.searchsorted(cuts, current_values, side='right'),
                minlength=n_slots,
            )
            train_pct = np.clip(train_counts / train_values.size, eps, None)
            current_pct = np.clip(current_counts / current_values.size, eps, None)
            psi_values[col] = float(np.sum(
                (current_pct - train_pct) * np.log(current_pct / train_pct)
            ))

        max_psi = max(psi_values.values(), default=0.0)
        n_drifted = sum(1 for v in psi_values.values() if v > feature_psi_threshold)
        return {
            'max_psi': max_psi,
            'n_drifted_features': n_drifted,
            'should_retrain': (max_psi > max_psi_threshold
                               or n_drifted > max_drifted_features),
            'psi_by_feature': psi_values,
        }

    def check_scheduled(self, last_training_date, retrain_frequency_days=7):
        """Return True when at least ``retrain_frequency_days`` whole days
        have passed since ``last_training_date``.

        Both naive timestamps (interpreted as UTC) and timezone-aware
        timestamps are accepted.
        """
        from datetime import datetime, timezone
        now = datetime.now(timezone.utc)
        if last_training_date.tzinfo is None:
            # Naive input: compare against naive UTC, as utcnow() used to.
            now = now.replace(tzinfo=None)
        days_since_training = (now - last_training_date).days
        return days_since_training >= retrain_frequency_days
Automatic Training Pipeline
import mlflow
from prefect import flow, task
@task
def fetch_training_data(symbol, lookback_days=365):
    """Load the OHLCV history used to retrain the model for *symbol*.

    The window ends "now" (naive UTC, from ``datetime.utcnow()``) and starts
    ``lookback_days`` days earlier. Rows come from ``load_ohlcv_data``,
    which reads from ClickHouse/PostgreSQL.
    """
    window_end = datetime.utcnow()
    return load_ohlcv_data(
        symbol,
        window_end - timedelta(days=lookback_days),
        window_end,
    )
@task
def prepare_features(raw_data):
    """Build the model's feature table from *raw_data* with FeatureEngineer."""
    from feature_pipeline import FeatureEngineer

    return FeatureEngineer().create_all_features(raw_data)
@task
def train_and_evaluate(features_df, target_col, model_config):
    """Fit and score a model with walk-forward validation, tracked in MLflow.

    Args:
        features_df: feature table produced by ``prepare_features``.
        target_col: name of the label column in ``features_df``.
        model_config: hyperparameters; forwarded to the trainer and logged
            as MLflow params.

    Returns:
        ``(model, metrics, run_id)``: the fitted model, its metrics dict, and
        the id of the MLflow run that stores the model artifact under
        ``'model'``.
    """
    from training import WalkForwardTrainer

    # NOTE(review): the original comments call test_size "days" but gap
    # "hours"; both are presumably in the row unit of features_df
    # (e.g. hourly bars) -- confirm against WalkForwardTrainer.
    trainer = WalkForwardTrainer(n_splits=5, test_size=60, gap=24)
    with mlflow.start_run() as active_run:
        model, metrics = trainer.fit_evaluate(features_df, target_col, model_config)
        mlflow.log_metrics(metrics)
        mlflow.log_params(model_config)
        mlflow.sklearn.log_model(model, 'model')
        run_id = active_run.info.run_id
    return model, metrics, run_id
@task
def validate_and_promote(model, metrics, run_id, min_metrics):
    """Promote the freshly trained model to Production if it passes the gates.

    Args:
        model: the trained model. It is not used here; the registry entry is
            created from the artifact logged under ``run_id``.
        metrics: evaluation metrics from ``train_and_evaluate``.
        run_id: MLflow run whose ``model`` artifact is registered.
        min_metrics: quality gates with keys ``'accuracy'``, ``'sharpe'`` and
            ``'max_drawdown'``.

    Returns:
        ``(True, version)`` when the model was registered as
        ``crypto_predictor`` and moved to the ``Production`` stage, otherwise
        ``(False, None)``.
    """
    # Missing metrics default to the worst possible value, so an absent metric
    # fails its gate whatever threshold is configured (a default of 0 for
    # sharpe used to pass a gate of <= 0).
    passes_validation = (
        metrics.get('directional_accuracy', float('-inf')) >= min_metrics['accuracy']
        and metrics.get('sharpe_ratio', float('-inf')) >= min_metrics['sharpe']
        # NOTE(review): assumes max_drawdown is a positive magnitude
        # (0.15 == 15% drawdown); a negative convention would always pass.
        and metrics.get('max_drawdown', float('inf')) <= min_metrics['max_drawdown']
    )
    if not passes_validation:
        return False, None

    # register_model creates the registered model on first use and waits for
    # the new version to become READY. MlflowClient.create_model_version
    # fails when 'crypto_predictor' has never been registered.
    model_version = mlflow.register_model(f'runs:/{run_id}/model', 'crypto_predictor')
    client = mlflow.tracking.MlflowClient()
    client.transition_model_version_stage(
        'crypto_predictor', model_version.version, 'Production'
    )
    return True, model_version.version
@flow(name="model_retraining_pipeline")
def retrain_model_pipeline(symbol, model_config, min_metrics):
    """End-to-end retraining flow: fetch, featurize, train, validate and promote.

    Returns:
        dict with ``promoted`` (bool), ``version`` (registry version or
        None) and ``metrics`` (evaluation metrics of the new model).
    """
    features_df = prepare_features(fetch_training_data(symbol))
    model, metrics, run_id = train_and_evaluate(features_df, 'target', model_config)
    promoted, version = validate_and_promote(model, metrics, run_id, min_metrics)
    return dict(promoted=promoted, version=version, metrics=metrics)
Zero-downtime Model Updates
Upon successful training of a new model, replace the old one without stopping trading:
class ModelHotSwapper:
    """Hold the live model and replace it without stopping prediction calls.

    Intended for use from a single asyncio event loop. ``asyncio.Lock``
    orders coroutines on that loop but is NOT thread-safe, so do not call
    these methods from several threads.
    """

    def __init__(self):
        self.current_model = None  # object exposing .predict(); None until first swap
        self.model_version = None  # version label of current_model
        self._lock = asyncio.Lock()  # orders swaps relative to predictions

    async def swap_model(self, new_model, new_version):
        """Install *new_model* as the live model (coroutine-safe, not thread-safe).

        Args:
            new_model: model object exposing ``predict(features)``.
            new_version: version label; stored in ``model_version`` and logged.
        """
        async with self._lock:
            old_version = self.model_version
            self.current_model = new_model
            self.model_version = new_version
            logger.info("Model swapped: %s -> %s", old_version, new_version)
        # The previous model object is freed once nothing else references it;
        # no explicit `del` is needed.

    async def predict(self, features):
        """Run the currently installed model on *features*.

        Raises:
            RuntimeError: if no model has been installed via ``swap_model``.
        """
        async with self._lock:
            model = self.current_model
            if model is None:
                raise RuntimeError('No model loaded; call swap_model() first')
            return model.predict(features)
Retraining Schedule
Orchestrate the checks with Prefect or Airflow:
Daily at 00:00 UTC:
1. Check performance trigger
2. Check PSI drift trigger
3. Check schedule trigger (fires when 7 or more days have passed since the last training)
→ If any trigger fires → run retraining pipeline
→ On successful training → hot swap model
→ Notification to Telegram: "Model updated: v15 → v16, accuracy 0.567"
In summary, the automatic retraining system combines PSI-based drift detection, a performance-monitoring trigger, Prefect/Airflow orchestration, MLflow experiment tracking, and zero-downtime hot swapping of models.







