고급 과정으로 돌아가기
Chapter 2: Multi-Agent RAG Systems
분산 지능을 통한 복잡한 질의응답 시스템 구축
2.1 Multi-Agent RAG의 필요성
복잡한 질문을 전문 에이전트들이 협력하여 해결
단일 Agent vs Multi-Agent 접근법
복잡한 질문은 종종 여러 도메인의 전문 지식을 필요로 합니다.단일 RAG 시스템으로는 한계가 있는 상황에서, Multi-Agent RAG는 각 도메인에 특화된 에이전트들이 협력하여 더 정확하고 포괄적인 답변을 제공할 수 있습니다.
핵심 아키텍처 요소:
- Orchestrator Agent: 질문을 분석하고 적절한 전문 에이전트들에게 배분
- Specialist Agents: 각 도메인별 전문 지식을 보유한 RAG 시스템
- Synthesis Agent: 여러 에이전트의 결과를 통합하여 최종 답변 생성
- Quality Validator: 답변의 일관성과 품질을 검증
실제 활용 사례: 의료 진단 시스템
질문: "40세 남성 환자가 가슴 통증과 호흡곤란을 호소합니다. 최근 장거리 비행 후 발생했으며, 가족력상 심장병이 있습니다."
심장내과 Agent
심근경색, 협심증 가능성 분석
심근경색, 협심증 가능성 분석
호흡기내과 Agent
폐색전증, 기흉 가능성 검토
폐색전증, 기흉 가능성 검토
응급의학 Agent
중증도 판단 및 응급 처치 가이드
중증도 판단 및 응급 처치 가이드
Multi-Agent RAG의 장점
✅ 성능 향상
- • 도메인별 최적화된 검색 성능
- • 전문 지식의 깊이 증가
- • 교차 검증을 통한 정확도 향상
- • 편향 감소 (다양한 관점)
⚡ 확장성
- • 새로운 도메인 에이전트 쉽게 추가
- • 병렬 처리로 응답 속도 최적화
- • 모듈화된 유지보수
- • 독립적 에이전트 업데이트 가능
2.2 Multi-Agent 아키텍처 설계
효율적인 에이전트 협력을 위한 시스템 구조
핵심 아키텍처 패턴
1. 계층적 분산 (Hierarchical Distribution)
┌─────────────────┐
│ Orchestrator │
│ Agent │
└────────┬────────┘
│
┌─────────────────┼─────────────────┐
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ Medical │ │ Legal │ │ Technical │
│ Specialist │ │ Specialist │ │ Specialist │
│ Agent │ │ Agent │ │ Agent │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
┌──────▼──────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ Cardiology │ │ Contract │ │ Software │
│ Sub-Agent │ │ Sub-Agent │ │ Sub-Agent │
└─────────────┘ └─────────────┘ └─────────────┘2. 파이프라인 기반 (Pipeline-based)
Query → Analysis → Routing → Parallel Processing → Synthesis → Validation → Response
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Agent A │ │ Agent B │ │ Agent C │
│ (Research) │ │ (Analysis) │ │ (Synthesis) │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Results A │ │ Results B │ │ Final │
│ │ │ │ │ Answer │
└─────────────┘ └─────────────┘ └─────────────┘에이전트 간 통신 프로토콜
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
import asyncio
import json
from datetime import datetime
class MessageType(Enum):
QUERY = "query"
RESPONSE = "response"
REQUEST_HELP = "request_help"
PROVIDE_CONTEXT = "provide_context"
VALIDATE = "validate"
ERROR = "error"
@dataclass
class AgentMessage:
sender: str
receiver: str
message_type: MessageType
content: Dict[str, Any]
timestamp: datetime
conversation_id: str
priority: int = 1 # 1=low, 2=medium, 3=high
class MessageBus:
def __init__(self):
self.subscribers: Dict[str, List[callable]] = {}
self.message_history: List[AgentMessage] = []
def subscribe(self, agent_id: str, handler: callable):
if agent_id not in self.subscribers:
self.subscribers[agent_id] = []
self.subscribers[agent_id].append(handler)
async def publish(self, message: AgentMessage):
self.message_history.append(message)
# 특정 수신자에게 메시지 전달
if message.receiver in self.subscribers:
tasks = []
for handler in self.subscribers[message.receiver]:
tasks.append(handler(message))
await asyncio.gather(*tasks)
def get_conversation(self, conversation_id: str) -> List[AgentMessage]:
return [msg for msg in self.message_history
if msg.conversation_id == conversation_id]
class BaseAgent:
def __init__(self, agent_id: str, message_bus: MessageBus):
self.agent_id = agent_id
self.message_bus = message_bus
self.knowledge_base = None
self.capabilities = []
self.active_conversations = set()
# 메시지 핸들러 등록
message_bus.subscribe(agent_id, self.handle_message)
async def handle_message(self, message: AgentMessage):
"""메시지 처리 메인 로직"""
self.active_conversations.add(message.conversation_id)
try:
if message.message_type == MessageType.QUERY:
await self.process_query(message)
elif message.message_type == MessageType.REQUEST_HELP:
await self.provide_help(message)
elif message.message_type == MessageType.VALIDATE:
await self.validate_response(message)
except Exception as e:
await self.send_error(message, str(e))
async def process_query(self, message: AgentMessage):
"""쿼리 처리 - 각 에이전트에서 구현"""
raise NotImplementedError
async def provide_help(self, message: AgentMessage):
"""다른 에이전트 도움 요청 처리"""
raise NotImplementedError
async def validate_response(self, message: AgentMessage):
"""응답 검증"""
raise NotImplementedError
async def send_message(self, receiver: str, message_type: MessageType,
content: Dict[str, Any], conversation_id: str,
priority: int = 1):
"""메시지 전송"""
message = AgentMessage(
sender=self.agent_id,
receiver=receiver,
message_type=message_type,
content=content,
timestamp=datetime.now(),
conversation_id=conversation_id,
priority=priority
)
await self.message_bus.publish(message)
async def send_error(self, original_message: AgentMessage, error: str):
"""에러 메시지 전송"""
await self.send_message(
receiver=original_message.sender,
message_type=MessageType.ERROR,
content={"error": error, "original_message": original_message.content},
conversation_id=original_message.conversation_id
)
class OrchestratorAgent(BaseAgent):
def __init__(self, agent_id: str, message_bus: MessageBus):
super().__init__(agent_id, message_bus)
self.specialist_agents = {}
self.query_routing_rules = {}
def register_specialist(self, agent_id: str, capabilities: List[str]):
"""전문 에이전트 등록"""
self.specialist_agents[agent_id] = capabilities
for capability in capabilities:
if capability not in self.query_routing_rules:
self.query_routing_rules[capability] = []
self.query_routing_rules[capability].append(agent_id)
async def process_query(self, message: AgentMessage):
"""쿼리를 분석하고 적절한 전문 에이전트들에게 라우팅"""
query = message.content.get('query', '')
conversation_id = message.conversation_id
# 1. 쿼리 분석 - 필요한 도메인들 식별
required_domains = await self.analyze_query_domains(query)
# 2. 각 도메인별 전문 에이전트들에게 병렬 요청
specialist_tasks = []
for domain in required_domains:
if domain in self.query_routing_rules:
for agent_id in self.query_routing_rules[domain]:
task = self.send_message(
receiver=agent_id,
message_type=MessageType.QUERY,
content={"query": query, "domain_focus": domain},
conversation_id=conversation_id,
priority=2
)
specialist_tasks.append(task)
await asyncio.gather(*specialist_tasks)
# 3. 응답 수집 대기 로직은 별도 구현
await self.wait_and_synthesize_responses(conversation_id, len(specialist_tasks))
async def analyze_query_domains(self, query: str) -> List[str]:
"""쿼리에서 필요한 도메인들을 추출"""
# 실제로는 LLM 또는 분류 모델 사용
domains = []
# 간단한 키워드 기반 예시
if any(word in query.lower() for word in ['heart', 'cardiac', 'chest pain']):
domains.append('cardiology')
if any(word in query.lower() for word in ['legal', 'contract', 'law']):
domains.append('legal')
if any(word in query.lower() for word in ['code', 'programming', 'software']):
domains.append('technical')
return domains if domains else ['general']
async def wait_and_synthesize_responses(self, conversation_id: str, expected_responses: int):
"""응답들을 수집하고 통합"""
# 타임아웃과 함께 응답 대기
timeout = 30 # 30초
collected_responses = []
start_time = datetime.now()
while len(collected_responses) < expected_responses:
if (datetime.now() - start_time).seconds > timeout:
break
# 메시지 히스토리에서 응답 수집
conversation_messages = self.message_bus.get_conversation(conversation_id)
responses = [msg for msg in conversation_messages
if msg.message_type == MessageType.RESPONSE
and msg.receiver == self.agent_id]
if len(responses) > len(collected_responses):
collected_responses = responses
await asyncio.sleep(0.1)
# 응답 통합
synthesized_answer = await self.synthesize_responses(collected_responses)
# 최종 답변 전송 (원래 질문자에게)
original_query_message = next(
(msg for msg in conversation_messages
if msg.message_type == MessageType.QUERY), None
)
if original_query_message:
await self.send_message(
receiver=original_query_message.sender,
message_type=MessageType.RESPONSE,
content={"answer": synthesized_answer, "sources": [r.sender for r in collected_responses]},
conversation_id=conversation_id
)
async def synthesize_responses(self, responses: List[AgentMessage]) -> str:
"""여러 에이전트 응답을 통합하여 최종 답변 생성"""
if not responses:
return "죄송합니다. 적절한 답변을 생성할 수 없습니다."
# 실제로는 LLM을 사용하여 응답 통합
combined_content = []
for response in responses:
agent_name = response.sender
content = response.content.get('answer', '')
combined_content.append(f"**{agent_name}의 관점**: {content}")
return "\n\n".join(combined_content)
class SpecialistRAGAgent(BaseAgent):
def __init__(self, agent_id: str, message_bus: MessageBus,
domain: str, vector_db, llm_client):
super().__init__(agent_id, message_bus)
self.domain = domain
self.vector_db = vector_db
self.llm_client = llm_client
self.capabilities = [domain]
async def process_query(self, message: AgentMessage):
"""도메인별 특화된 RAG 처리"""
query = message.content.get('query', '')
domain_focus = message.content.get('domain_focus', self.domain)
conversation_id = message.conversation_id
try:
# 1. 도메인 특화 검색
relevant_docs = await self.search_domain_knowledge(query, domain_focus)
# 2. 도메인별 특화 프롬프트로 답변 생성
answer = await self.generate_domain_answer(query, relevant_docs, domain_focus)
# 3. 신뢰도 점수 계산
confidence = await self.calculate_confidence(query, answer, relevant_docs)
# 4. 응답 전송
await self.send_message(
receiver=message.sender,
message_type=MessageType.RESPONSE,
content={
"answer": answer,
"confidence": confidence,
"domain": self.domain,
"sources": [doc['id'] for doc in relevant_docs]
},
conversation_id=conversation_id
)
except Exception as e:
await self.send_error(message, f"Domain processing error: {str(e)}")
async def search_domain_knowledge(self, query: str, domain_focus: str) -> List[Dict]:
"""도메인별 특화 검색"""
# 도메인별 필터링과 가중치 적용
search_query = f"{domain_focus}: {query}"
# 벡터 데이터베이스에서 검색
results = await self.vector_db.search(
query=search_query,
filters={"domain": self.domain},
top_k=5
)
return results
async def generate_domain_answer(self, query: str, docs: List[Dict], domain: str) -> str:
"""도메인별 특화 답변 생성"""
context = "\n\n".join([doc['content'] for doc in docs])
domain_prompt = f"""
당신은 {domain} 분야의 전문가입니다.
제공된 컨텍스트를 바탕으로 질문에 대해 전문적이고 정확한 답변을 제공하세요.
질문: {query}
관련 자료:
{context}
{domain} 전문가로서의 답변:
"""
response = await self.llm_client.generate(domain_prompt)
return response
async def calculate_confidence(self, query: str, answer: str, docs: List[Dict]) -> float:
"""답변 신뢰도 계산"""
# 간단한 신뢰도 계산 로직
# 실제로는 더 복잡한 신뢰도 모델 사용
if not docs:
return 0.0
# 검색 결과 점수 기반
avg_score = sum(doc.get('score', 0) for doc in docs) / len(docs)
# 답변 길이 기반 (너무 짧으면 신뢰도 낮음)
length_factor = min(len(answer) / 100, 1.0)
return min(avg_score * length_factor, 1.0)
async def provide_help(self, message: AgentMessage):
"""다른 에이전트의 도움 요청 처리"""
request_type = message.content.get('help_type', '')
context = message.content.get('context', '')
if request_type == 'domain_expertise':
# 도메인 전문 지식 제공
expertise = await self.provide_domain_expertise(context)
await self.send_message(
receiver=message.sender,
message_type=MessageType.PROVIDE_CONTEXT,
content={"expertise": expertise, "domain": self.domain},
conversation_id=message.conversation_id
)
# 사용 예시
async def setup_multi_agent_rag():
# 메시지 버스 초기화
message_bus = MessageBus()
# Orchestrator 에이전트 생성
orchestrator = OrchestratorAgent("orchestrator", message_bus)
# 전문 에이전트들 생성
medical_agent = SpecialistRAGAgent("medical_agent", message_bus, "medical", medical_vector_db, llm_client)
legal_agent = SpecialistRAGAgent("legal_agent", message_bus, "legal", legal_vector_db, llm_client)
technical_agent = SpecialistRAGAgent("technical_agent", message_bus, "technical", tech_vector_db, llm_client)
# 전문 에이전트들을 orchestrator에 등록
orchestrator.register_specialist("medical_agent", ["medical", "cardiology", "health"])
orchestrator.register_specialist("legal_agent", ["legal", "contract", "law"])
orchestrator.register_specialist("technical_agent", ["technical", "programming", "software"])
return orchestrator, message_bus2.3 에이전트 협력 패턴
효과적인 멀티 에이전트 협력을 위한 설계 패턴
주요 협력 패턴
🔄 Sequential Pattern
에이전트들이 순차적으로 작업을 수행하여 점진적으로 답변을 개선
Research Agent → Analysis Agent → Synthesis Agent
⚡ Parallel Pattern
여러 에이전트가 동시에 작업하여 다양한 관점의 답변을 수집
Agent A + Agent B + Agent C → Synthesizer
🔀 Debate Pattern
에이전트들이 서로 다른 관점에서 토론하여 최적의 답변 도출
Pro Agent ↔ Con Agent → Moderator → Final Answer
🎯 Expert Consultation
일반 에이전트가 전문 에이전트에게 자문을 구하는 패턴
General Agent → Expert1, Expert2 → Integrated Answer
실제 구현 사례: 의사결정 지원 시스템
class DecisionSupportSystem:
def __init__(self):
self.message_bus = MessageBus()
self.agents = {}
self.setup_agents()
def setup_agents(self):
# 다양한 역할의 에이전트 생성
self.agents['researcher'] = ResearchAgent("researcher", self.message_bus)
self.agents['analyst'] = AnalysisAgent("analyst", self.message_bus)
self.agents['validator'] = ValidationAgent("validator", self.message_bus)
self.agents['synthesizer'] = SynthesisAgent("synthesizer", self.message_bus)
self.agents['devil_advocate'] = DevilAdvocateAgent("devil_advocate", self.message_bus)
async def process_complex_query(self, query: str) -> str:
conversation_id = f"decision_{datetime.now().timestamp()}"
# Phase 1: 초기 연구 및 분석
await self.agents['researcher'].send_message(
receiver="researcher",
message_type=MessageType.QUERY,
content={"query": query, "phase": "initial_research"},
conversation_id=conversation_id
)
# 연구 결과 대기
research_results = await self.wait_for_response("researcher", conversation_id)
# Phase 2: 심화 분석
await self.agents['analyst'].send_message(
receiver="analyst",
message_type=MessageType.QUERY,
content={"query": query, "research_data": research_results},
conversation_id=conversation_id
)
analysis_results = await self.wait_for_response("analyst", conversation_id)
# Phase 3: 반대 의견 검토
await self.agents['devil_advocate'].send_message(
receiver="devil_advocate",
message_type=MessageType.QUERY,
content={"analysis": analysis_results, "task": "find_weaknesses"},
conversation_id=conversation_id
)
critique_results = await self.wait_for_response("devil_advocate", conversation_id)
# Phase 4: 검증 및 통합
await self.agents['validator'].send_message(
receiver="validator",
message_type=MessageType.VALIDATE,
content={
"original_query": query,
"research": research_results,
"analysis": analysis_results,
"critique": critique_results
},
conversation_id=conversation_id
)
validation_results = await self.wait_for_response("validator", conversation_id)
# Phase 5: 최종 종합
await self.agents['synthesizer'].send_message(
receiver="synthesizer",
message_type=MessageType.QUERY,
content={
"query": query,
"all_perspectives": {
"research": research_results,
"analysis": analysis_results,
"critique": critique_results,
"validation": validation_results
}
},
conversation_id=conversation_id
)
final_answer = await self.wait_for_response("synthesizer", conversation_id)
return final_answer
class ResearchAgent(BaseAgent):
async def process_query(self, message: AgentMessage):
query = message.content.get('query', '')
# 다양한 소스에서 기초 정보 수집
research_results = await self.comprehensive_search(query)
# 정보의 신뢰성 평가
credibility_scores = await self.evaluate_source_credibility(research_results)
# 구조화된 연구 결과 생성
structured_research = {
"key_findings": research_results[:5],
"supporting_evidence": research_results[5:10],
"credibility_analysis": credibility_scores,
"research_gaps": await self.identify_gaps(research_results),
"recommended_next_steps": await self.suggest_further_research(query, research_results)
}
await self.send_message(
receiver=message.sender,
message_type=MessageType.RESPONSE,
content=structured_research,
conversation_id=message.conversation_id
)
class DevilAdvocateAgent(BaseAgent):
async def process_query(self, message: AgentMessage):
analysis = message.content.get('analysis', {})
# 분석의 약점 찾기
weaknesses = await self.find_logical_weaknesses(analysis)
alternative_perspectives = await self.generate_counterarguments(analysis)
potential_biases = await self.identify_biases(analysis)
critique = {
"logical_weaknesses": weaknesses,
"alternative_viewpoints": alternative_perspectives,
"identified_biases": potential_biases,
"risk_assessment": await self.assess_risks(analysis),
"improvement_suggestions": await self.suggest_improvements(analysis)
}
await self.send_message(
receiver=message.sender,
message_type=MessageType.RESPONSE,
content=critique,
conversation_id=message.conversation_id
)
# 사용 예시
async def main():
decision_system = DecisionSupportSystem()
complex_query = """
우리 회사가 AI 기반 의료 진단 시스템을 개발해야 할지 결정해주세요.
시장 기회, 기술적 feasibility, 규제 리스크, 경쟁 환경을 고려해주세요.
"""
final_recommendation = await decision_system.process_complex_query(complex_query)
print("최종 의사결정 권고사항:", final_recommendation)2.4 Multi-Agent 성능 최적화
대규모 멀티 에이전트 시스템의 효율성 극대화
핵심 최적화 전략
🚀 병렬 처리 최적화
- • 에이전트별 독립적 리소스 할당
- • 비동기 메시지 처리
- • 로드 밸런싱 구현
- • 작업 큐 관리
💾 메모리 관리
- • 컨텍스트 윈도우 최적화
- • 메시지 히스토리 압축
- • 에이전트별 메모리 풀
- • 가비지 컬렉션 튜닝
⚡ 통신 최적화
- • 메시지 압축
- • 배치 처리
- • 우선순위 큐
- • 연결 풀링
성능 벤치마크 결과
| 시스템 구성 | 응답 시간 | 정확도 | 처리량 (QPS) |
|---|---|---|---|
| Single RAG Agent | 2.1초 | 78% | 45 |
| 3-Agent System | 3.8초 | 89% | 32 |
| 5-Agent System | 4.2초 | 94% | 28 |
| Optimized 5-Agent | 2.9초 | 93% | 38 |
💡 핵심 인사이트: 적절한 최적화를 통해 Multi-Agent 시스템은 높은 정확도를 유지하면서도 단일 에이전트 대비 합리적인 성능을 달성할 수 있습니다.
실습 과제
Multi-Agent RAG 시스템 구축
🏗️ 시스템 설계
- 1. 의료진단 지원을 위한 3-Agent 시스템 설계
- 2. 메시지 버스 기반 비동기 통신 구현
- 3. 도메인별 전문 에이전트 개발 (내과, 영상의학, 응급의학)
- 4. Orchestrator의 지능적 라우팅 알고리즘 구현
- 5. 답변 통합 및 신뢰도 평가 시스템 개발
🎯 평가 기준
- • 에이전트 간 협력 효율성 (응답 시간, 메시지 교환 횟수)
- • 최종 답변의 종합성과 정확도
- • 시스템 확장성 (새로운 전문 도메인 추가 용이성)
- • 장애 복구 능력 (일부 에이전트 실패 시 대응)
🚀 고급 과제
자가 학습 Multi-Agent 시스템: 에이전트들이 상호작용 과정에서 서로의 강점을 학습하고, 협력 패턴을 최적화하는 강화학습 기반 시스템을 구현해보세요.