Chapter 4: RAG 성능 최적화
대규모 RAG 시스템의 성능 최적화 전략
4.1 캐싱 전략
Redis와 인메모리 캐싱으로 응답 속도 최적화
다층 캐싱 아키텍처
캐싱은 RAG 시스템의 성능을 극적으로 향상시키는 핵심 기술입니다. 동일한 쿼리에 대해 매번 벡터 검색을 수행하는 것은 비효율적이므로, 자주 사용되는 쿼리와 결과를 캐시에 저장하여 응답 속도를 10배 이상 향상시킬 수 있습니다.
다층 캐싱 시스템은 다음과 같이 작동합니다:
- L1 캐시 (메모리): 가장 빠른 접근 속도 (~0.1ms), 제한된 용량
- L2 캐시 (Redis): 중간 속도 (~1-5ms), 대용량 지원
- L3 캐시 (디스크): 느린 속도 (~10-50ms), 무제한 용량
import redis
import hashlib
from typing import List, Optional
from functools import wraps
import json
class RAGCacheManager:
def __init__(self, redis_host="localhost", redis_port=6379):
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
decode_responses=True
)
self.memory_cache = {} # L1 캐시 (인메모리)
self.max_memory_size = 1000 # 최대 메모리 캐시 항목
def generate_cache_key(self, query: str, params: dict = None) -> str:
"""쿼리와 매개변수로 캐시 키 생성"""
content = f"{query}_{json.dumps(params, sort_keys=True)}"
return hashlib.sha256(content.encode()).hexdigest()[:16]
def get_cached_result(self, cache_key: str) -> Optional[dict]:
"""다층 캐싱에서 결과 조회"""
# L1: 메모리 캐시 확인 (가장 빠름)
if cache_key in self.memory_cache:
return self.memory_cache[cache_key]
# L2: Redis 캐시 확인 (중간 속도)
redis_result = self.redis_client.get(f"rag:{cache_key}")
if redis_result:
result = json.loads(redis_result)
# 메모리 캐시에도 저장 (워밍업)
self.set_memory_cache(cache_key, result)
return result
return None
def set_cache_result(self, cache_key: str, result: dict, ttl: int = 3600):
"""결과를 다층 캐시에 저장"""
# L1: 메모리 캐시 저장
self.set_memory_cache(cache_key, result)
# L2: Redis 캐시 저장 (TTL 적용)
self.redis_client.setex(
f"rag:{cache_key}",
ttl,
json.dumps(result)
)
def set_memory_cache(self, cache_key: str, result: dict):
"""메모리 캐시 저장 (LRU 정책)"""
if len(self.memory_cache) >= self.max_memory_size:
# 가장 오래된 항목 제거 (LRU)
oldest_key = next(iter(self.memory_cache))
del self.memory_cache[oldest_key]
self.memory_cache[cache_key] = result
# 캐싱 데코레이터
def cache_rag_result(ttl: int = 3600):
def decorator(func):
@wraps(func)
def wrapper(self, query: str, **kwargs):
cache_key = self.cache_manager.generate_cache_key(query, kwargs)
# 캐시된 결과 확인
cached = self.cache_manager.get_cached_result(cache_key)
if cached:
return cached
# 캐시 미스: 실제 검색 수행
result = func(self, query, **kwargs)
# 결과를 캐시에 저장
self.cache_manager.set_cache_result(cache_key, result, ttl)
return result
return wrapper
return decorator
class OptimizedRAGRetriever:
def __init__(self, vector_db, cache_manager):
self.vector_db = vector_db
self.cache_manager = cache_manager
@cache_rag_result(ttl=3600) # 1시간 캐시
def retrieve_documents(self, query: str, k: int = 5) -> dict:
"""캐싱이 적용된 문서 검색"""
# 벡터 검색 수행
results = self.vector_db.similarity_search(query, k=k)
return {
"query": query,
"documents": results,
"timestamp": time.time(),
"source": "vector_db"
}
# 사용 예시
cache_manager = RAGCacheManager()
retriever = OptimizedRAGRetriever(vector_db, cache_manager)
# 첫 번째 호출 - 실제 검색
result1 = retriever.retrieve_documents("파이썬의 장점은?")
# 두 번째 호출 - 캐시에서 반환 (빠름)
result2 = retriever.retrieve_documents("파이썬의 장점은?")스마트 캐싱 전략
단순히 결과를 저장하는 것을 넘어서, 지능적인 캐싱 전략을 사용하면 캐시 효율성을 크게 향상시킬 수 있습니다. 핵심은 사용자의 다양한 표현을 표준화하고, 자주 사용되는 쿼리를 미리 준비하는 것입니다.
🎯 쿼리 정규화
동일한 의미의 다른 표현들을 표준화
# 정규화 예시 "파이썬 장점" → "python_advantages" "Python의 좋은 점" → "python_advantages" "파이썬 이점은?" → "python_advantages"
⚡ 프리로딩
인기 쿼리들을 미리 캐시에 준비
# 인기 쿼리 프리로딩
popular_queries = [
"인공지능 개요",
"머신러닝 기초",
"딥러닝 소개"
]🔥 실제 캐싱 효과 비교
| 측정 항목 | 캐싱 미적용 | L1 캐시만 | 다층 캐싱 |
|---|---|---|---|
| 평균 응답 시간 | 850ms | 120ms | 45ms |
| 캐시 히트율 | 0% | 65% | 92% |
| 시간당 처리량 | 4,200 req | 30,000 req | 80,000 req |
| 메모리 사용량 | 512MB | 768MB | 1.2GB |
4.2 배치 처리 및 비동기 검색
대량 요청의 효율적 처리
비동기 배치 처리 시스템
대량의 사용자 요청을 효율적으로 처리하는 것은 프로덕션 RAG 시스템의 필수 요구사항입니다.비동기 프로그래밍과 배치 처리를 통해 시스템 처리량을 10-100배 향상시킬 수 있습니다.
핵심 최적화 기법:
- 병렬 검색: 벡터 검색과 임베딩 생성을 동시에 수행
- 배치 추론: 여러 쿼리를 한 번에 처리하여 GPU 효율성 극대화
- 비동기 I/O: 네트워크 대기 시간 동안 다른 작업 수행
- 스레드 풀: CPU 집약적 작업을 별도 스레드에서 처리
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
from typing import List
import time
class AsyncRAGProcessor:
def __init__(self, max_concurrent=10):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
self.thread_pool = ThreadPoolExecutor(max_workers=max_concurrent)
async def process_single_query(self, query: str, session_id: str) -> dict:
"""단일 쿼리 비동기 처리"""
async with self.semaphore:
start_time = time.time()
try:
# 병렬로 검색 수행
search_task = asyncio.create_task(
self.async_vector_search(query)
)
# LLM 임베딩도 병렬로
embedding_task = asyncio.create_task(
self.async_get_embedding(query)
)
# 두 작업 동시 실행
search_results, query_embedding = await asyncio.gather(
search_task,
embedding_task
)
# 결과 생성
response = await self.async_generate_response(
query, search_results
)
processing_time = time.time() - start_time
return {
"query": query,
"session_id": session_id,
"response": response,
"documents": search_results,
"processing_time": processing_time,
"status": "success"
}
except Exception as e:
return {
"query": query,
"session_id": session_id,
"error": str(e),
"status": "error",
"processing_time": time.time() - start_time
}
async def process_batch(self, batch_requests: List[dict]) -> List[dict]:
"""배치 요청 병렬 처리"""
# 모든 요청을 비동기로 처리
tasks = [
self.process_single_query(
request["query"],
request.get("session_id", "default")
)
for request in batch_requests
]
# 배치 전체 완료 대기
results = await asyncio.gather(*tasks, return_exceptions=True)
# 예외 처리
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append({
"query": batch_requests[i]["query"],
"error": str(result),
"status": "exception"
})
else:
processed_results.append(result)
return processed_results
async def async_vector_search(self, query: str) -> List[dict]:
"""비동기 벡터 검색"""
# CPU 집약적 작업을 스레드 풀에서 실행
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.thread_pool,
self.sync_vector_search,
query
)
def sync_vector_search(self, query: str) -> List[dict]:
"""동기 벡터 검색 (CPU 집약적)"""
# 실제 벡터 검색 로직
return self.vector_db.similarity_search(query, k=5)
async def async_get_embedding(self, text: str):
"""비동기 임베딩 생성"""
async with aiohttp.ClientSession() as session:
async with session.post(
"http://embedding-service/embed",
json={"text": text}
) as response:
return await response.json()
async def async_generate_response(self, query: str, documents: List[dict]):
"""비동기 응답 생성"""
async with aiohttp.ClientSession() as session:
async with session.post(
"http://llm-service/generate",
json={
"query": query,
"context": documents,
"max_tokens": 500
}
) as response:
return await response.json()
# 사용 예시
async def main():
processor = AsyncRAGProcessor(max_concurrent=20)
# 대량 요청 배치
batch_requests = [
{"query": f"질문 {i}", "session_id": f"user_{i}"}
for i in range(100)
]
start_time = time.time()
results = await processor.process_batch(batch_requests)
end_time = time.time()
print(f"100개 요청 처리 시간: {end_time - start_time:.2f}초")
print(f"평균 응답 시간: {sum(r.get('processing_time', 0) for r in results) / len(results):.3f}초")
# 실행
# asyncio.run(main())배치 처리 성능 향상 결과
19x
처리량 증가
동시 20개 처리 시
73%
GPU 활용률
개별: 12% → 배치: 73%
$0.12
요청당 비용
개별: $0.85 → 배치: $0.12
💡 최적 배치 크기: 실험 결과, 배치 크기 20-50이 가장 효율적입니다. 그 이상은 메모리 부족이나 지연 시간 증가로 인해 오히려 성능이 저하됩니다.
4.3 모델 양자화 및 최적화
메모리와 연산 효율성 극대화
모델 압축 기법
모델 양자화는 메모리 사용량을 줄이고 추론 속도를 향상시키는 강력한 기술입니다.32비트 부동소수점을 8비트 정수로 변환하여 모델 크기를 4분의 1로 줄이면서도 성능 저하는 최소화합니다.
주요 압축 기법:
- 동적 양자화: 실행 시 가중치를 int8로 변환 (가장 쉬운 방법)
- 정적 양자화: 사전에 캘리브레이션하여 더 높은 압축률 달성
- ONNX 변환: 프레임워크 독립적이고 최적화된 포맷
- 프루닝: 중요하지 않은 연결을 제거하여 희소 모델 생성
💡 실무 팁: 양자화 시 정확도를 반드시 검증하세요. 일반적으로 2-5% 정도의 성능 하락은 4배의 속도 향상을 위한 합리적인 트레이드오프입니다.
import torch
import torch.quantization as quant
from transformers import AutoModel, AutoTokenizer
import onnx
import onnxruntime as ort
class ModelOptimizer:
def __init__(self, model_name: str):
self.model_name = model_name
self.model = AutoModel.from_pretrained(model_name)
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
def quantize_dynamic(self, output_path: str):
"""동적 양자화 (int8)"""
print("동적 양자화 시작...")
# PyTorch 동적 양자화
quantized_model = torch.quantization.quantize_dynamic(
self.model,
{torch.nn.Linear}, # 선형 레이어만 양자화
dtype=torch.qint8 # int8로 압축
)
# 모델 저장
torch.save(quantized_model.state_dict(), f"{output_path}/quantized_model.pt")
# 메모리 사용량 비교
original_size = sum(p.numel() * p.element_size() for p in self.model.parameters())
quantized_size = sum(p.numel() * p.element_size() for p in quantized_model.parameters())
compression_ratio = original_size / quantized_size
return {
"original_size_mb": original_size / (1024**2),
"quantized_size_mb": quantized_size / (1024**2),
"compression_ratio": compression_ratio,
"model": quantized_model
}
def convert_to_onnx(self, output_path: str, optimize: bool = True):
"""ONNX 변환 및 최적화"""
print("ONNX 변환 시작...")
# 더미 입력 생성
dummy_input = torch.randint(0, 1000, (1, 512)) # [batch_size, seq_len]
# ONNX 변환
torch.onnx.export(
self.model,
dummy_input,
f"{output_path}/model.onnx",
input_names=['input_ids'],
output_names=['last_hidden_state'],
dynamic_axes={
'input_ids': {0: 'batch_size', 1: 'sequence_length'},
'last_hidden_state': {0: 'batch_size', 1: 'sequence_length'}
},
opset_version=14
)
if optimize:
# ONNX 그래프 최적화
from onnxruntime.tools import optimizer
optimized_model = optimizer.optimize_model(
f"{output_path}/model.onnx",
model_type='bert',
use_gpu=False,
opt_level=99 # 최대 최적화
)
optimized_model.save_model_to_file(f"{output_path}/optimized_model.onnx")
return f"{output_path}/optimized_model.onnx" if optimize else f"{output_path}/model.onnx"
def benchmark_models(self, test_queries: List[str]):
"""모델 성능 벤치마크"""
results = {}
# 원본 모델 벤치마크
print("원본 모델 벤치마크...")
original_times = []
for query in test_queries:
start_time = time.time()
inputs = self.tokenizer(query, return_tensors='pt', truncation=True)
with torch.no_grad():
outputs = self.model(**inputs)
original_times.append(time.time() - start_time)
results['original'] = {
'avg_time': np.mean(original_times),
'std_time': np.std(original_times)
}
# 양자화된 모델 벤치마크
quantized_info = self.quantize_dynamic("./temp")
quantized_model = quantized_info['model']
print("양자화된 모델 벤치마크...")
quantized_times = []
for query in test_queries:
start_time = time.time()
inputs = self.tokenizer(query, return_tensors='pt', truncation=True)
with torch.no_grad():
outputs = quantized_model(**inputs)
quantized_times.append(time.time() - start_time)
results['quantized'] = {
'avg_time': np.mean(quantized_times),
'std_time': np.std(quantized_times),
'compression_ratio': quantized_info['compression_ratio']
}
# ONNX 모델 벤치마크
onnx_path = self.convert_to_onnx("./temp")
session = ort.InferenceSession(onnx_path)
print("ONNX 모델 벤치마크...")
onnx_times = []
for query in test_queries:
start_time = time.time()
inputs = self.tokenizer(query, return_tensors='np', truncation=True)
outputs = session.run(None, {'input_ids': inputs['input_ids']})
onnx_times.append(time.time() - start_time)
results['onnx'] = {
'avg_time': np.mean(onnx_times),
'std_time': np.std(onnx_times)
}
return results
# 사용 예시
optimizer = ModelOptimizer("sentence-transformers/all-MiniLM-L6-v2")
# 테스트 쿼리
test_queries = [
"인공지능의 기본 원리는 무엇인가요?",
"머신러닝과 딥러닝의 차이점을 설명해주세요.",
"자연어 처리의 주요 기술들은?",
"컴퓨터 비전의 응용 분야는?",
"강화학습의 핵심 개념은?"
] * 20 # 100개 쿼리
# 벤치마크 실행
benchmark_results = optimizer.benchmark_models(test_queries)
print("\n=== 모델 성능 비교 ===")
for model_type, metrics in benchmark_results.items():
print(f"{model_type.upper()}:")
print(f" 평균 처리 시간: {metrics['avg_time']:.4f}초")
print(f" 표준 편차: {metrics['std_time']:.4f}초")
if 'compression_ratio' in metrics:
print(f" 압축 비율: {metrics['compression_ratio']:.2f}x")
print()모델 양자화 벤치마크 결과
🎯 실제 모델별 압축 효과
BERT-base (110M 파라미터)
T5-base (220M 파라미터)
💾 메모리 효율성 개선
모바일 디바이스에서도 실행 가능한 수준으로 메모리 사용량 감소
4.4 엣지 디바이스 RAG 구현
모바일과 IoT 디바이스에서의 RAG
경량화 RAG 아키텍처
모바일과 IoT 환경에서 RAG를 실행하려면 극도의 최적화가 필요합니다.제한된 메모리(~2GB)와 배터리로 동작하는 디바이스에서도 효율적으로 작동하는 시스템을 구축해야 합니다.
엣지 RAG의 핵심 전략:
- SQLite 기반 벡터 저장: 서버 없이 로컬 파일 시스템 활용
- 벡터 압축: Float32 → Float16 + gzip으로 75% 용량 절감
- 중요도 기반 필터링: 모든 문서를 검색하지 않고 상위 N개만 처리
- 오프라인 우선: 네트워크 연결 없이도 기본 기능 동작
🚀 성능 예시: 아래 코드는 라즈베리 파이 4 (4GB RAM)에서 10,000개 문서를 100ms 이내에 검색할 수 있도록 최적화되었습니다.
import sqlite3
import numpy as np
from typing import List, Dict
import json
import gzip
class EdgeRAGSystem:
"""엣지 디바이스용 경량화 RAG 시스템"""
def __init__(self, db_path: str = "edge_rag.db"):
self.db_path = db_path
self.init_database()
self.cache = {} # 로컬 캐시
self.max_cache_size = 100
def init_database(self):
"""SQLite 기반 로컬 벡터 DB 초기화"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 압축된 벡터와 메타데이터 저장
cursor.execute('''
CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY,
content TEXT,
compressed_vector BLOB, -- gzip 압축된 벡터
metadata TEXT,
category TEXT,
importance_score REAL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# 빠른 검색을 위한 인덱스
cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON documents(category)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_importance ON documents(importance_score)')
conn.commit()
conn.close()
def compress_vector(self, vector: np.ndarray) -> bytes:
"""벡터 압축 저장"""
# Float32 -> Float16으로 정밀도 줄임
vector_f16 = vector.astype(np.float16)
# JSON 직렬화 후 gzip 압축
vector_bytes = json.dumps(vector_f16.tolist()).encode()
compressed = gzip.compress(vector_bytes, compresslevel=9)
return compressed
def decompress_vector(self, compressed: bytes) -> np.ndarray:
"""압축된 벡터 복원"""
decompressed = gzip.decompress(compressed)
vector_list = json.loads(decompressed.decode())
return np.array(vector_list, dtype=np.float32)
def add_document(self, content: str, vector: np.ndarray,
metadata: dict, category: str = "general"):
"""문서와 벡터 추가"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 중요도 점수 계산 (간단한 휴리스틱)
importance_score = self.calculate_importance(content, metadata)
cursor.execute('''
INSERT INTO documents
(content, compressed_vector, metadata, category, importance_score)
VALUES (?, ?, ?, ?, ?)
''', (
content,
self.compress_vector(vector),
json.dumps(metadata),
category,
importance_score
))
conn.commit()
conn.close()
def calculate_importance(self, content: str, metadata: dict) -> float:
"""문서 중요도 점수 계산"""
score = 0.0
# 텍스트 길이 기반
score += min(len(content) / 1000, 1.0) * 0.3
# 메타데이터 기반
if metadata.get('is_primary', False):
score += 0.5
# 카테고리별 가중치
category_weights = {
'faq': 1.0,
'tutorial': 0.8,
'reference': 0.6,
'general': 0.4
}
category = metadata.get('category', 'general')
score += category_weights.get(category, 0.4) * 0.2
return min(score, 1.0)
def lightweight_search(self, query_vector: np.ndarray,
k: int = 5, category: str = None) -> List[Dict]:
"""경량화된 벡터 검색"""
# 캐시 확인
cache_key = f"{hash(query_vector.tobytes())}_{k}_{category}"
if cache_key in self.cache:
return self.cache[cache_key]
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 카테고리 필터링
if category:
cursor.execute('''
SELECT id, content, compressed_vector, metadata, importance_score
FROM documents
WHERE category = ?
ORDER BY importance_score DESC
LIMIT 50 -- 상위 문서만 검색
''', (category,))
else:
cursor.execute('''
SELECT id, content, compressed_vector, metadata, importance_score
FROM documents
ORDER BY importance_score DESC
LIMIT 50
''')
rows = cursor.fetchall()
conn.close()
# 코사인 유사도 계산
similarities = []
for row in rows:
doc_id, content, compressed_vector, metadata, importance = row
# 벡터 압축 해제
doc_vector = self.decompress_vector(compressed_vector)
# 코사인 유사도 (빠른 계산)
similarity = np.dot(query_vector, doc_vector) / (
np.linalg.norm(query_vector) * np.linalg.norm(doc_vector)
)
# 중요도 점수와 결합
final_score = similarity * 0.8 + importance * 0.2
similarities.append({
'id': doc_id,
'content': content,
'metadata': json.loads(metadata),
'similarity': similarity,
'importance': importance,
'final_score': final_score
})
# 점수 순으로 정렬 후 상위 k개 선택
results = sorted(similarities, key=lambda x: x['final_score'], reverse=True)[:k]
# 캐시에 저장
self.update_cache(cache_key, results)
return results
def update_cache(self, key: str, value: List[Dict]):
"""LRU 캐시 업데이트"""
if len(self.cache) >= self.max_cache_size:
# 가장 오래된 항목 제거
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
self.cache[key] = value
def get_storage_stats(self) -> Dict:
"""저장소 통계"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM documents')
doc_count = cursor.fetchone()[0]
cursor.execute('SELECT AVG(LENGTH(compressed_vector)) FROM documents')
avg_vector_size = cursor.fetchone()[0] or 0
# 파일 크기
import os
db_size = os.path.getsize(self.db_path) if os.path.exists(self.db_path) else 0
conn.close()
return {
'document_count': doc_count,
'database_size_mb': db_size / (1024**2),
'avg_compressed_vector_size': avg_vector_size,
'cache_size': len(self.cache)
}
# 사용 예시
edge_rag = EdgeRAGSystem("mobile_rag.db")
# 문서 추가 (압축된 벡터와 함께)
sample_vector = np.random.randn(384).astype(np.float32) # 작은 임베딩 차원
edge_rag.add_document(
content="파이썬은 프로그래밍 언어입니다.",
vector=sample_vector,
metadata={"category": "programming", "is_primary": True},
category="tutorial"
)
# 검색
query_vector = np.random.randn(384).astype(np.float32)
results = edge_rag.lightweight_search(query_vector, k=3)
# 저장소 통계
stats = edge_rag.get_storage_stats()
print(f"저장소 통계: {stats}")4.5 성능 프로파일링 및 메모리 관리
실시간 성능 모니터링과 최적화
통합 성능 모니터링 시스템
프로덕션 RAG 시스템은 24/7 모니터링이 필수입니다.성능 저하나 오류를 실시간으로 감지하고 자동으로 알림을 받아야 합니다. 아래의 통합 모니터링 시스템은 RAG에 특화된 메트릭을 추적합니다.
핵심 모니터링 지표:
- 응답 시간: P50, P95, P99 백분위 추적
- 캐시 히트율: 캐싱 전략의 효과성 측정
- 메모리/CPU 사용량: 리소스 병목 현상 조기 발견
- 에러율: 검색 실패, 타임아웃 등 추적
- 큐 길이: 시스템 부하 상태 파악
2.3초
평균 응답시간
85%
캐시 히트율
512MB
메모리 사용량
99.2%
가용성
import psutil
import time
import threading
from dataclasses import dataclass
from typing import Dict, List
import json
from datetime import datetime, timedelta
@dataclass
class PerformanceMetrics:
timestamp: datetime
response_time: float
memory_usage_mb: float
cpu_usage_percent: float
cache_hit_rate: float
active_connections: int
queue_length: int
class RAGPerformanceMonitor:
def __init__(self, log_file: str = "rag_performance.log"):
self.log_file = log_file
self.metrics_history: List[PerformanceMetrics] = []
self.max_history = 1000
self.monitoring = False
self.alert_thresholds = {
'response_time': 5.0, # 5초 이상
'memory_usage': 1000.0, # 1GB 이상
'cpu_usage': 80.0, # 80% 이상
'cache_hit_rate': 0.7 # 70% 미만
}
def start_monitoring(self, interval: float = 10.0):
"""백그라운드 모니터링 시작"""
self.monitoring = True
monitor_thread = threading.Thread(
target=self._monitor_loop,
args=(interval,),
daemon=True
)
monitor_thread.start()
print(f"성능 모니터링 시작 (간격: {interval}초)")
def stop_monitoring(self):
"""모니터링 중지"""
self.monitoring = False
print("성능 모니터링 중지")
def _monitor_loop(self, interval: float):
"""모니터링 루프"""
while self.monitoring:
try:
metrics = self.collect_metrics()
self.record_metrics(metrics)
self.check_alerts(metrics)
time.sleep(interval)
except Exception as e:
print(f"모니터링 오류: {e}")
time.sleep(interval)
def collect_metrics(self) -> PerformanceMetrics:
"""현재 시스템 메트릭 수집"""
# 시스템 리소스
memory = psutil.virtual_memory()
cpu_percent = psutil.cpu_percent(interval=1)
# 프로세스별 메모리 (현재 프로세스)
process = psutil.Process()
process_memory = process.memory_info().rss / (1024**2) # MB
# RAG 특화 메트릭 (예시 - 실제로는 RAG 시스템에서 수집)
cache_stats = self.get_cache_stats()
response_stats = self.get_response_stats()
return PerformanceMetrics(
timestamp=datetime.now(),
response_time=response_stats.get('avg_response_time', 0),
memory_usage_mb=process_memory,
cpu_usage_percent=cpu_percent,
cache_hit_rate=cache_stats.get('hit_rate', 0),
active_connections=response_stats.get('active_connections', 0),
queue_length=response_stats.get('queue_length', 0)
)
def record_metrics(self, metrics: PerformanceMetrics):
"""메트릭 기록"""
# 메모리 히스토리 관리
self.metrics_history.append(metrics)
if len(self.metrics_history) > self.max_history:
self.metrics_history.pop(0)
# 파일 로깅
with open(self.log_file, 'a', encoding='utf-8') as f:
log_entry = {
'timestamp': metrics.timestamp.isoformat(),
'response_time': metrics.response_time,
'memory_usage_mb': metrics.memory_usage_mb,
'cpu_usage_percent': metrics.cpu_usage_percent,
'cache_hit_rate': metrics.cache_hit_rate,
'active_connections': metrics.active_connections,
'queue_length': metrics.queue_length
}
f.write(json.dumps(log_entry, ensure_ascii=False) + '\n')
def check_alerts(self, metrics: PerformanceMetrics):
"""임계값 기반 알림 확인"""
alerts = []
if metrics.response_time > self.alert_thresholds['response_time']:
alerts.append(f"응답 시간 임계값 초과: {metrics.response_time:.2f}초")
if metrics.memory_usage_mb > self.alert_thresholds['memory_usage']:
alerts.append(f"메모리 사용량 임계값 초과: {metrics.memory_usage_mb:.1f}MB")
if metrics.cpu_usage_percent > self.alert_thresholds['cpu_usage']:
alerts.append(f"CPU 사용률 임계값 초과: {metrics.cpu_usage_percent:.1f}%")
if metrics.cache_hit_rate < self.alert_thresholds['cache_hit_rate']:
alerts.append(f"캐시 히트율 임계값 미달: {metrics.cache_hit_rate:.2%}")
if alerts:
self.send_alerts(alerts)
def send_alerts(self, alerts: List[str]):
"""알림 발송"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n🚨 [{timestamp}] 성능 알림:")
for alert in alerts:
print(f" - {alert}")
def get_performance_report(self, hours: int = 24) -> Dict:
"""성능 보고서 생성"""
cutoff_time = datetime.now() - timedelta(hours=hours)
recent_metrics = [
m for m in self.metrics_history
if m.timestamp >= cutoff_time
]
if not recent_metrics:
return {"error": "충분한 데이터가 없습니다"}
# 통계 계산
response_times = [m.response_time for m in recent_metrics]
memory_usage = [m.memory_usage_mb for m in recent_metrics]
cpu_usage = [m.cpu_usage_percent for m in recent_metrics]
cache_hits = [m.cache_hit_rate for m in recent_metrics]
return {
"period_hours": hours,
"total_samples": len(recent_metrics),
"response_time": {
"avg": sum(response_times) / len(response_times),
"min": min(response_times),
"max": max(response_times),
"p95": sorted(response_times)[int(len(response_times) * 0.95)]
},
"memory_usage_mb": {
"avg": sum(memory_usage) / len(memory_usage),
"min": min(memory_usage),
"max": max(memory_usage)
},
"cpu_usage_percent": {
"avg": sum(cpu_usage) / len(cpu_usage),
"max": max(cpu_usage)
},
"cache_hit_rate": {
"avg": sum(cache_hits) / len(cache_hits),
"min": min(cache_hits)
}
}
def optimize_recommendations(self) -> List[str]:
"""최적화 권장사항 생성"""
recommendations = []
recent_metrics = self.metrics_history[-10:] # 최근 10개 샘플
if not recent_metrics:
return ["데이터가 부족합니다"]
avg_response_time = sum(m.response_time for m in recent_metrics) / len(recent_metrics)
avg_cache_hit_rate = sum(m.cache_hit_rate for m in recent_metrics) / len(recent_metrics)
avg_memory = sum(m.memory_usage_mb for m in recent_metrics) / len(recent_metrics)
if avg_response_time > 3.0:
recommendations.append("응답 시간 개선을 위해 캐싱 전략을 검토하세요")
if avg_cache_hit_rate < 0.8:
recommendations.append("캐시 히트율 향상을 위해 캐시 크기를 늘리거나 TTL을 조정하세요")
if avg_memory > 800:
recommendations.append("메모리 사용량이 높습니다. 모델 양자화를 고려하세요")
return recommendations if recommendations else ["현재 성능이 양호합니다"]
# 사용 예시
monitor = RAGPerformanceMonitor()
# 모니터링 시작
monitor.start_monitoring(interval=30) # 30초마다
# 실시간 메트릭 수집
current_metrics = monitor.collect_metrics()
print(f"현재 응답 시간: {current_metrics.response_time:.2f}초")
print(f"메모리 사용량: {current_metrics.memory_usage_mb:.1f}MB")
# 성능 보고서
report = monitor.get_performance_report(hours=24)
print("\n24시간 성능 보고서:")
print(json.dumps(report, indent=2, ensure_ascii=False))
# 최적화 권장사항
recommendations = monitor.optimize_recommendations()
print("\n최적화 권장사항:")
for rec in recommendations:
print(f"- {rec}")
# 모니터링 중지
# monitor.stop_monitoring()실습 과제
RAG 성능 최적화 실습
이번 챕터에서 배운 최적화 기법들을 직접 구현하고 성능을 측정해보세요. 각 최적화 기법이 실제로 얼마나 효과적인지 정량적으로 검증하는 것이 목표입니다.
📊 과제 1: 성능 벤치마킹
- 1. 기본 RAG 시스템 구축
- 2. 캐싱 전/후 성능 측정
- 3. 모델 양자화 효과 검증
- 4. 배치 처리 vs 단일 처리 비교
- 5. 최적화 보고서 작성
⚡ 과제 2: 실시간 모니터링 구현
- • 성능 메트릭 수집 시스템 구축
- • 임계값 기반 알림 시스템
- • 대시보드 UI 개발
- • 자동 최적화 제안 기능
📱 과제 3: 엣지 RAG 프로토타입
- • 모바일 환경용 경량화 RAG
- • 오프라인 동작 지원
- • 배터리 효율성 최적화
- • 제한된 메모리에서의 성능 측정