Chat History Indexing for RAG

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 services. All 1,566 services.
Chat History Indexing for RAG
Medium
from 1 week to 3 months
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    823
  • image_logo-aider_0.jpg
    AIDER company logo development
    762
  • image_crm_chasseurs_493_0.webp
    CRM development for Chasseurs
    848

Chat History Indexing for RAG

Indexing chat history (Slack, Teams, Telegram, Discord) allows RAG systems to answer questions using accumulated informal team knowledge — problem solutions, architecture discussions, expert answers. The main challenges are the unstructured nature of chats, the conversational context, and confidentiality.

Slack Integration

from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

class SlackIndexer:
    """Fetches Slack messages and converts them into RAG-indexable documents.

    Requires a Slack API token with `channels:history` (and related) scopes.
    """

    def __init__(self, token: str):
        self.client = WebClient(token=token)

    def get_messages(self, channel_id: str,
                     oldest: float | None = None,
                     limit: int = 1000) -> list[dict]:
        """Page through a channel's history and return at most `limit` messages.

        Args:
            channel_id: Slack channel ID (e.g. "C0123ABC").
            oldest: Optional Unix timestamp; only messages after it are fetched.
            limit: Hard cap on the number of messages returned.
        """
        messages: list[dict] = []
        cursor = None

        while True:
            params = {
                'channel': channel_id,
                'limit': 200,  # Slack recommends at most 200 per page
            }
            # Only send `oldest` when provided; don't pass a literal None.
            if oldest is not None:
                params['oldest'] = oldest
            if cursor:
                params['cursor'] = cursor

            result = self.client.conversations_history(**params)
            messages.extend(result['messages'])

            if not result.get('has_more') or len(messages) >= limit:
                break
            cursor = result['response_metadata']['next_cursor']

        # The last page may overshoot the cap; trim to the requested limit.
        return messages[:limit]

    def reconstruct_thread(self, channel_id: str,
                           thread_ts: str) -> list[dict]:
        """Load a complete thread (parent message plus all replies)."""
        result = self.client.conversations_replies(
            channel=channel_id,
            ts=thread_ts
        )
        return result['messages']

    def messages_to_document(self, messages: list[dict],
                              channel_name: str) -> list[dict] | None:
        """Convert a set of messages into indexable documents.

        Returns a list of document dicts (one per conversation session),
        or None when no relevant messages remain after filtering.
        NOTE: the original annotation said `dict`, but a list is returned.
        """
        # Filter out service messages (joins, bot posts) and very short texts.
        relevant = [
            m for m in messages
            if m.get('type') == 'message'
            and not m.get('subtype')  # Remove channel_join, bot_message, etc.
            and len(m.get('text', '')) > 20
        ]

        if not relevant:
            return None

        # Group into sessions: messages separated by less than 1 hour.
        sessions = self._group_into_sessions(relevant, gap_hours=1)
        documents = []

        for session in sessions:
            text = '\n'.join([
                f"[{self._get_username(m['user'])}]: {m['text']}"
                for m in session
                if m.get('user')
            ])

            # Resolve user and channel mentions to readable names.
            text = self._resolve_mentions(text)

            documents.append({
                'text': text,
                'channel': channel_name,
                'timestamp_start': session[0]['ts'],
                'timestamp_end': session[-1]['ts'],
                'participants': list(set(m.get('user') for m in session if m.get('user'))),
                'message_count': len(session)
            })

        return documents

Smart Chunking Strategy for Chats

class ChatChunker:
    """Splits chat message sequences into topically coherent chunks."""

    def chunk_by_topic(self, messages: list[dict],
                        similarity_threshold: float = 0.6) -> list[list]:
        """Split messages into topic groups instead of fixed-size windows.

        A new chunk starts wherever the cosine similarity between two
        consecutive message embeddings drops below `similarity_threshold`.

        Returns a list of chunks; each chunk is a list of message dicts.
        """
        # `np` was referenced but never imported in the original (NameError).
        import numpy as np
        from sentence_transformers import SentenceTransformer

        # Guard: the original crashed with IndexError on an empty input.
        if not messages:
            return []

        model = SentenceTransformer('all-MiniLM-L6-v2')

        texts = [m.get('text', '') for m in messages]
        embeddings = model.encode(texts)

        # Start a new chunk where the topic changes dramatically.
        chunks = [[messages[0]]]
        for i in range(1, len(messages)):
            # Cosine similarity between consecutive message embeddings.
            sim = np.dot(embeddings[i], embeddings[i-1]) / (
                np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i-1])
            )
            if sim < similarity_threshold:
                chunks.append([])
            chunks[-1].append(messages[i])

        return chunks

Anonymization Before Indexing

class ChatAnonymizer:
    """Replaces real usernames in chat text with anonymous identifiers."""

    def anonymize(self, text: str, user_mapping: dict) -> str:
        """Replace usernames with anonymous IDs.

        Args:
            text: Raw chat text that may contain usernames and @-mentions.
            user_mapping: Maps real username -> anonymous ID.

        Returns:
            Text with "@name" rewritten as "@user_<id>" and bare names
            rewritten as "[User <id>]".

        Replacements are applied longest-name-first: the original iterated
        in dict insertion order, so a short name that is a substring of a
        longer one (e.g. "ann" vs "anna") could corrupt the longer name
        before it was processed.
        """
        for real_name in sorted(user_mapping, key=len, reverse=True):
            anon_id = user_mapping[real_name]
            text = text.replace(f"@{real_name}", f"@user_{anon_id}")
            text = text.replace(real_name, f"[User {anon_id}]")
        return text

For corporate Slack, indexing should: exclude private messages (DM), respect retention policies (messages older than N days are deleted), and provide the ability to exclude specific channels or users by request.