홈으로
🔍

RAG 검색 증강 생성

문서 기반 AI 시스템 구축의 모든 것

12시간
intermediate
6개 챕터
중급 과정으로 돌아가기

Chapter 4: RAG 성능 최적화

대규모 RAG 시스템의 성능 최적화 전략

4.1 캐싱 전략

Redis와 인메모리 캐싱으로 응답 속도 최적화

다층 캐싱 아키텍처

캐싱은 RAG 시스템의 성능을 극적으로 향상시키는 핵심 기술입니다. 동일한 쿼리에 대해 매번 벡터 검색을 수행하는 것은 비효율적이므로, 자주 사용되는 쿼리와 결과를 캐시에 저장하여 응답 속도를 10배 이상 향상시킬 수 있습니다.

다층 캐싱 시스템은 다음과 같이 작동합니다:

  • L1 캐시 (메모리): 가장 빠른 접근 속도 (~0.1ms), 제한된 용량
  • L2 캐시 (Redis): 중간 속도 (~1-5ms), 대용량 지원
  • L3 캐시 (디스크): 느린 속도 (~10-50ms), 무제한 용량
import redis
import hashlib
from typing import List, Optional
from functools import wraps
import json

class RAGCacheManager:
    def __init__(self, redis_host="localhost", redis_port=6379):
        self.redis_client = redis.Redis(
            host=redis_host, 
            port=redis_port, 
            decode_responses=True
        )
        self.memory_cache = {}  # L1 캐시 (인메모리)
        self.max_memory_size = 1000  # 최대 메모리 캐시 항목
        
    def generate_cache_key(self, query: str, params: dict = None) -> str:
        """쿼리와 매개변수로 캐시 키 생성"""
        content = f"{query}_{json.dumps(params, sort_keys=True)}"
        return hashlib.sha256(content.encode()).hexdigest()[:16]
    
    def get_cached_result(self, cache_key: str) -> Optional[dict]:
        """다층 캐싱에서 결과 조회"""
        # L1: 메모리 캐시 확인 (가장 빠름)
        if cache_key in self.memory_cache:
            return self.memory_cache[cache_key]
        
        # L2: Redis 캐시 확인 (중간 속도)
        redis_result = self.redis_client.get(f"rag:{cache_key}")
        if redis_result:
            result = json.loads(redis_result)
            # 메모리 캐시에도 저장 (워밍업)
            self.set_memory_cache(cache_key, result)
            return result
        
        return None
    
    def set_cache_result(self, cache_key: str, result: dict, ttl: int = 3600):
        """결과를 다층 캐시에 저장"""
        # L1: 메모리 캐시 저장
        self.set_memory_cache(cache_key, result)
        
        # L2: Redis 캐시 저장 (TTL 적용)
        self.redis_client.setex(
            f"rag:{cache_key}", 
            ttl, 
            json.dumps(result)
        )
    
    def set_memory_cache(self, cache_key: str, result: dict):
        """메모리 캐시 저장 (LRU 정책)"""
        if len(self.memory_cache) >= self.max_memory_size:
            # 가장 오래된 항목 제거 (LRU)
            oldest_key = next(iter(self.memory_cache))
            del self.memory_cache[oldest_key]
        
        self.memory_cache[cache_key] = result

# 캐싱 데코레이터
def cache_rag_result(ttl: int = 3600):
    def decorator(func):
        @wraps(func)
        def wrapper(self, query: str, **kwargs):
            cache_key = self.cache_manager.generate_cache_key(query, kwargs)
            
            # 캐시된 결과 확인
            cached = self.cache_manager.get_cached_result(cache_key)
            if cached:
                return cached
            
            # 캐시 미스: 실제 검색 수행
            result = func(self, query, **kwargs)
            
            # 결과를 캐시에 저장
            self.cache_manager.set_cache_result(cache_key, result, ttl)
            return result
        return wrapper
    return decorator

class OptimizedRAGRetriever:
    def __init__(self, vector_db, cache_manager):
        self.vector_db = vector_db
        self.cache_manager = cache_manager
    
    @cache_rag_result(ttl=3600)  # 1시간 캐시
    def retrieve_documents(self, query: str, k: int = 5) -> dict:
        """캐싱이 적용된 문서 검색"""
        # 벡터 검색 수행
        results = self.vector_db.similarity_search(query, k=k)
        
        return {
            "query": query,
            "documents": results,
            "timestamp": time.time(),
            "source": "vector_db"
        }

# 사용 예시
cache_manager = RAGCacheManager()
retriever = OptimizedRAGRetriever(vector_db, cache_manager)

# 첫 번째 호출 - 실제 검색
result1 = retriever.retrieve_documents("파이썬의 장점은?")

# 두 번째 호출 - 캐시에서 반환 (빠름)
result2 = retriever.retrieve_documents("파이썬의 장점은?")

스마트 캐싱 전략

단순히 결과를 저장하는 것을 넘어서, 지능적인 캐싱 전략을 사용하면 캐시 효율성을 크게 향상시킬 수 있습니다. 핵심은 사용자의 다양한 표현을 표준화하고, 자주 사용되는 쿼리를 미리 준비하는 것입니다.

🎯 쿼리 정규화

동일한 의미의 다른 표현들을 표준화

# 정규화 예시
"파이썬 장점" → "python_advantages"
"Python의 좋은 점" → "python_advantages"
"파이썬 이점은?" → "python_advantages"

⚡ 프리로딩

인기 쿼리들을 미리 캐시에 준비

# 인기 쿼리 프리로딩
popular_queries = [
    "인공지능 개요",
    "머신러닝 기초",
    "딥러닝 소개"
]

🔥 실제 캐싱 효과 비교

측정 항목캐싱 미적용L1 캐시만다층 캐싱
평균 응답 시간850ms120ms45ms
캐시 히트율0%65%92%
시간당 처리량4,200 req30,000 req80,000 req
메모리 사용량512MB768MB1.2GB

4.2 배치 처리 및 비동기 검색

대량 요청의 효율적 처리

비동기 배치 처리 시스템

대량의 사용자 요청을 효율적으로 처리하는 것은 프로덕션 RAG 시스템의 필수 요구사항입니다.비동기 프로그래밍과 배치 처리를 통해 시스템 처리량을 10-100배 향상시킬 수 있습니다.

핵심 최적화 기법:

  • 병렬 검색: 벡터 검색과 임베딩 생성을 동시에 수행
  • 배치 추론: 여러 쿼리를 한 번에 처리하여 GPU 효율성 극대화
  • 비동기 I/O: 네트워크 대기 시간 동안 다른 작업 수행
  • 스레드 풀: CPU 집약적 작업을 별도 스레드에서 처리
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor
from typing import List
import time

class AsyncRAGProcessor:
    def __init__(self, max_concurrent=10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.thread_pool = ThreadPoolExecutor(max_workers=max_concurrent)
    
    async def process_single_query(self, query: str, session_id: str) -> dict:
        """단일 쿼리 비동기 처리"""
        async with self.semaphore:
            start_time = time.time()
            
            try:
                # 병렬로 검색 수행
                search_task = asyncio.create_task(
                    self.async_vector_search(query)
                )
                
                # LLM 임베딩도 병렬로
                embedding_task = asyncio.create_task(
                    self.async_get_embedding(query)
                )
                
                # 두 작업 동시 실행
                search_results, query_embedding = await asyncio.gather(
                    search_task, 
                    embedding_task
                )
                
                # 결과 생성
                response = await self.async_generate_response(
                    query, search_results
                )
                
                processing_time = time.time() - start_time
                
                return {
                    "query": query,
                    "session_id": session_id,
                    "response": response,
                    "documents": search_results,
                    "processing_time": processing_time,
                    "status": "success"
                }
                
            except Exception as e:
                return {
                    "query": query,
                    "session_id": session_id,
                    "error": str(e),
                    "status": "error",
                    "processing_time": time.time() - start_time
                }
    
    async def process_batch(self, batch_requests: List[dict]) -> List[dict]:
        """배치 요청 병렬 처리"""
        # 모든 요청을 비동기로 처리
        tasks = [
            self.process_single_query(
                request["query"], 
                request.get("session_id", "default")
            )
            for request in batch_requests
        ]
        
        # 배치 전체 완료 대기
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 예외 처리
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    "query": batch_requests[i]["query"],
                    "error": str(result),
                    "status": "exception"
                })
            else:
                processed_results.append(result)
        
        return processed_results
    
    async def async_vector_search(self, query: str) -> List[dict]:
        """비동기 벡터 검색"""
        # CPU 집약적 작업을 스레드 풀에서 실행
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.thread_pool,
            self.sync_vector_search,
            query
        )
    
    def sync_vector_search(self, query: str) -> List[dict]:
        """동기 벡터 검색 (CPU 집약적)"""
        # 실제 벡터 검색 로직
        return self.vector_db.similarity_search(query, k=5)
    
    async def async_get_embedding(self, text: str):
        """비동기 임베딩 생성"""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "http://embedding-service/embed",
                json={"text": text}
            ) as response:
                return await response.json()
    
    async def async_generate_response(self, query: str, documents: List[dict]):
        """비동기 응답 생성"""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "http://llm-service/generate",
                json={
                    "query": query,
                    "context": documents,
                    "max_tokens": 500
                }
            ) as response:
                return await response.json()

# 사용 예시
async def main():
    processor = AsyncRAGProcessor(max_concurrent=20)
    
    # 대량 요청 배치
    batch_requests = [
        {"query": f"질문 {i}", "session_id": f"user_{i}"}
        for i in range(100)
    ]
    
    start_time = time.time()
    results = await processor.process_batch(batch_requests)
    end_time = time.time()
    
    print(f"100개 요청 처리 시간: {end_time - start_time:.2f}초")
    print(f"평균 응답 시간: {sum(r.get('processing_time', 0) for r in results) / len(results):.3f}초")

# 실행
# asyncio.run(main())

배치 처리 성능 향상 결과

19x

처리량 증가

동시 20개 처리 시

73%

GPU 활용률

개별: 12% → 배치: 73%

$0.12

요청당 비용

개별: $0.85 → 배치: $0.12

💡 최적 배치 크기: 실험 결과, 배치 크기 20-50이 가장 효율적입니다. 그 이상은 메모리 부족이나 지연 시간 증가로 인해 오히려 성능이 저하됩니다.

4.3 모델 양자화 및 최적화

메모리와 연산 효율성 극대화

모델 압축 기법

모델 양자화는 메모리 사용량을 줄이고 추론 속도를 향상시키는 강력한 기술입니다.32비트 부동소수점을 8비트 정수로 변환하여 모델 크기를 4분의 1로 줄이면서도 성능 저하는 최소화합니다.

주요 압축 기법:

  • 동적 양자화: 실행 시 가중치를 int8로 변환 (가장 쉬운 방법)
  • 정적 양자화: 사전에 캘리브레이션하여 더 높은 압축률 달성
  • ONNX 변환: 프레임워크 독립적이고 최적화된 포맷
  • 프루닝: 중요하지 않은 연결을 제거하여 희소 모델 생성

💡 실무 팁: 양자화 시 정확도를 반드시 검증하세요. 일반적으로 2-5% 정도의 성능 하락은 4배의 속도 향상을 위한 합리적인 트레이드오프입니다.

import torch
import torch.quantization as quant
from transformers import AutoModel, AutoTokenizer
import onnx
import onnxruntime as ort

class ModelOptimizer:
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.model = AutoModel.from_pretrained(model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
    
    def quantize_dynamic(self, output_path: str):
        """동적 양자화 (int8)"""
        print("동적 양자화 시작...")
        
        # PyTorch 동적 양자화
        quantized_model = torch.quantization.quantize_dynamic(
            self.model,
            {torch.nn.Linear},  # 선형 레이어만 양자화
            dtype=torch.qint8   # int8로 압축
        )
        
        # 모델 저장
        torch.save(quantized_model.state_dict(), f"{output_path}/quantized_model.pt")
        
        # 메모리 사용량 비교
        original_size = sum(p.numel() * p.element_size() for p in self.model.parameters())
        quantized_size = sum(p.numel() * p.element_size() for p in quantized_model.parameters())
        
        compression_ratio = original_size / quantized_size
        
        return {
            "original_size_mb": original_size / (1024**2),
            "quantized_size_mb": quantized_size / (1024**2),
            "compression_ratio": compression_ratio,
            "model": quantized_model
        }
    
    def convert_to_onnx(self, output_path: str, optimize: bool = True):
        """ONNX 변환 및 최적화"""
        print("ONNX 변환 시작...")
        
        # 더미 입력 생성
        dummy_input = torch.randint(0, 1000, (1, 512))  # [batch_size, seq_len]
        
        # ONNX 변환
        torch.onnx.export(
            self.model,
            dummy_input,
            f"{output_path}/model.onnx",
            input_names=['input_ids'],
            output_names=['last_hidden_state'],
            dynamic_axes={
                'input_ids': {0: 'batch_size', 1: 'sequence_length'},
                'last_hidden_state': {0: 'batch_size', 1: 'sequence_length'}
            },
            opset_version=14
        )
        
        if optimize:
            # ONNX 그래프 최적화
            from onnxruntime.tools import optimizer
            optimized_model = optimizer.optimize_model(
                f"{output_path}/model.onnx",
                model_type='bert',
                use_gpu=False,
                opt_level=99  # 최대 최적화
            )
            optimized_model.save_model_to_file(f"{output_path}/optimized_model.onnx")
        
        return f"{output_path}/optimized_model.onnx" if optimize else f"{output_path}/model.onnx"
    
    def benchmark_models(self, test_queries: List[str]):
        """모델 성능 벤치마크"""
        results = {}
        
        # 원본 모델 벤치마크
        print("원본 모델 벤치마크...")
        original_times = []
        for query in test_queries:
            start_time = time.time()
            inputs = self.tokenizer(query, return_tensors='pt', truncation=True)
            with torch.no_grad():
                outputs = self.model(**inputs)
            original_times.append(time.time() - start_time)
        
        results['original'] = {
            'avg_time': np.mean(original_times),
            'std_time': np.std(original_times)
        }
        
        # 양자화된 모델 벤치마크
        quantized_info = self.quantize_dynamic("./temp")
        quantized_model = quantized_info['model']
        
        print("양자화된 모델 벤치마크...")
        quantized_times = []
        for query in test_queries:
            start_time = time.time()
            inputs = self.tokenizer(query, return_tensors='pt', truncation=True)
            with torch.no_grad():
                outputs = quantized_model(**inputs)
            quantized_times.append(time.time() - start_time)
        
        results['quantized'] = {
            'avg_time': np.mean(quantized_times),
            'std_time': np.std(quantized_times),
            'compression_ratio': quantized_info['compression_ratio']
        }
        
        # ONNX 모델 벤치마크
        onnx_path = self.convert_to_onnx("./temp")
        session = ort.InferenceSession(onnx_path)
        
        print("ONNX 모델 벤치마크...")
        onnx_times = []
        for query in test_queries:
            start_time = time.time()
            inputs = self.tokenizer(query, return_tensors='np', truncation=True)
            outputs = session.run(None, {'input_ids': inputs['input_ids']})
            onnx_times.append(time.time() - start_time)
        
        results['onnx'] = {
            'avg_time': np.mean(onnx_times),
            'std_time': np.std(onnx_times)
        }
        
        return results

# 사용 예시
optimizer = ModelOptimizer("sentence-transformers/all-MiniLM-L6-v2")

# 테스트 쿼리
test_queries = [
    "인공지능의 기본 원리는 무엇인가요?",
    "머신러닝과 딥러닝의 차이점을 설명해주세요.",
    "자연어 처리의 주요 기술들은?",
    "컴퓨터 비전의 응용 분야는?",
    "강화학습의 핵심 개념은?"
] * 20  # 100개 쿼리

# 벤치마크 실행
benchmark_results = optimizer.benchmark_models(test_queries)

print("\n=== 모델 성능 비교 ===")
for model_type, metrics in benchmark_results.items():
    print(f"{model_type.upper()}:")
    print(f"  평균 처리 시간: {metrics['avg_time']:.4f}초")
    print(f"  표준 편차: {metrics['std_time']:.4f}초")
    if 'compression_ratio' in metrics:
        print(f"  압축 비율: {metrics['compression_ratio']:.2f}x")
    print()

모델 양자화 벤치마크 결과

🎯 실제 모델별 압축 효과

BERT-base (110M 파라미터)

원본 크기:438MB
INT8 양자화:112MB (4x 압축)
속도 향상:2.3x 빠름
정확도 손실:-1.2%

T5-base (220M 파라미터)

원본 크기:892MB
INT8 양자화:230MB (3.9x 압축)
속도 향상:2.8x 빠름
정확도 손실:-2.5%

💾 메모리 효율성 개선

원본: 4GB RAM 필요
양자화: 1GB RAM

모바일 디바이스에서도 실행 가능한 수준으로 메모리 사용량 감소

4.4 엣지 디바이스 RAG 구현

모바일과 IoT 디바이스에서의 RAG

경량화 RAG 아키텍처

모바일과 IoT 환경에서 RAG를 실행하려면 극도의 최적화가 필요합니다.제한된 메모리(~2GB)와 배터리로 동작하는 디바이스에서도 효율적으로 작동하는 시스템을 구축해야 합니다.

엣지 RAG의 핵심 전략:

  • SQLite 기반 벡터 저장: 서버 없이 로컬 파일 시스템 활용
  • 벡터 압축: Float32 → Float16 + gzip으로 75% 용량 절감
  • 중요도 기반 필터링: 모든 문서를 검색하지 않고 상위 N개만 처리
  • 오프라인 우선: 네트워크 연결 없이도 기본 기능 동작

🚀 성능 예시: 아래 코드는 라즈베리 파이 4 (4GB RAM)에서 10,000개 문서를 100ms 이내에 검색할 수 있도록 최적화되었습니다.

import sqlite3
import numpy as np
from typing import List, Dict
import json
import gzip

class EdgeRAGSystem:
    """엣지 디바이스용 경량화 RAG 시스템"""
    
    def __init__(self, db_path: str = "edge_rag.db"):
        self.db_path = db_path
        self.init_database()
        self.cache = {}  # 로컬 캐시
        self.max_cache_size = 100
        
    def init_database(self):
        """SQLite 기반 로컬 벡터 DB 초기화"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 압축된 벡터와 메타데이터 저장
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS documents (
                id INTEGER PRIMARY KEY,
                content TEXT,
                compressed_vector BLOB,  -- gzip 압축된 벡터
                metadata TEXT,
                category TEXT,
                importance_score REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # 빠른 검색을 위한 인덱스
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_category ON documents(category)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_importance ON documents(importance_score)')
        
        conn.commit()
        conn.close()
    
    def compress_vector(self, vector: np.ndarray) -> bytes:
        """벡터 압축 저장"""
        # Float32 -> Float16으로 정밀도 줄임
        vector_f16 = vector.astype(np.float16)
        
        # JSON 직렬화 후 gzip 압축
        vector_bytes = json.dumps(vector_f16.tolist()).encode()
        compressed = gzip.compress(vector_bytes, compresslevel=9)
        
        return compressed
    
    def decompress_vector(self, compressed: bytes) -> np.ndarray:
        """압축된 벡터 복원"""
        decompressed = gzip.decompress(compressed)
        vector_list = json.loads(decompressed.decode())
        return np.array(vector_list, dtype=np.float32)
    
    def add_document(self, content: str, vector: np.ndarray, 
                    metadata: dict, category: str = "general"):
        """문서와 벡터 추가"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 중요도 점수 계산 (간단한 휴리스틱)
        importance_score = self.calculate_importance(content, metadata)
        
        cursor.execute('''
            INSERT INTO documents 
            (content, compressed_vector, metadata, category, importance_score)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            content,
            self.compress_vector(vector),
            json.dumps(metadata),
            category,
            importance_score
        ))
        
        conn.commit()
        conn.close()
    
    def calculate_importance(self, content: str, metadata: dict) -> float:
        """문서 중요도 점수 계산"""
        score = 0.0
        
        # 텍스트 길이 기반
        score += min(len(content) / 1000, 1.0) * 0.3
        
        # 메타데이터 기반
        if metadata.get('is_primary', False):
            score += 0.5
        
        # 카테고리별 가중치
        category_weights = {
            'faq': 1.0,
            'tutorial': 0.8,
            'reference': 0.6,
            'general': 0.4
        }
        
        category = metadata.get('category', 'general')
        score += category_weights.get(category, 0.4) * 0.2
        
        return min(score, 1.0)
    
    def lightweight_search(self, query_vector: np.ndarray, 
                          k: int = 5, category: str = None) -> List[Dict]:
        """경량화된 벡터 검색"""
        # 캐시 확인
        cache_key = f"{hash(query_vector.tobytes())}_{k}_{category}"
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 카테고리 필터링
        if category:
            cursor.execute('''
                SELECT id, content, compressed_vector, metadata, importance_score
                FROM documents 
                WHERE category = ?
                ORDER BY importance_score DESC
                LIMIT 50  -- 상위 문서만 검색
            ''', (category,))
        else:
            cursor.execute('''
                SELECT id, content, compressed_vector, metadata, importance_score
                FROM documents 
                ORDER BY importance_score DESC
                LIMIT 50
            ''')
        
        rows = cursor.fetchall()
        conn.close()
        
        # 코사인 유사도 계산
        similarities = []
        for row in rows:
            doc_id, content, compressed_vector, metadata, importance = row
            
            # 벡터 압축 해제
            doc_vector = self.decompress_vector(compressed_vector)
            
            # 코사인 유사도 (빠른 계산)
            similarity = np.dot(query_vector, doc_vector) / (
                np.linalg.norm(query_vector) * np.linalg.norm(doc_vector)
            )
            
            # 중요도 점수와 결합
            final_score = similarity * 0.8 + importance * 0.2
            
            similarities.append({
                'id': doc_id,
                'content': content,
                'metadata': json.loads(metadata),
                'similarity': similarity,
                'importance': importance,
                'final_score': final_score
            })
        
        # 점수 순으로 정렬 후 상위 k개 선택
        results = sorted(similarities, key=lambda x: x['final_score'], reverse=True)[:k]
        
        # 캐시에 저장
        self.update_cache(cache_key, results)
        
        return results
    
    def update_cache(self, key: str, value: List[Dict]):
        """LRU 캐시 업데이트"""
        if len(self.cache) >= self.max_cache_size:
            # 가장 오래된 항목 제거
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
        
        self.cache[key] = value
    
    def get_storage_stats(self) -> Dict:
        """저장소 통계"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('SELECT COUNT(*) FROM documents')
        doc_count = cursor.fetchone()[0]
        
        cursor.execute('SELECT AVG(LENGTH(compressed_vector)) FROM documents')
        avg_vector_size = cursor.fetchone()[0] or 0
        
        # 파일 크기
        import os
        db_size = os.path.getsize(self.db_path) if os.path.exists(self.db_path) else 0
        
        conn.close()
        
        return {
            'document_count': doc_count,
            'database_size_mb': db_size / (1024**2),
            'avg_compressed_vector_size': avg_vector_size,
            'cache_size': len(self.cache)
        }

# 사용 예시
edge_rag = EdgeRAGSystem("mobile_rag.db")

# 문서 추가 (압축된 벡터와 함께)
sample_vector = np.random.randn(384).astype(np.float32)  # 작은 임베딩 차원
edge_rag.add_document(
    content="파이썬은 프로그래밍 언어입니다.",
    vector=sample_vector,
    metadata={"category": "programming", "is_primary": True},
    category="tutorial"
)

# 검색
query_vector = np.random.randn(384).astype(np.float32)
results = edge_rag.lightweight_search(query_vector, k=3)

# 저장소 통계
stats = edge_rag.get_storage_stats()
print(f"저장소 통계: {stats}")

4.5 성능 프로파일링 및 메모리 관리

실시간 성능 모니터링과 최적화

통합 성능 모니터링 시스템

프로덕션 RAG 시스템은 24/7 모니터링이 필수입니다.성능 저하나 오류를 실시간으로 감지하고 자동으로 알림을 받아야 합니다. 아래의 통합 모니터링 시스템은 RAG에 특화된 메트릭을 추적합니다.

핵심 모니터링 지표:

  • 응답 시간: P50, P95, P99 백분위 추적
  • 캐시 히트율: 캐싱 전략의 효과성 측정
  • 메모리/CPU 사용량: 리소스 병목 현상 조기 발견
  • 에러율: 검색 실패, 타임아웃 등 추적
  • 큐 길이: 시스템 부하 상태 파악

2.3초

평균 응답시간

85%

캐시 히트율

512MB

메모리 사용량

99.2%

가용성

import psutil
import time
import threading
from dataclasses import dataclass
from typing import Dict, List
import json
from datetime import datetime, timedelta

@dataclass
class PerformanceMetrics:
    timestamp: datetime
    response_time: float
    memory_usage_mb: float
    cpu_usage_percent: float
    cache_hit_rate: float
    active_connections: int
    queue_length: int

class RAGPerformanceMonitor:
    def __init__(self, log_file: str = "rag_performance.log"):
        self.log_file = log_file
        self.metrics_history: List[PerformanceMetrics] = []
        self.max_history = 1000
        self.monitoring = False
        self.alert_thresholds = {
            'response_time': 5.0,      # 5초 이상
            'memory_usage': 1000.0,    # 1GB 이상  
            'cpu_usage': 80.0,         # 80% 이상
            'cache_hit_rate': 0.7      # 70% 미만
        }
        
    def start_monitoring(self, interval: float = 10.0):
        """백그라운드 모니터링 시작"""
        self.monitoring = True
        monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(interval,),
            daemon=True
        )
        monitor_thread.start()
        print(f"성능 모니터링 시작 (간격: {interval}초)")
    
    def stop_monitoring(self):
        """모니터링 중지"""
        self.monitoring = False
        print("성능 모니터링 중지")
    
    def _monitor_loop(self, interval: float):
        """모니터링 루프"""
        while self.monitoring:
            try:
                metrics = self.collect_metrics()
                self.record_metrics(metrics)
                self.check_alerts(metrics)
                time.sleep(interval)
            except Exception as e:
                print(f"모니터링 오류: {e}")
                time.sleep(interval)
    
    def collect_metrics(self) -> PerformanceMetrics:
        """현재 시스템 메트릭 수집"""
        # 시스템 리소스
        memory = psutil.virtual_memory()
        cpu_percent = psutil.cpu_percent(interval=1)
        
        # 프로세스별 메모리 (현재 프로세스)
        process = psutil.Process()
        process_memory = process.memory_info().rss / (1024**2)  # MB
        
        # RAG 특화 메트릭 (예시 - 실제로는 RAG 시스템에서 수집)
        cache_stats = self.get_cache_stats()
        response_stats = self.get_response_stats()
        
        return PerformanceMetrics(
            timestamp=datetime.now(),
            response_time=response_stats.get('avg_response_time', 0),
            memory_usage_mb=process_memory,
            cpu_usage_percent=cpu_percent,
            cache_hit_rate=cache_stats.get('hit_rate', 0),
            active_connections=response_stats.get('active_connections', 0),
            queue_length=response_stats.get('queue_length', 0)
        )
    
    def record_metrics(self, metrics: PerformanceMetrics):
        """메트릭 기록"""
        # 메모리 히스토리 관리
        self.metrics_history.append(metrics)
        if len(self.metrics_history) > self.max_history:
            self.metrics_history.pop(0)
        
        # 파일 로깅
        with open(self.log_file, 'a', encoding='utf-8') as f:
            log_entry = {
                'timestamp': metrics.timestamp.isoformat(),
                'response_time': metrics.response_time,
                'memory_usage_mb': metrics.memory_usage_mb,
                'cpu_usage_percent': metrics.cpu_usage_percent,
                'cache_hit_rate': metrics.cache_hit_rate,
                'active_connections': metrics.active_connections,
                'queue_length': metrics.queue_length
            }
            f.write(json.dumps(log_entry, ensure_ascii=False) + '\n')
    
    def check_alerts(self, metrics: PerformanceMetrics):
        """임계값 기반 알림 확인"""
        alerts = []
        
        if metrics.response_time > self.alert_thresholds['response_time']:
            alerts.append(f"응답 시간 임계값 초과: {metrics.response_time:.2f}초")
        
        if metrics.memory_usage_mb > self.alert_thresholds['memory_usage']:
            alerts.append(f"메모리 사용량 임계값 초과: {metrics.memory_usage_mb:.1f}MB")
        
        if metrics.cpu_usage_percent > self.alert_thresholds['cpu_usage']:
            alerts.append(f"CPU 사용률 임계값 초과: {metrics.cpu_usage_percent:.1f}%")
        
        if metrics.cache_hit_rate < self.alert_thresholds['cache_hit_rate']:
            alerts.append(f"캐시 히트율 임계값 미달: {metrics.cache_hit_rate:.2%}")
        
        if alerts:
            self.send_alerts(alerts)
    
    def send_alerts(self, alerts: List[str]):
        """알림 발송"""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"\n🚨 [{timestamp}] 성능 알림:")
        for alert in alerts:
            print(f"  - {alert}")
    
    def get_performance_report(self, hours: int = 24) -> Dict:
        """성능 보고서 생성"""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_metrics = [
            m for m in self.metrics_history 
            if m.timestamp >= cutoff_time
        ]
        
        if not recent_metrics:
            return {"error": "충분한 데이터가 없습니다"}
        
        # 통계 계산
        response_times = [m.response_time for m in recent_metrics]
        memory_usage = [m.memory_usage_mb for m in recent_metrics]
        cpu_usage = [m.cpu_usage_percent for m in recent_metrics]
        cache_hits = [m.cache_hit_rate for m in recent_metrics]
        
        return {
            "period_hours": hours,
            "total_samples": len(recent_metrics),
            "response_time": {
                "avg": sum(response_times) / len(response_times),
                "min": min(response_times),
                "max": max(response_times),
                "p95": sorted(response_times)[int(len(response_times) * 0.95)]
            },
            "memory_usage_mb": {
                "avg": sum(memory_usage) / len(memory_usage),
                "min": min(memory_usage),
                "max": max(memory_usage)
            },
            "cpu_usage_percent": {
                "avg": sum(cpu_usage) / len(cpu_usage),
                "max": max(cpu_usage)
            },
            "cache_hit_rate": {
                "avg": sum(cache_hits) / len(cache_hits),
                "min": min(cache_hits)
            }
        }
    
    def optimize_recommendations(self) -> List[str]:
        """최적화 권장사항 생성"""
        recommendations = []
        recent_metrics = self.metrics_history[-10:]  # 최근 10개 샘플
        
        if not recent_metrics:
            return ["데이터가 부족합니다"]
        
        avg_response_time = sum(m.response_time for m in recent_metrics) / len(recent_metrics)
        avg_cache_hit_rate = sum(m.cache_hit_rate for m in recent_metrics) / len(recent_metrics)
        avg_memory = sum(m.memory_usage_mb for m in recent_metrics) / len(recent_metrics)
        
        if avg_response_time > 3.0:
            recommendations.append("응답 시간 개선을 위해 캐싱 전략을 검토하세요")
            
        if avg_cache_hit_rate < 0.8:
            recommendations.append("캐시 히트율 향상을 위해 캐시 크기를 늘리거나 TTL을 조정하세요")
            
        if avg_memory > 800:
            recommendations.append("메모리 사용량이 높습니다. 모델 양자화를 고려하세요")
        
        return recommendations if recommendations else ["현재 성능이 양호합니다"]

# 사용 예시
monitor = RAGPerformanceMonitor()

# 모니터링 시작
monitor.start_monitoring(interval=30)  # 30초마다

# 실시간 메트릭 수집
current_metrics = monitor.collect_metrics()
print(f"현재 응답 시간: {current_metrics.response_time:.2f}초")
print(f"메모리 사용량: {current_metrics.memory_usage_mb:.1f}MB")

# 성능 보고서
report = monitor.get_performance_report(hours=24)
print("\n24시간 성능 보고서:")
print(json.dumps(report, indent=2, ensure_ascii=False))

# 최적화 권장사항
recommendations = monitor.optimize_recommendations()
print("\n최적화 권장사항:")
for rec in recommendations:
    print(f"- {rec}")

# 모니터링 중지
# monitor.stop_monitoring()

실습 과제

RAG 성능 최적화 실습

이번 챕터에서 배운 최적화 기법들을 직접 구현하고 성능을 측정해보세요. 각 최적화 기법이 실제로 얼마나 효과적인지 정량적으로 검증하는 것이 목표입니다.

📊 과제 1: 성능 벤치마킹

  1. 1. 기본 RAG 시스템 구축
  2. 2. 캐싱 전/후 성능 측정
  3. 3. 모델 양자화 효과 검증
  4. 4. 배치 처리 vs 단일 처리 비교
  5. 5. 최적화 보고서 작성

⚡ 과제 2: 실시간 모니터링 구현

  • • 성능 메트릭 수집 시스템 구축
  • • 임계값 기반 알림 시스템
  • • 대시보드 UI 개발
  • • 자동 최적화 제안 기능

📱 과제 3: 엣지 RAG 프로토타입

  • • 모바일 환경용 경량화 RAG
  • • 오프라인 동작 지원
  • • 배터리 효율성 최적화
  • • 제한된 메모리에서의 성능 측정