AI Data Analyst — Digital Worker
AI Data Analyst autonomously answers business questions from data: it generates SQL, executes code, builds visualizations, interprets results, and prepares reports. Unlike BI tools with fixed dashboards, the AI analyst handles arbitrary ad-hoc questions asked in natural language.
Text-to-SQL Core
from openai import AsyncOpenAI
from typing import Optional
import asyncio
import json
import pandas as pd

client = AsyncOpenAI()
class SQLGenerator:
    def __init__(self, schema: dict):
        """
        schema: {
            "table_name": {
                "columns": [{"name": "...", "type": "...", "description": "..."}],
                "description": "...",
                "relationships": [...]
            }
        }
        """
        self.schema = schema
        self.schema_context = self._format_schema()

    def _format_schema(self) -> str:
        parts = []
        for table, info in self.schema.items():
            cols = ", ".join(
                f"{c['name']} {c['type']} -- {c.get('description', '')}"
                for c in info["columns"]
            )
            parts.append(f"-- {info.get('description', '')}\nCREATE TABLE {table} ({cols});")
        return "\n\n".join(parts)

    async def generate_sql(self, question: str) -> dict:
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": f"""You are a data analyst. Generate SELECT queries only.
Database schema:
{self.schema_context}
Rules:
- Always use explicit JOINs (not implicit)
- For time series — GROUP BY date with appropriate granularity
- If question is ambiguous — choose most likely interpretation and state assumption
- Return JSON: {{"sql": "...", "assumption": "...", "chart_type": "bar|line|pie|table"}}""",
            }, {
                "role": "user",
                "content": question,
            }],
            response_format={"type": "json_object"},
        )
        return json.loads(response.choices[0].message.content)
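
# For reference, a schema dict in the format the docstring above describes
# might look like this; the table, columns, and relationship notation are
# illustrative assumptions, not taken from a real deployment.
example_schema = {
    "orders": {
        "description": "One row per completed order",
        "columns": [
            {"name": "id", "type": "BIGINT", "description": "Order ID"},
            {"name": "customer_id", "type": "BIGINT", "description": "FK to customers.id"},
            {"name": "amount", "type": "NUMERIC(12,2)", "description": "Order total"},
            {"name": "created_at", "type": "TIMESTAMP", "description": "When the order was placed"},
        ],
        "relationships": ["orders.customer_id -> customers.id"],
    },
}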
class DataAnalystAgent:
    def __init__(self, db_connection, schema: dict):
        self.db = db_connection
        self.sql_gen = SQLGenerator(schema)

    async def answer(self, question: str) -> dict:
        """Full cycle: question → SQL → data → interpretation"""
        # SQL generation
        sql_result = await self.sql_gen.generate_sql(question)
        sql = sql_result["sql"]

        # Query execution (pd.read_sql is blocking, so run it in a thread)
        loop = asyncio.get_running_loop()
        try:
            df = await loop.run_in_executor(None, pd.read_sql, sql, self.db)
        except Exception as e:
            # Attempt to fix SQL and retry once
            fixed = await self.fix_sql_error(sql, str(e))
            df = await loop.run_in_executor(None, pd.read_sql, fixed, self.db)
            sql = fixed  # report the query that actually ran

        # Result interpretation
        interpretation = await self.interpret_results(question, df)

        return {
            "question": question,
            "sql": sql,
            "data": df.to_dict("records")[:100],
            "summary": df.describe().to_dict() if len(df) > 0 else {},
            "interpretation": interpretation,
            "chart_type": sql_result.get("chart_type", "table"),
            "assumption": sql_result.get("assumption"),
        }

    async def interpret_results(self, question: str, df: pd.DataFrame) -> str:
        if df.empty:
            return "Query returned no data. Check filter conditions."

        stats = df.describe().to_string() if df.select_dtypes(include="number").shape[1] > 0 else ""
        sample = df.head(10).to_string()

        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "Interpret query results for business audience. Highlight key insights, anomalies, trends. Use specific numbers."
            }, {
                "role": "user",
                "content": f"Question: {question}\nStatistics:\n{stats}\nData sample:\n{sample}",
            }],
        )
        return response.choices[0].message.content
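
The fix_sql_error step used in answer() is not shown above. A minimal sketch, as a method on DataAnalystAgent that feeds the database error back to the same client, could look like this (the prompt wording is an assumption):

    async def fix_sql_error(self, sql: str, error: str) -> str:
        """Feed the database error back to the model and ask for a corrected query."""
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": f"""Fix the SQL query so it runs against this schema. Generate SELECT queries only.
{self.sql_gen.schema_context}
Return JSON: {{"sql": "..."}}""",
            }, {
                "role": "user",
                "content": f"Query:\n{sql}\n\nError:\n{error}",
            }],
            response_format={"type": "json_object"},
        )
        return json.loads(response.choices[0].message.content)["sql"]

With this helper in place, a failed query gets one automatic retry before the error surfaces to the user.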
Automated Reporting
class AutomatedReportingSystem:
    """System for automatic analytical reports"""

    REPORT_SCHEDULE = {
        "daily_sales": {
            "cron": "0 8 * * *",
            "questions": [
                "Revenue yesterday vs week ago",
                "Top 10 products by revenue yesterday",
                "Transaction anomalies yesterday",
            ],
            "recipients": ["[email protected]", "[email protected]"],
        },
        "weekly_cohort": {
            "cron": "0 9 * * 1",
            "questions": [
                "Retention cohorts last 8 weeks",
                "LTV by acquisition channel",
                "Churn rate this week vs previous 4 weeks",
            ],
            "recipients": ["[email protected]"],
        },
    }

    def __init__(self, db_connection, schema: dict):
        self.db = db_connection
        self.schema = schema

    async def generate_scheduled_report(self, report_name: str) -> str:
        config = self.REPORT_SCHEDULE[report_name]
        analyst = DataAnalystAgent(self.db, self.schema)

        sections = []
        for question in config["questions"]:
            result = await analyst.answer(question)
            chart = await self.create_visualization(result)
            sections.append({
                "question": question,
                "interpretation": result["interpretation"],
                "chart_url": chart,
            })

        return await self.format_report(report_name, sections)
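
create_visualization and format_report are not shown above. A minimal matplotlib-based sketch of the chart step, written here as a standalone helper and assuming PNG files on local disk rather than hosted chart URLs, might look like this:

import matplotlib
matplotlib.use("Agg")  # render charts without a display
import matplotlib.pyplot as plt
import pandas as pd

async def create_visualization(result: dict) -> str:
    """Render an answer's data as a PNG and return its path (a stand-in for a hosted chart URL)."""
    df = pd.DataFrame(result["data"])
    if df.empty or result.get("chart_type") == "table":
        return ""
    kind = result["chart_type"] if result["chart_type"] in ("bar", "line") else "bar"
    if len(df.columns) >= 2:
        ax = df.plot(kind=kind, x=df.columns[0], y=df.columns[1])
    else:
        ax = df.plot(kind=kind)
    ax.set_title(result["question"])
    path = f"/tmp/report_{abs(hash(result['question']))}.png"
    plt.savefig(path, bbox_inches="tight")
    plt.close()
    return path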
Anomaly Alerts
import statistics

class AnomalyDetector:
    async def detect_and_alert(self) -> list[dict]:
        """Daily detection of statistical anomalies in key metrics"""
        metrics_to_monitor = [
            {
                "name": "daily_revenue",
                "query": "SELECT SUM(amount) FROM orders WHERE date = CURRENT_DATE",
                # history of the same metric, e.g. daily totals for a trailing window
                "history_query": "SELECT SUM(amount) FROM orders WHERE date >= CURRENT_DATE - 30 GROUP BY date",
            },
            {"name": "conversion_rate", "query": "...", "history_query": "..."},
            {"name": "api_error_rate", "query": "...", "history_query": "..."},
        ]

        alerts = []
        for metric in metrics_to_monitor:
            current_value = await self.db.fetchval(metric["query"])
            # Historical values as a flat list of numbers
            historical = [row[0] for row in await self.db.fetch(metric["history_query"])]

            mean = statistics.mean(historical)
            stdev = statistics.stdev(historical)
            z_score = (current_value - mean) / stdev if stdev > 0 else 0

            if abs(z_score) > 2.5:
                # Ask LLM to interpret anomaly
                interpretation = await self.interpret_anomaly(metric, current_value, mean, z_score)
                alerts.append({
                    "metric": metric["name"],
                    "current": current_value,
                    "expected_range": (mean - 2 * stdev, mean + 2 * stdev),
                    "z_score": z_score,
                    "interpretation": interpretation,
                })

        return alerts
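
interpret_anomaly is called above but not defined. A minimal sketch, as a method on AnomalyDetector reusing the same client from the Text-to-SQL section, could look like this (the prompt wording is an assumption):

    async def interpret_anomaly(self, metric: dict, current: float, mean: float, z_score: float) -> str:
        """Ask the model to explain the deviation in business terms."""
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{
                "role": "system",
                "content": "Explain a metric anomaly for an on-call business audience: likely causes, severity, and what to check next. Be concise and use the numbers provided.",
            }, {
                "role": "user",
                "content": f"Metric: {metric['name']}\nCurrent value: {current}\nHistorical mean: {mean:.2f}\nZ-score: {z_score:.2f}",
            }],
        )
        return response.choices[0].message.content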
Case Study: E-Commerce, 15 Analytical Queries/Day
Situation: a marketing team of 5 sends 15–20 ad-hoc analytical questions per day to 2 analysts; the average response time was 4 hours.
AI Data Analyst:
- Access to PostgreSQL with 12 tables (orders, customers, products, traffic)
- SQL generation + execution + visualization (matplotlib/plotly)
- Slack bot for ad-hoc queries
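A Slack entry point for such a setup might look like the sketch below; slack_bolt, the Socket Mode tokens, and the message handling details are assumptions rather than details from this deployment.

import asyncio
import os
from slack_bolt.async_app import AsyncApp
from slack_bolt.adapter.socket_mode.async_handler import AsyncSocketModeHandler

app = AsyncApp(token=os.environ["SLACK_BOT_TOKEN"])
analyst = DataAnalystAgent(db_connection, schema)  # connection and schema assumed to exist

@app.event("app_mention")
async def handle_question(event, say):
    # Treat everything after the bot mention as the analytical question
    question = event["text"].split(">", 1)[-1].strip()
    result = await analyst.answer(question)
    note = f"\n_Assumption: {result['assumption']}_" if result.get("assumption") else ""
    await say(f"*{question}*\n{result['interpretation']}{note}")

async def main():
    handler = AsyncSocketModeHandler(app, os.environ["SLACK_APP_TOKEN"])
    await handler.start_async()

if __name__ == "__main__":
    asyncio.run(main())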
Results:
- Average response time: 4 hours → 2 minutes
- First-attempt SQL correctness: 81% (remainder auto-corrected)
- Analysts refocused on complex analysis, experiments, and forecasting
- Team satisfaction: 4.3/5.0
Timeline
- Text-to-SQL with your schema: 1–2 weeks
- Automated reports and visualizations: 1–2 weeks
- Slack/Teams integration for ad-hoc queries: 1 week
- Anomaly detection: 1 week
- Total: 4–6 weeks