AI Workflow Development with Branching and Conditional Logic

We design and deploy artificial intelligence systems, from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering, and MLOps to make AI work not just in the lab, but in real business processes.

AI Workflow with Branching and Conditional Logic

An AI workflow with branching is a system where the execution flow is determined dynamically based on intermediate results, input data, or LLM decisions. This is more complex than linear pipelines, but necessary for real business processes where different inputs require different processing paths.

Types of Conditional Branching

Deterministic branching: the condition is evaluated in code from the data (if/else, switch).

LLM-based branching: the condition is decided by a language model (classification, routing).

Hybrid: code handles structured conditions; the LLM handles unstructured ones.
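The hybrid pattern can be sketched in a few lines: structured signals are matched deterministically in code, and only documents that no rule catches fall through to a model call. Here classify_with_llm is a hypothetical stand-in for that call, and the field names (mime_type, source, subject) are illustrative.

```python
import re

def classify_with_llm(text: str) -> str:
    """Placeholder for an LLM classification call (hypothetical)."""
    return "other"

def route_document(doc: dict) -> str:
    # Deterministic: a machine-readable origin wins outright
    if doc.get("mime_type") == "application/pdf" and doc.get("source") == "billing":
        return "invoice"
    # Deterministic: simple keyword rules on the subject line
    subject = doc.get("subject", "").lower()
    if re.search(r"\b(invoice|payment due)\b", subject):
        return "invoice"
    if "contract" in subject:
        return "contract"
    # Fall back to the LLM for everything unstructured
    return classify_with_llm(doc.get("body", ""))

print(route_document({"subject": "Invoice #1042"}))   # invoice
print(route_document({"subject": "quick question"}))  # other
```

Ordering the checks from cheapest to most expensive keeps most traffic off the model entirely.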

Implementation with LangGraph

from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, Literal, Optional
import json

class WorkflowState(TypedDict):
    input_document: str
    document_type: Optional[str]
    extracted_data: Optional[dict]
    validation_errors: list[str]
    processing_path: str
    output: Optional[dict]

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Node 1: Document classification
def classify_document(state: WorkflowState) -> WorkflowState:
    response = llm.invoke(f"""Determine the document type.
Document types: invoice, contract, complaint, inquiry, other

Document: {state['input_document'][:500]}

Answer with one word:""")

    return {**state, "document_type": response.content.strip().lower()}

# Node 2a: Invoice processing
def process_invoice(state: WorkflowState) -> WorkflowState:
    response = llm.invoke(f"""Extract invoice data.
{state['input_document']}
Return JSON: {{vendor, amount, date, due_date, items}}""")

    # json.loads assumes the model returned bare JSON; in production,
    # strip code fences or request structured output
    return {**state, "extracted_data": json.loads(response.content), "processing_path": "invoice"}

# Node 2b: Contract processing
def process_contract(state: WorkflowState) -> WorkflowState:
    response = llm.invoke(f"""Extract key contract terms.
{state['input_document']}
Return JSON: {{parties, subject, amount, duration, key_conditions}}""")

    return {**state, "extracted_data": json.loads(response.content), "processing_path": "contract"}

# Node 2c: Complaint processing
def process_complaint(state: WorkflowState) -> WorkflowState:
    response = llm.invoke(f"""Classify the complaint.
{state['input_document']}
Return JSON: {{category, severity: low/medium/high/critical, requires_immediate_action: bool}}""")

    return {**state, "extracted_data": json.loads(response.content), "processing_path": "complaint"}

# Node 3: Validation
def validate_data(state: WorkflowState) -> WorkflowState:
    errors = []
    data = state.get("extracted_data", {})

    if state["document_type"] == "invoice":
        if not data.get("amount"):
            errors.append("Missing invoice amount")
        if not data.get("vendor"):
            errors.append("Missing vendor information")

    return {**state, "validation_errors": errors}

# Node 4a: Successful completion
def finalize_success(state: WorkflowState) -> WorkflowState:
    return {**state, "output": {
        "status": "processed",
        "path": state["processing_path"],
        "data": state["extracted_data"],
    }}

# Node 4b: Error handling
def handle_validation_errors(state: WorkflowState) -> WorkflowState:
    return {**state, "output": {
        "status": "validation_failed",
        # .get(): the unknown-document path reaches this node before validation runs
        "errors": state.get("validation_errors") or ["Unsupported document type"],
        "requires_manual_review": True,
    }}

# Routing functions
def route_by_document_type(state: WorkflowState) -> str:
    mapping = {
        "invoice": "process_invoice",
        "contract": "process_contract",
        "complaint": "process_complaint",
    }
    return mapping.get(state["document_type"], "process_unknown")

def route_after_validation(state: WorkflowState) -> str:
    return "handle_errors" if state["validation_errors"] else "finalize"
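Because the routing functions are pure functions of the state, they can be unit-tested without compiling the graph or calling a model. This standalone mirror of the two routers above shows the idea:

```python
# Mirrors of route_by_document_type / route_after_validation, taking a plain dict
def route_by_document_type(state: dict) -> str:
    mapping = {
        "invoice": "process_invoice",
        "contract": "process_contract",
        "complaint": "process_complaint",
    }
    return mapping.get(state["document_type"], "process_unknown")

def route_after_validation(state: dict) -> str:
    return "handle_errors" if state["validation_errors"] else "finalize"

assert route_by_document_type({"document_type": "invoice"}) == "process_invoice"
assert route_by_document_type({"document_type": "memo"}) == "process_unknown"
assert route_after_validation({"validation_errors": []}) == "finalize"
assert route_after_validation({"validation_errors": ["Missing amount"]}) == "handle_errors"
print("routing tests passed")
```

Keeping branching logic out of the node bodies makes these the cheapest part of the workflow to cover with tests.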

# Build the graph
graph = StateGraph(WorkflowState)

graph.add_node("classify", classify_document)
graph.add_node("process_invoice", process_invoice)
graph.add_node("process_contract", process_contract)
graph.add_node("process_complaint", process_complaint)
graph.add_node("validate", validate_data)
graph.add_node("finalize", finalize_success)
graph.add_node("handle_errors", handle_validation_errors)

graph.set_entry_point("classify")
graph.add_conditional_edges("classify", route_by_document_type, {
    "process_invoice": "process_invoice",
    "process_contract": "process_contract",
    "process_complaint": "process_complaint",
    "process_unknown": "handle_errors",
})
graph.add_edge("process_invoice", "validate")
graph.add_edge("process_contract", "validate")
graph.add_edge("process_complaint", "validate")
graph.add_conditional_edges("validate", route_after_validation, {
    "finalize": "finalize",
    "handle_errors": "handle_errors",
})
graph.add_edge("finalize", END)
graph.add_edge("handle_errors", END)

workflow = graph.compile()
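Running the compiled workflow is a single workflow.invoke({"input_document": ...}) call, which requires an OpenAI API key. To trace the branching itself without any model calls, the same topology can be replayed with stubbed nodes; the stub values ("ACME", 120.0) are illustrative:

```python
# Stubbed walk through the graph's topology: classify -> process -> validate
# -> finalize/handle_errors, with each LLM node replaced by a pure function.
def classify(state):  # stub: pretend the LLM answered "invoice"
    return {**state, "document_type": "invoice"}

def process_invoice(state):
    return {**state, "extracted_data": {"vendor": "ACME", "amount": 120.0},
            "processing_path": "invoice"}

def validate(state):
    errors = []
    data = state.get("extracted_data", {})
    if not data.get("amount"):
        errors.append("Missing invoice amount")
    return {**state, "validation_errors": errors}

def run(state):
    state = classify(state)
    handler = {"invoice": process_invoice}.get(state["document_type"])
    state = handler(state)                   # conditional edge 1
    state = validate(state)
    if state["validation_errors"]:           # conditional edge 2
        return {**state, "output": {"status": "validation_failed"}}
    return {**state, "output": {"status": "processed",
                                "path": state["processing_path"]}}

result = run({"input_document": "Invoice #1042 from ACME, total $120"})
print(result["output"])  # {'status': 'processed', 'path': 'invoice'}
```

The conditional edges reduce to ordinary dictionary lookups and if statements; LangGraph adds state merging, persistence, and visualization on top of the same control flow.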

Cyclic Branching: Retry and Correction Loops

A graph can also loop back on itself: a quality-check node routes low-scoring results back for reprocessing, with a retry cap to guarantee termination. The snippet below assumes a retry_count field in the state, a scoring function assess_output_quality returning a value in [0, 1], and placeholder node names "quality_check" and "processing_node".

MAX_RETRIES = 3

def check_quality_and_retry(state: WorkflowState) -> str:
    """Decides: accept result or send for reprocessing"""
    if state.get("retry_count", 0) >= MAX_RETRIES:
        return "accept"  # Accept even imperfect result

    quality = assess_output_quality(state["output"])
    if quality < 0.8:
        return "retry"
    return "accept"

def increment_retry(state: WorkflowState) -> WorkflowState:
    return {**state, "retry_count": state.get("retry_count", 0) + 1}

# Add loop to graph
graph.add_conditional_edges("quality_check", check_quality_and_retry, {
    "retry": "processing_node",
    "accept": "finalize",
})
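The loop's termination behavior can be checked without a graph or model calls. This self-contained sketch replays the same accept/retry decision against a sequence of stubbed quality scores, using the 0.8 threshold and MAX_RETRIES cap from the snippet above:

```python
MAX_RETRIES = 3

def run_with_retries(quality_scores):
    """Simulate the retry loop: consume scores until one clears the
    0.8 bar or the retry cap is hit. Returns (retries_used, decision)."""
    retry_count = 0
    for quality in quality_scores:
        if quality >= 0.8 or retry_count >= MAX_RETRIES:
            return retry_count, "accept"
        retry_count += 1
    return retry_count, "accept"

# Second attempt clears the bar: one retry, then accept.
print(run_with_retries([0.5, 0.9]))              # (1, 'accept')
# Quality never recovers: the cap forces acceptance after 3 retries.
print(run_with_retries([0.5, 0.6, 0.4, 0.3]))    # (3, 'accept')
```

The cap is the important part: without it, a node that never clears the quality bar would loop forever (and burn tokens on every pass).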

Practical Case Study: Incoming Correspondence Processing Workflow

Task: automatic processing of 500+ incoming documents per day (email attachments, portal uploads).

Branching graph:

Incoming document
    → Classification (8 types)
    ↓
Type: invoice    → Extract details → Match with contract → [OK: 1C] / [error: accountant]
Type: contract   → Extract terms   → Legal risk score  → [low: archive] / [medium+: lawyer]
Type: complaint  → Severity assessment → [critical: immediate] / [normal: queue]
Type: request    → Subject matter   → Route to department
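The complaint branch of the diagram reduces to a small routing function. The severity labels and the immediate/queue split come from the case study above; the handler names are illustrative:

```python
def route_complaint(severity: str, requires_immediate_action: bool) -> str:
    """Route a classified complaint: critical or flagged cases escalate
    immediately, everything else goes to the normal support queue."""
    if severity == "critical" or requires_immediate_action:
        return "immediate_escalation"   # [critical: immediate]
    return "support_queue"              # [normal: queue]

print(route_complaint("critical", False))  # immediate_escalation
print(route_complaint("medium", False))    # support_queue
print(route_complaint("low", True))        # immediate_escalation
```

The same pattern (LLM classifies, code routes on the structured result) repeats in each branch of the graph, which keeps the routing auditable even though classification is model-driven.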

Metrics:

  • Auto-processing without manual intervention: 71%
  • Routing time: instant (vs 45 min manually)
  • Classification accuracy: 94%
  • Routing errors: 2.1%

Graph Visualization

LangGraph provides built-in visualization:

from IPython.display import Image

Image(workflow.get_graph().draw_mermaid_png())

Timeline

  • Workflow graph design: 1 week
  • Implementation of nodes and branching: 2–3 weeks
  • Testing edge cases: 1–2 weeks
  • Total: 4–6 weeks