홈으로
🔍

RAG 검색 증강 생성

문서 기반 AI 시스템 구축의 모든 것

12시간
intermediate
6개 챕터
고급 과정으로 돌아가기

Chapter 2: Multi-Agent RAG Systems

분산 지능을 통한 복잡한 질의응답 시스템 구축

2.1 Multi-Agent RAG의 필요성

복잡한 질문을 전문 에이전트들이 협력하여 해결

단일 Agent vs Multi-Agent 접근법

복잡한 질문은 종종 여러 도메인의 전문 지식을 필요로 합니다.단일 RAG 시스템으로는 한계가 있는 상황에서, Multi-Agent RAG는 각 도메인에 특화된 에이전트들이 협력하여 더 정확하고 포괄적인 답변을 제공할 수 있습니다.

핵심 아키텍처 요소:

  • Orchestrator Agent: 질문을 분석하고 적절한 전문 에이전트들에게 배분
  • Specialist Agents: 각 도메인별 전문 지식을 보유한 RAG 시스템
  • Synthesis Agent: 여러 에이전트의 결과를 통합하여 최종 답변 생성
  • Quality Validator: 답변의 일관성과 품질을 검증

실제 활용 사례: 의료 진단 시스템

질문: "40세 남성 환자가 가슴 통증과 호흡곤란을 호소합니다. 최근 장거리 비행 후 발생했으며, 가족력상 심장병이 있습니다."

심장내과 Agent
심근경색, 협심증 가능성 분석
호흡기내과 Agent
폐색전증, 기흉 가능성 검토
응급의학 Agent
중증도 판단 및 응급 처치 가이드

Multi-Agent RAG의 장점

✅ 성능 향상

  • • 도메인별 최적화된 검색 성능
  • • 전문 지식의 깊이 증가
  • • 교차 검증을 통한 정확도 향상
  • • 편향 감소 (다양한 관점)

⚡ 확장성

  • • 새로운 도메인 에이전트 쉽게 추가
  • • 병렬 처리로 응답 속도 최적화
  • • 모듈화된 유지보수
  • • 독립적 에이전트 업데이트 가능

2.2 Multi-Agent 아키텍처 설계

효율적인 에이전트 협력을 위한 시스템 구조

핵심 아키텍처 패턴

1. 계층적 분산 (Hierarchical Distribution)

                    ┌─────────────────┐
                    │  Orchestrator   │
                    │     Agent       │
                    └────────┬────────┘
                             │
           ┌─────────────────┼─────────────────┐
           │                 │                 │
    ┌──────▼──────┐  ┌──────▼──────┐  ┌──────▼──────┐
    │ Medical     │  │ Legal       │  │ Technical   │
    │ Specialist  │  │ Specialist  │  │ Specialist  │
    │ Agent       │  │ Agent       │  │ Agent       │
    └─────────────┘  └─────────────┘  └─────────────┘
           │                 │                 │
    ┌──────▼──────┐  ┌──────▼──────┐  ┌──────▼──────┐
    │ Cardiology  │  │ Contract    │  │ Software    │
    │ Sub-Agent   │  │ Sub-Agent   │  │ Sub-Agent   │
    └─────────────┘  └─────────────┘  └─────────────┘

2. 파이프라인 기반 (Pipeline-based)

Query → Analysis → Routing → Parallel Processing → Synthesis → Validation → Response

        ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
        │ Agent A     │    │ Agent B     │    │ Agent C     │
        │ (Research)  │    │ (Analysis)  │    │ (Synthesis) │
        └──────┬──────┘    └──────┬──────┘    └──────┬──────┘
               │                  │                  │
               ▼                  ▼                  ▼
        ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
        │ Results A   │    │ Results B   │    │ Final       │
        │             │    │             │    │ Answer      │
        └─────────────┘    └─────────────┘    └─────────────┘

에이전트 간 통신 프로토콜

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
import asyncio
import json
from datetime import datetime

class MessageType(Enum):
    QUERY = "query"
    RESPONSE = "response"
    REQUEST_HELP = "request_help"
    PROVIDE_CONTEXT = "provide_context"
    VALIDATE = "validate"
    ERROR = "error"

@dataclass
class AgentMessage:
    sender: str
    receiver: str
    message_type: MessageType
    content: Dict[str, Any]
    timestamp: datetime
    conversation_id: str
    priority: int = 1  # 1=low, 2=medium, 3=high

class MessageBus:
    def __init__(self):
        self.subscribers: Dict[str, List[callable]] = {}
        self.message_history: List[AgentMessage] = []
        
    def subscribe(self, agent_id: str, handler: callable):
        if agent_id not in self.subscribers:
            self.subscribers[agent_id] = []
        self.subscribers[agent_id].append(handler)
    
    async def publish(self, message: AgentMessage):
        self.message_history.append(message)
        
        # 특정 수신자에게 메시지 전달
        if message.receiver in self.subscribers:
            tasks = []
            for handler in self.subscribers[message.receiver]:
                tasks.append(handler(message))
            await asyncio.gather(*tasks)
    
    def get_conversation(self, conversation_id: str) -> List[AgentMessage]:
        return [msg for msg in self.message_history 
                if msg.conversation_id == conversation_id]

class BaseAgent:
    def __init__(self, agent_id: str, message_bus: MessageBus):
        self.agent_id = agent_id
        self.message_bus = message_bus
        self.knowledge_base = None
        self.capabilities = []
        self.active_conversations = set()
        
        # 메시지 핸들러 등록
        message_bus.subscribe(agent_id, self.handle_message)
    
    async def handle_message(self, message: AgentMessage):
        """메시지 처리 메인 로직"""
        self.active_conversations.add(message.conversation_id)
        
        try:
            if message.message_type == MessageType.QUERY:
                await self.process_query(message)
            elif message.message_type == MessageType.REQUEST_HELP:
                await self.provide_help(message)
            elif message.message_type == MessageType.VALIDATE:
                await self.validate_response(message)
        except Exception as e:
            await self.send_error(message, str(e))
    
    async def process_query(self, message: AgentMessage):
        """쿼리 처리 - 각 에이전트에서 구현"""
        raise NotImplementedError
    
    async def provide_help(self, message: AgentMessage):
        """다른 에이전트 도움 요청 처리"""
        raise NotImplementedError
    
    async def validate_response(self, message: AgentMessage):
        """응답 검증"""
        raise NotImplementedError
    
    async def send_message(self, receiver: str, message_type: MessageType, 
                          content: Dict[str, Any], conversation_id: str, 
                          priority: int = 1):
        """메시지 전송"""
        message = AgentMessage(
            sender=self.agent_id,
            receiver=receiver,
            message_type=message_type,
            content=content,
            timestamp=datetime.now(),
            conversation_id=conversation_id,
            priority=priority
        )
        await self.message_bus.publish(message)
    
    async def send_error(self, original_message: AgentMessage, error: str):
        """에러 메시지 전송"""
        await self.send_message(
            receiver=original_message.sender,
            message_type=MessageType.ERROR,
            content={"error": error, "original_message": original_message.content},
            conversation_id=original_message.conversation_id
        )

class OrchestratorAgent(BaseAgent):
    def __init__(self, agent_id: str, message_bus: MessageBus):
        super().__init__(agent_id, message_bus)
        self.specialist_agents = {}
        self.query_routing_rules = {}
    
    def register_specialist(self, agent_id: str, capabilities: List[str]):
        """전문 에이전트 등록"""
        self.specialist_agents[agent_id] = capabilities
        for capability in capabilities:
            if capability not in self.query_routing_rules:
                self.query_routing_rules[capability] = []
            self.query_routing_rules[capability].append(agent_id)
    
    async def process_query(self, message: AgentMessage):
        """쿼리를 분석하고 적절한 전문 에이전트들에게 라우팅"""
        query = message.content.get('query', '')
        conversation_id = message.conversation_id
        
        # 1. 쿼리 분석 - 필요한 도메인들 식별
        required_domains = await self.analyze_query_domains(query)
        
        # 2. 각 도메인별 전문 에이전트들에게 병렬 요청
        specialist_tasks = []
        for domain in required_domains:
            if domain in self.query_routing_rules:
                for agent_id in self.query_routing_rules[domain]:
                    task = self.send_message(
                        receiver=agent_id,
                        message_type=MessageType.QUERY,
                        content={"query": query, "domain_focus": domain},
                        conversation_id=conversation_id,
                        priority=2
                    )
                    specialist_tasks.append(task)
        
        await asyncio.gather(*specialist_tasks)
        
        # 3. 응답 수집 대기 로직은 별도 구현
        await self.wait_and_synthesize_responses(conversation_id, len(specialist_tasks))
    
    async def analyze_query_domains(self, query: str) -> List[str]:
        """쿼리에서 필요한 도메인들을 추출"""
        # 실제로는 LLM 또는 분류 모델 사용
        domains = []
        
        # 간단한 키워드 기반 예시
        if any(word in query.lower() for word in ['heart', 'cardiac', 'chest pain']):
            domains.append('cardiology')
        if any(word in query.lower() for word in ['legal', 'contract', 'law']):
            domains.append('legal')
        if any(word in query.lower() for word in ['code', 'programming', 'software']):
            domains.append('technical')
        
        return domains if domains else ['general']
    
    async def wait_and_synthesize_responses(self, conversation_id: str, expected_responses: int):
        """응답들을 수집하고 통합"""
        # 타임아웃과 함께 응답 대기
        timeout = 30  # 30초
        collected_responses = []
        
        start_time = datetime.now()
        while len(collected_responses) < expected_responses:
            if (datetime.now() - start_time).seconds > timeout:
                break
            
            # 메시지 히스토리에서 응답 수집
            conversation_messages = self.message_bus.get_conversation(conversation_id)
            responses = [msg for msg in conversation_messages 
                        if msg.message_type == MessageType.RESPONSE 
                        and msg.receiver == self.agent_id]
            
            if len(responses) > len(collected_responses):
                collected_responses = responses
            
            await asyncio.sleep(0.1)
        
        # 응답 통합
        synthesized_answer = await self.synthesize_responses(collected_responses)
        
        # 최종 답변 전송 (원래 질문자에게)
        original_query_message = next(
            (msg for msg in conversation_messages 
             if msg.message_type == MessageType.QUERY), None
        )
        
        if original_query_message:
            await self.send_message(
                receiver=original_query_message.sender,
                message_type=MessageType.RESPONSE,
                content={"answer": synthesized_answer, "sources": [r.sender for r in collected_responses]},
                conversation_id=conversation_id
            )
    
    async def synthesize_responses(self, responses: List[AgentMessage]) -> str:
        """여러 에이전트 응답을 통합하여 최종 답변 생성"""
        if not responses:
            return "죄송합니다. 적절한 답변을 생성할 수 없습니다."
        
        # 실제로는 LLM을 사용하여 응답 통합
        combined_content = []
        for response in responses:
            agent_name = response.sender
            content = response.content.get('answer', '')
            combined_content.append(f"**{agent_name}의 관점**: {content}")
        
        return "\n\n".join(combined_content)

class SpecialistRAGAgent(BaseAgent):
    def __init__(self, agent_id: str, message_bus: MessageBus, 
                 domain: str, vector_db, llm_client):
        super().__init__(agent_id, message_bus)
        self.domain = domain
        self.vector_db = vector_db
        self.llm_client = llm_client
        self.capabilities = [domain]
    
    async def process_query(self, message: AgentMessage):
        """도메인별 특화된 RAG 처리"""
        query = message.content.get('query', '')
        domain_focus = message.content.get('domain_focus', self.domain)
        conversation_id = message.conversation_id
        
        try:
            # 1. 도메인 특화 검색
            relevant_docs = await self.search_domain_knowledge(query, domain_focus)
            
            # 2. 도메인별 특화 프롬프트로 답변 생성
            answer = await self.generate_domain_answer(query, relevant_docs, domain_focus)
            
            # 3. 신뢰도 점수 계산
            confidence = await self.calculate_confidence(query, answer, relevant_docs)
            
            # 4. 응답 전송
            await self.send_message(
                receiver=message.sender,
                message_type=MessageType.RESPONSE,
                content={
                    "answer": answer,
                    "confidence": confidence,
                    "domain": self.domain,
                    "sources": [doc['id'] for doc in relevant_docs]
                },
                conversation_id=conversation_id
            )
            
        except Exception as e:
            await self.send_error(message, f"Domain processing error: {str(e)}")
    
    async def search_domain_knowledge(self, query: str, domain_focus: str) -> List[Dict]:
        """도메인별 특화 검색"""
        # 도메인별 필터링과 가중치 적용
        search_query = f"{domain_focus}: {query}"
        
        # 벡터 데이터베이스에서 검색
        results = await self.vector_db.search(
            query=search_query,
            filters={"domain": self.domain},
            top_k=5
        )
        
        return results
    
    async def generate_domain_answer(self, query: str, docs: List[Dict], domain: str) -> str:
        """도메인별 특화 답변 생성"""
        context = "\n\n".join([doc['content'] for doc in docs])
        
        domain_prompt = f"""
        당신은 {domain} 분야의 전문가입니다. 
        제공된 컨텍스트를 바탕으로 질문에 대해 전문적이고 정확한 답변을 제공하세요.
        
        질문: {query}
        
        관련 자료:
        {context}
        
        {domain} 전문가로서의 답변:
        """
        
        response = await self.llm_client.generate(domain_prompt)
        return response
    
    async def calculate_confidence(self, query: str, answer: str, docs: List[Dict]) -> float:
        """답변 신뢰도 계산"""
        # 간단한 신뢰도 계산 로직
        # 실제로는 더 복잡한 신뢰도 모델 사용
        
        if not docs:
            return 0.0
        
        # 검색 결과 점수 기반
        avg_score = sum(doc.get('score', 0) for doc in docs) / len(docs)
        
        # 답변 길이 기반 (너무 짧으면 신뢰도 낮음)
        length_factor = min(len(answer) / 100, 1.0)
        
        return min(avg_score * length_factor, 1.0)
    
    async def provide_help(self, message: AgentMessage):
        """다른 에이전트의 도움 요청 처리"""
        request_type = message.content.get('help_type', '')
        context = message.content.get('context', '')
        
        if request_type == 'domain_expertise':
            # 도메인 전문 지식 제공
            expertise = await self.provide_domain_expertise(context)
            
            await self.send_message(
                receiver=message.sender,
                message_type=MessageType.PROVIDE_CONTEXT,
                content={"expertise": expertise, "domain": self.domain},
                conversation_id=message.conversation_id
            )

# 사용 예시
async def setup_multi_agent_rag():
    # 메시지 버스 초기화
    message_bus = MessageBus()
    
    # Orchestrator 에이전트 생성
    orchestrator = OrchestratorAgent("orchestrator", message_bus)
    
    # 전문 에이전트들 생성
    medical_agent = SpecialistRAGAgent("medical_agent", message_bus, "medical", medical_vector_db, llm_client)
    legal_agent = SpecialistRAGAgent("legal_agent", message_bus, "legal", legal_vector_db, llm_client)
    technical_agent = SpecialistRAGAgent("technical_agent", message_bus, "technical", tech_vector_db, llm_client)
    
    # 전문 에이전트들을 orchestrator에 등록
    orchestrator.register_specialist("medical_agent", ["medical", "cardiology", "health"])
    orchestrator.register_specialist("legal_agent", ["legal", "contract", "law"])
    orchestrator.register_specialist("technical_agent", ["technical", "programming", "software"])
    
    return orchestrator, message_bus

2.3 에이전트 협력 패턴

효과적인 멀티 에이전트 협력을 위한 설계 패턴

주요 협력 패턴

🔄 Sequential Pattern

에이전트들이 순차적으로 작업을 수행하여 점진적으로 답변을 개선

Research Agent → Analysis Agent → Synthesis Agent

⚡ Parallel Pattern

여러 에이전트가 동시에 작업하여 다양한 관점의 답변을 수집

Agent A + Agent B + Agent C → Synthesizer

🔀 Debate Pattern

에이전트들이 서로 다른 관점에서 토론하여 최적의 답변 도출

Pro Agent ↔ Con Agent → Moderator → Final Answer

🎯 Expert Consultation

일반 에이전트가 전문 에이전트에게 자문을 구하는 패턴

General Agent → Expert1, Expert2 → Integrated Answer

실제 구현 사례: 의사결정 지원 시스템

class DecisionSupportSystem:
    def __init__(self):
        self.message_bus = MessageBus()
        self.agents = {}
        self.setup_agents()
    
    def setup_agents(self):
        # 다양한 역할의 에이전트 생성
        self.agents['researcher'] = ResearchAgent("researcher", self.message_bus)
        self.agents['analyst'] = AnalysisAgent("analyst", self.message_bus)
        self.agents['validator'] = ValidationAgent("validator", self.message_bus)
        self.agents['synthesizer'] = SynthesisAgent("synthesizer", self.message_bus)
        self.agents['devil_advocate'] = DevilAdvocateAgent("devil_advocate", self.message_bus)
    
    async def process_complex_query(self, query: str) -> str:
        conversation_id = f"decision_{datetime.now().timestamp()}"
        
        # Phase 1: 초기 연구 및 분석
        await self.agents['researcher'].send_message(
            receiver="researcher",
            message_type=MessageType.QUERY,
            content={"query": query, "phase": "initial_research"},
            conversation_id=conversation_id
        )
        
        # 연구 결과 대기
        research_results = await self.wait_for_response("researcher", conversation_id)
        
        # Phase 2: 심화 분석
        await self.agents['analyst'].send_message(
            receiver="analyst", 
            message_type=MessageType.QUERY,
            content={"query": query, "research_data": research_results},
            conversation_id=conversation_id
        )
        
        analysis_results = await self.wait_for_response("analyst", conversation_id)
        
        # Phase 3: 반대 의견 검토
        await self.agents['devil_advocate'].send_message(
            receiver="devil_advocate",
            message_type=MessageType.QUERY, 
            content={"analysis": analysis_results, "task": "find_weaknesses"},
            conversation_id=conversation_id
        )
        
        critique_results = await self.wait_for_response("devil_advocate", conversation_id)
        
        # Phase 4: 검증 및 통합
        await self.agents['validator'].send_message(
            receiver="validator",
            message_type=MessageType.VALIDATE,
            content={
                "original_query": query,
                "research": research_results,
                "analysis": analysis_results, 
                "critique": critique_results
            },
            conversation_id=conversation_id
        )
        
        validation_results = await self.wait_for_response("validator", conversation_id)
        
        # Phase 5: 최종 종합
        await self.agents['synthesizer'].send_message(
            receiver="synthesizer",
            message_type=MessageType.QUERY,
            content={
                "query": query,
                "all_perspectives": {
                    "research": research_results,
                    "analysis": analysis_results,
                    "critique": critique_results,
                    "validation": validation_results
                }
            },
            conversation_id=conversation_id
        )
        
        final_answer = await self.wait_for_response("synthesizer", conversation_id)
        return final_answer

class ResearchAgent(BaseAgent):
    async def process_query(self, message: AgentMessage):
        query = message.content.get('query', '')
        
        # 다양한 소스에서 기초 정보 수집
        research_results = await self.comprehensive_search(query)
        
        # 정보의 신뢰성 평가
        credibility_scores = await self.evaluate_source_credibility(research_results)
        
        # 구조화된 연구 결과 생성
        structured_research = {
            "key_findings": research_results[:5],
            "supporting_evidence": research_results[5:10],
            "credibility_analysis": credibility_scores,
            "research_gaps": await self.identify_gaps(research_results),
            "recommended_next_steps": await self.suggest_further_research(query, research_results)
        }
        
        await self.send_message(
            receiver=message.sender,
            message_type=MessageType.RESPONSE,
            content=structured_research,
            conversation_id=message.conversation_id
        )

class DevilAdvocateAgent(BaseAgent):
    async def process_query(self, message: AgentMessage):
        analysis = message.content.get('analysis', {})
        
        # 분석의 약점 찾기
        weaknesses = await self.find_logical_weaknesses(analysis)
        alternative_perspectives = await self.generate_counterarguments(analysis)
        potential_biases = await self.identify_biases(analysis)
        
        critique = {
            "logical_weaknesses": weaknesses,
            "alternative_viewpoints": alternative_perspectives,
            "identified_biases": potential_biases,
            "risk_assessment": await self.assess_risks(analysis),
            "improvement_suggestions": await self.suggest_improvements(analysis)
        }
        
        await self.send_message(
            receiver=message.sender,
            message_type=MessageType.RESPONSE,
            content=critique,
            conversation_id=message.conversation_id
        )

# 사용 예시
async def main():
    decision_system = DecisionSupportSystem()
    
    complex_query = """
    우리 회사가 AI 기반 의료 진단 시스템을 개발해야 할지 결정해주세요.
    시장 기회, 기술적 feasibility, 규제 리스크, 경쟁 환경을 고려해주세요.
    """
    
    final_recommendation = await decision_system.process_complex_query(complex_query)
    print("최종 의사결정 권고사항:", final_recommendation)

2.4 Multi-Agent 성능 최적화

대규모 멀티 에이전트 시스템의 효율성 극대화

핵심 최적화 전략

🚀 병렬 처리 최적화

  • • 에이전트별 독립적 리소스 할당
  • • 비동기 메시지 처리
  • • 로드 밸런싱 구현
  • • 작업 큐 관리

💾 메모리 관리

  • • 컨텍스트 윈도우 최적화
  • • 메시지 히스토리 압축
  • • 에이전트별 메모리 풀
  • • 가비지 컬렉션 튜닝

⚡ 통신 최적화

  • • 메시지 압축
  • • 배치 처리
  • • 우선순위 큐
  • • 연결 풀링

성능 벤치마크 결과

시스템 구성응답 시간정확도처리량 (QPS)
Single RAG Agent2.1초78%45
3-Agent System3.8초89%32
5-Agent System4.2초94%28
Optimized 5-Agent2.9초93%38

💡 핵심 인사이트: 적절한 최적화를 통해 Multi-Agent 시스템은 높은 정확도를 유지하면서도 단일 에이전트 대비 합리적인 성능을 달성할 수 있습니다.

실습 과제

Multi-Agent RAG 시스템 구축

🏗️ 시스템 설계

  1. 1. 의료진단 지원을 위한 3-Agent 시스템 설계
  2. 2. 메시지 버스 기반 비동기 통신 구현
  3. 3. 도메인별 전문 에이전트 개발 (내과, 영상의학, 응급의학)
  4. 4. Orchestrator의 지능적 라우팅 알고리즘 구현
  5. 5. 답변 통합 및 신뢰도 평가 시스템 개발

🎯 평가 기준

  • • 에이전트 간 협력 효율성 (응답 시간, 메시지 교환 횟수)
  • • 최종 답변의 종합성과 정확도
  • • 시스템 확장성 (새로운 전문 도메인 추가 용이성)
  • • 장애 복구 능력 (일부 에이전트 실패 시 대응)

🚀 고급 과제

자가 학습 Multi-Agent 시스템: 에이전트들이 상호작용 과정에서 서로의 강점을 학습하고, 협력 패턴을 최적화하는 강화학습 기반 시스템을 구현해보세요.