Automated model retraining system development

Development of Automatic Model Retraining System

ML models for trading degrade over time: market regimes change, relationships between variables shift. An automatic retraining system detects degradation and triggers new training without manual intervention.

Retraining Triggers

Performance-based trigger: the model's directional accuracy falls below a threshold over a rolling window. The same class also holds a feature-drift check (PSI) and a time-based schedule check:

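from datetime import datetime

import numpy as np


class RetrainingTrigger:
    def __init__(self, performance_threshold=0.52, window_days=14, 
                 min_predictions=100):
        self.threshold = performance_threshold
        # Length of the rolling window; the caller is expected to pass
        # only predictions from the last `window_days` to should_retrain
        self.window = window_days
        self.min_predictions = min_predictions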
    
    def should_retrain(self, recent_predictions, recent_actuals):
        if len(recent_predictions) < self.min_predictions:
            return False, 'insufficient_data'
        
        accuracy = np.mean(
            np.sign(recent_predictions) == np.sign(recent_actuals)
        )
        
        if accuracy < self.threshold:
            return True, f'accuracy_{accuracy:.3f}_below_{self.threshold}'
        
        return False, 'performance_ok'
    
    def check_data_drift(self, train_features, current_features):
        """Population Stability Index (PSI) per feature, binned on training deciles"""
        psi_values = {}
        
        for col in train_features.columns:
            train_col = train_features[col].dropna()
            current_col = current_features[col].dropna()
            
            # Decile edges from the training data; the outer edges are opened
            # so current values outside the training range are still counted
            bins = np.percentile(train_col, np.linspace(0, 100, 11))
            bins[0], bins[-1] = -np.inf, np.inf
            
            train_counts = np.histogram(train_col, bins=bins)[0]
            current_counts = np.histogram(current_col, bins=bins)[0]
            
            # Share of observations per bin
            train_pct = train_counts / train_counts.sum()
            current_pct = current_counts / current_counts.sum()
            
            # Avoid log(0) for empty bins
            train_pct = np.clip(train_pct, 1e-8, None)
            current_pct = np.clip(current_pct, 1e-8, None)
            
            psi = np.sum((current_pct - train_pct) * np.log(current_pct / train_pct))
            psi_values[col] = psi
        
        # PSI > 0.2 on a feature counts as significant drift for that feature
        max_psi = max(psi_values.values())
        n_drifted = sum(1 for v in psi_values.values() if v > 0.2)
        
        # Retrain if one feature drifts strongly (> 0.25) or more than 3 drift
        return {
            'max_psi': max_psi,
            'n_drifted_features': n_drifted,
            'should_retrain': max_psi > 0.25 or n_drifted > 3,
            'psi_by_feature': psi_values
        }
    
    def check_scheduled(self, last_training_date, retrain_frequency_days=7):
        """Time-based trigger: True once the model is at least N days old"""
        # last_training_date is expected to be a naive UTC datetime
        days_since_training = (datetime.utcnow() - last_training_date).days
        return days_since_training >= retrain_frequency_days
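
To make the PSI thresholds more concrete, the sketch below computes PSI for a single feature on synthetic data. It is an illustration under stated assumptions, not part of the class above: the standalone `psi` function, the normal distributions and the sample sizes are all chosen for the example.

```python
import numpy as np

def psi(train_values, current_values, n_bins=10, eps=1e-8):
    """Population Stability Index of one feature, binned on training quantiles."""
    train_values = np.asarray(train_values, dtype=float)
    current_values = np.asarray(current_values, dtype=float)
    train_values = train_values[~np.isnan(train_values)]
    current_values = current_values[~np.isnan(current_values)]

    # Quantile edges from the training sample; the outer edges are opened so
    # that current values outside the training range are still counted.
    edges = np.percentile(train_values, np.linspace(0, 100, n_bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf

    train_pct = np.histogram(train_values, bins=edges)[0] / len(train_values)
    current_pct = np.histogram(current_values, bins=edges)[0] / len(current_values)

    # Clip empty bins to avoid log(0)
    train_pct = np.clip(train_pct, eps, None)
    current_pct = np.clip(current_pct, eps, None)
    return float(np.sum((current_pct - train_pct) * np.log(current_pct / train_pct)))

# Synthetic feature: same distribution vs. a mean shift of one standard deviation
rng = np.random.default_rng(0)
train = rng.normal(0.0, 1.0, 5000)
same_regime = rng.normal(0.0, 1.0, 5000)
shifted_regime = rng.normal(1.0, 1.0, 5000)

psi_same = psi(train, same_regime)
psi_shifted = psi(train, shifted_regime)
print(f"same regime: {psi_same:.3f}, shifted regime: {psi_shifted:.3f}")
```

On data like this, the unshifted sample should stay well below the 0.2 level used above, while the shifted sample should exceed the 0.25 retraining threshold.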

Automatic Training Pipeline

from datetime import datetime, timedelta

import mlflow
import mlflow.sklearn
from prefect import flow, task

@task
def fetch_training_data(symbol, lookback_days=365):
    """Load data for retraining"""
    end_date = datetime.utcnow()
    start_date = end_date - timedelta(days=lookback_days)
    # Load from ClickHouse/PostgreSQL; load_ohlcv_data is a
    # project-specific loader that is not shown here
    return load_ohlcv_data(symbol, start_date, end_date)

@task
def prepare_features(raw_data):
    """Feature engineering"""
    from feature_pipeline import FeatureEngineer
    engineer = FeatureEngineer()
    return engineer.create_all_features(raw_data)

@task
def train_and_evaluate(features_df, target_col, model_config):
    """Train model with walk-forward validation"""
    from training import WalkForwardTrainer
    
    trainer = WalkForwardTrainer(
        n_splits=5,
        test_size=60,  # 60 days test set
        gap=24  # gap between train and test (hours)
    )
    
    with mlflow.start_run():
        model, metrics = trainer.fit_evaluate(features_df, target_col, model_config)
        
        # Log metrics to MLflow
        mlflow.log_metrics(metrics)
        mlflow.log_params(model_config)
        mlflow.sklearn.log_model(model, 'model')
        
        run_id = mlflow.active_run().info.run_id
    
    return model, metrics, run_id

@task
def validate_and_promote(model, metrics, run_id, min_metrics):
    """Check quality and decide on deployment"""
    passes_validation = (
        metrics.get('directional_accuracy', 0) >= min_metrics['accuracy'] and
        metrics.get('sharpe_ratio', 0) >= min_metrics['sharpe'] and
        metrics.get('max_drawdown', 1) <= min_metrics['max_drawdown']
    )
    
    if passes_validation:
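        # Register a new version; mlflow.register_model creates the
        # registered model on first use
        model_version = mlflow.register_model(
            model_uri=f'runs:/{run_id}/model',
            name='crypto_predictor'
        )
        # Promote to Production and archive the previous Production version.
        # Note: stage transitions are deprecated in recent MLflow releases
        # in favor of model version aliases.
        client = mlflow.tracking.MlflowClient()
        client.transition_model_version_stage(
            'crypto_predictor', model_version.version, 'Production',
            archive_existing_versions=True
        )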
        return True, model_version.version
    
    return False, None

@flow(name="model_retraining_pipeline")
def retrain_model_pipeline(symbol, model_config, min_metrics):
    raw_data = fetch_training_data(symbol)
    features_df = prepare_features(raw_data)
    model, metrics, run_id = train_and_evaluate(features_df, 'target', model_config)
    promoted, version = validate_and_promote(model, metrics, run_id, min_metrics)
    
    return {'promoted': promoted, 'version': version, 'metrics': metrics}
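
The `WalkForwardTrainer` used in `train_and_evaluate` is not shown. As a rough sketch of what walk-forward validation with a gap could look like, the hypothetical generator below produces expanding-window train/test index ranges. The function name and the row-based interpretation of `test_size` and `gap` are assumptions made for this example; the real trainer may count days or hours instead.

```python
def walk_forward_splits(n_samples, n_splits=5, test_size=60, gap=24):
    """Yield (train_range, test_range) pairs for expanding-window validation.

    All sizes are counted in rows of a time-ordered dataset.
    """
    first_test_start = n_samples - n_splits * test_size
    if first_test_start - gap <= 0:
        raise ValueError("not enough samples for the requested splits")
    for k in range(n_splits):
        test_start = first_test_start + k * test_size
        # Drop `gap` rows before the test block so that targets built from
        # future prices cannot leak from the test period into training
        train_end = test_start - gap
        yield range(0, train_end), range(test_start, test_start + test_size)

for train_idx, test_idx in walk_forward_splits(n_samples=1000):
    print(f"train [0, {train_idx.stop}) -> test [{test_idx.start}, {test_idx.stop})")
```

Each fold trains only on data that precedes its test block, which mirrors how the model is used in live trading.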

Zero-downtime Model Updates

Once a new model has been trained and validated, replace the old one inside the running process without stopping trading:

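import asyncio
import logging

logger = logging.getLogger(__name__)


class ModelHotSwapper:
    def __init__(self):
        self.current_model = None
        self.model_version = None
        self._lock = asyncio.Lock()  # serializes concurrent swaps
    
    async def swap_model(self, new_model, new_version):
        """Coroutine-safe model replacement.
        
        asyncio.Lock protects coroutines on one event loop,
        not code running in other OS threads.
        """
        async with self._lock:
            old_version = self.model_version
            
            # No await between the two assignments, so predict() never
            # observes a model paired with the wrong version
            self.current_model = new_model
            self.model_version = new_version
            
            # Log model swap
            logger.info("Model swapped: %s -> %s", old_version, new_version)
            # The old model is garbage-collected once nothing references it
    
    async def predict(self, features):
        # Take a reference first: a swap that lands later does not affect
        # this call, and predictions do not queue behind the swap lock
        model = self.current_model
        if model is None:
            raise RuntimeError("No model loaded yet")
        return model.predict(features)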
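
A small usage sketch of the hot swap under load is given below. Everything in it is illustrative: `ConstantModel` is a hypothetical stand-in for a trained model, and `Swapper` is a compact stand-in with the same two methods as `ModelHotSwapper`, included only so the example runs on its own.

```python
import asyncio

class ConstantModel:
    """Hypothetical stand-in for a trained model: always predicts one value."""
    def __init__(self, value):
        self.value = value

    def predict(self, features):
        return [self.value for _ in features]

class Swapper:
    """Compact stand-in for ModelHotSwapper."""
    def __init__(self, model, version):
        self.current_model, self.model_version = model, version
        self._lock = asyncio.Lock()

    async def swap_model(self, new_model, new_version):
        async with self._lock:
            self.current_model, self.model_version = new_model, new_version

    async def predict(self, features):
        return self.current_model.predict(features)

async def demo():
    swapper = Swapper(ConstantModel(-1.0), "v15")
    results = []

    async def trading_loop():
        # Keeps requesting predictions while the deployment happens
        for _ in range(6):
            prediction = await swapper.predict([0.1])
            results.append((swapper.model_version, prediction[0]))
            await asyncio.sleep(0)  # yield control to other coroutines

    async def deploy():
        await asyncio.sleep(0)
        await swapper.swap_model(ConstantModel(1.0), "v16")

    await asyncio.gather(trading_loop(), deploy())
    return results

results = asyncio.run(demo())
print(results)
```

The loop never pauses for the deployment: early predictions come from v15, later ones from v16, and no request fails in between.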

Retraining Schedule

Prefect or Airflow can orchestrate the daily check:

Daily at 00:00 UTC:
    1. Check performance trigger
    2. Check PSI drift trigger
    3. Check schedule trigger (if ≥ 7 days since last training)
    → If any trigger fires → run retraining pipeline
    → If the new model passes validation → hot swap model
    → Notification to Telegram: "Model updated: v15 → v16, accuracy 0.567"
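
The decision step of this schedule can be sketched as a plain function that combines the three trigger results. The names `daily_check` and `format_notification` are hypothetical, and the input shapes are assumed to match the return values of `should_retrain`, `check_data_drift` and `check_scheduled` shown earlier.

```python
def daily_check(performance, drift, scheduled):
    """Combine the three trigger results into one retraining decision.

    performance: (bool, reason) tuple, as returned by should_retrain
    drift: dict with 'should_retrain' and 'max_psi', as returned by check_data_drift
    scheduled: bool, as returned by check_scheduled
    """
    reasons = []
    if performance[0]:
        reasons.append(performance[1])
    if drift['should_retrain']:
        reasons.append(f"psi_drift_max_{drift['max_psi']:.2f}")
    if scheduled:
        reasons.append('scheduled')
    # Any single trigger is enough to start the retraining pipeline
    return bool(reasons), reasons

def format_notification(old_version, new_version, accuracy):
    """Build the message text sent after a successful model update."""
    return f"Model updated: v{old_version} → v{new_version}, accuracy {accuracy:.3f}"

retrain, reasons = daily_check(
    performance=(False, 'performance_ok'),
    drift={'should_retrain': True, 'max_psi': 0.31},
    scheduled=False,
)
print(retrain, reasons)
print(format_notification(15, 16, 0.567))
```

Returning the list of reasons alongside the decision makes it easy to log why a retraining run was started.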

The complete system combines PSI drift detection, a performance monitoring trigger, Prefect/Airflow orchestration, MLflow tracking and a zero-downtime hot swap.