홈으로
🔍

RAG 검색 증강 생성

문서 기반 AI 시스템 구축의 모든 것

12시간
intermediate
6개 챕터
중급 과정으로 돌아가기

Chapter 5: 멀티모달 RAG

이미지, 비디오, 오디오 데이터를 활용한 고급 RAG

5.1 CLIP 기반 이미지-텍스트 RAG

시각적 정보와 텍스트 통합 검색

CLIP 기반 멀티모달 검색 시스템

CLIP(Contrastive Language-Image Pre-training) 모델을 활용하여 이미지와 텍스트를 동일한 벡터 공간에 임베딩하고, 크로스 모달 검색을 구현합니다.

🖼️ 이미지 검색

  • • 텍스트 쿼리로 이미지 검색
  • • 이미지로 유사 이미지 찾기
  • • 의미론적 유사도 기반 랭킹
  • • 다중 언어 지원

📝 텍스트 생성

  • • 이미지 기반 답변 생성
  • • 시각적 컨텍스트 이해
  • • 멀티모달 프롬프트 엔지니어링
  • • 비전-언어 모델 통합

CLIP 기반 멀티모달 RAG 구현

import torch
import clip
from PIL import Image
import requests
from io import BytesIO
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
from typing import List, Tuple, Optional, Dict
import base64

class MultimodalRAGSystem:
    def __init__(self, 
                 clip_model_name: str = "ViT-B/32",
                 text_model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        
        # CLIP 모델 로드
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.clip_model, self.clip_preprocess = clip.load(clip_model_name, device=self.device)
        
        # 텍스트 전용 모델 (더 나은 텍스트 임베딩용)
        self.text_model = SentenceTransformer(text_model_name)
        
        # 벡터 인덱스
        self.image_index = None
        self.text_index = None
        
        # 메타데이터 저장소
        self.image_metadata = []
        self.text_metadata = []
        
        # 임베딩 차원
        self.clip_dim = 512  # CLIP의 기본 차원
        self.text_dim = 384  # MiniLM의 차원
    
    def encode_image(self, image_path_or_url: str) -> np.ndarray:
        """이미지를 벡터로 인코딩"""
        try:
            if image_path_or_url.startswith(('http://', 'https://')):
                response = requests.get(image_path_or_url)
                image = Image.open(BytesIO(response.content))
            else:
                image = Image.open(image_path_or_url)
            
            # CLIP 전처리 및 인코딩
            image_input = self.clip_preprocess(image).unsqueeze(0).to(self.device)
            
            with torch.no_grad():
                image_features = self.clip_model.encode_image(image_input)
                image_features = image_features / image_features.norm(dim=-1, keepdim=True)
            
            return image_features.cpu().numpy()[0]
        
        except Exception as e:
            print(f"Error encoding image {image_path_or_url}: {e}")
            return np.zeros(self.clip_dim)
    
    def encode_text_with_clip(self, text: str) -> np.ndarray:
        """텍스트를 CLIP으로 인코딩 (이미지와 호환)"""
        text_input = clip.tokenize([text]).to(self.device)
        
        with torch.no_grad():
            text_features = self.clip_model.encode_text(text_input)
            text_features = text_features / text_features.norm(dim=-1, keepdim=True)
        
        return text_features.cpu().numpy()[0]
    
    def encode_text(self, text: str) -> np.ndarray:
        """텍스트를 SentenceTransformer로 인코딩 (더 나은 텍스트 이해)"""
        return self.text_model.encode(text)
    
    def build_image_index(self, image_data: List[Dict]):
        """이미지 인덱스 구축
        
        Args:
            image_data: List of dicts with keys:
                - path: image file path or URL
                - caption: optional image caption
                - metadata: additional metadata
        """
        print(f"Building image index for {len(image_data)} images...")
        
        embeddings = []
        for i, data in enumerate(image_data):
            print(f"Processing image {i+1}/{len(image_data)}: {data['path']}")
            
            # 이미지 임베딩
            embedding = self.encode_image(data['path'])
            embeddings.append(embedding)
            
            # 메타데이터 저장
            self.image_metadata.append({
                'id': i,
                'path': data['path'],
                'caption': data.get('caption', ''),
                'metadata': data.get('metadata', {})
            })
        
        # FAISS 인덱스 생성
        embeddings_array = np.array(embeddings).astype('float32')
        self.image_index = faiss.IndexFlatIP(self.clip_dim)  # Inner Product (cosine similarity)
        self.image_index.add(embeddings_array)
        
        print(f"Image index built with {self.image_index.ntotal} vectors")
    
    def build_text_index(self, text_data: List[Dict]):
        """텍스트 인덱스 구축
        
        Args:
            text_data: List of dicts with keys:
                - text: text content
                - source: source information
                - metadata: additional metadata
        """
        print(f"Building text index for {len(text_data)} documents...")
        
        embeddings = []
        for i, data in enumerate(text_data):
            # 텍스트 임베딩
            embedding = self.encode_text(data['text'])
            embeddings.append(embedding)
            
            # 메타데이터 저장
            self.text_metadata.append({
                'id': i,
                'text': data['text'],
                'source': data.get('source', ''),
                'metadata': data.get('metadata', {})
            })
        
        # FAISS 인덱스 생성
        embeddings_array = np.array(embeddings).astype('float32')
        self.text_index = faiss.IndexFlatIP(self.text_dim)
        self.text_index.add(embeddings_array)
        
        print(f"Text index built with {self.text_index.ntotal} vectors")
    
    def search_images_by_text(self, query: str, k: int = 5) -> List[Dict]:
        """텍스트 쿼리로 이미지 검색"""
        if self.image_index is None:
            return []
        
        # 텍스트를 CLIP으로 인코딩 (이미지와 호환 공간)
        query_embedding = self.encode_text_with_clip(query).astype('float32')
        
        # 검색
        scores, indices = self.image_index.search(query_embedding.reshape(1, -1), k)
        
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx != -1:  # 유효한 인덱스
                result = self.image_metadata[idx].copy()
                result['score'] = float(score)
                results.append(result)
        
        return results
    
    def search_images_by_image(self, image_path: str, k: int = 5) -> List[Dict]:
        """이미지로 유사한 이미지 검색"""
        if self.image_index is None:
            return []
        
        # 쿼리 이미지 인코딩
        query_embedding = self.encode_image(image_path).astype('float32')
        
        # 검색
        scores, indices = self.image_index.search(query_embedding.reshape(1, -1), k)
        
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx != -1:
                result = self.image_metadata[idx].copy()
                result['score'] = float(score)
                results.append(result)
        
        return results
    
    def search_text(self, query: str, k: int = 5) -> List[Dict]:
        """텍스트 검색"""
        if self.text_index is None:
            return []
        
        # 쿼리 임베딩
        query_embedding = self.encode_text(query).astype('float32')
        
        # 검색
        scores, indices = self.text_index.search(query_embedding.reshape(1, -1), k)
        
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx != -1:
                result = self.text_metadata[idx].copy()
                result['score'] = float(score)
                results.append(result)
        
        return results
    
    def multimodal_rag(self, query: str, 
                      include_images: bool = True,
                      include_text: bool = True,
                      k_images: int = 3,
                      k_text: int = 5) -> Dict:
        """멀티모달 RAG 검색
        
        Args:
            query: 사용자 질문
            include_images: 이미지 검색 포함 여부
            include_text: 텍스트 검색 포함 여부
            k_images: 검색할 이미지 수
            k_text: 검색할 텍스트 수
        
        Returns:
            dict with retrieved images and texts
        """
        results = {
            'query': query,
            'images': [],
            'texts': [],
            'context': ''
        }
        
        # 이미지 검색
        if include_images and self.image_index is not None:
            image_results = self.search_images_by_text(query, k_images)
            results['images'] = image_results
        
        # 텍스트 검색
        if include_text and self.text_index is not None:
            text_results = self.search_text(query, k_text)
            results['texts'] = text_results
            
            # 텍스트 컨텍스트 구성
            context_parts = []
            for i, result in enumerate(text_results):
                context_parts.append(f"[Document {i+1}] {result['text']}")
            results['context'] = '\n\n'.join(context_parts)
        
        return results
    
    def generate_multimodal_response(self, query: str, rag_results: Dict) -> str:
        """멀티모달 컨텍스트를 활용한 답변 생성"""
        
        # 프롬프트 구성
        prompt_parts = [f"Question: {query}\n"]
        
        # 텍스트 컨텍스트 추가
        if rag_results['context']:
            prompt_parts.append(f"Text Context:\n{rag_results['context']}\n")
        
        # 이미지 정보 추가
        if rag_results['images']:
            image_info = []
            for i, img in enumerate(rag_results['images']):
                caption = img.get('caption', 'No caption available')
                image_info.append(f"Image {i+1}: {caption} (relevance: {img['score']:.3f})")
            prompt_parts.append(f"Related Images:\n" + '\n'.join(image_info) + "\n")
        
        prompt_parts.append(
            "Based on the provided text context and image information, "
            "please provide a comprehensive answer to the question. "
            "Reference specific images or documents when relevant."
        )
        
        full_prompt = '\n'.join(prompt_parts)
        
        # 실제로는 GPT-4V, Claude-3 Vision 등의 멀티모달 모델 사용
        # 여기서는 시뮬레이션
        return self._simulate_multimodal_response(query, rag_results, full_prompt)
    
    def _simulate_multimodal_response(self, query: str, rag_results: Dict, prompt: str) -> str:
        """멀티모달 응답 시뮬레이션"""
        
        response_parts = []
        
        # 텍스트 기반 답변
        if rag_results['texts']:
            response_parts.append(f"Based on the retrieved documents, {query.lower()}")
        
        # 이미지 기반 보충 설명
        if rag_results['images']:
            img_count = len(rag_results['images'])
            response_parts.append(f"The {img_count} related image(s) provide visual context that supports this explanation.")
        
        # 통합된 답변
        if response_parts:
            return ' '.join(response_parts) + " [This is a simulated response - in production, use GPT-4V or similar multimodal models]"
        else:
            return "I couldn't find relevant information to answer your question."

# 사용 예제
def demo_multimodal_rag():
    # 시스템 초기화
    rag_system = MultimodalRAGSystem()
    
    # 이미지 데이터 준비
    image_data = [
        {
            'path': 'https://example.com/cat.jpg',
            'caption': 'A cute cat sitting on a windowsill',
            'metadata': {'category': 'animals', 'source': 'stock_photos'}
        },
        {
            'path': 'https://example.com/dog.jpg',
            'caption': 'A golden retriever playing in a park',
            'metadata': {'category': 'animals', 'source': 'stock_photos'}
        },
        {
            'path': 'https://example.com/sunset.jpg',
            'caption': 'Beautiful sunset over the ocean',
            'metadata': {'category': 'nature', 'source': 'landscape_photos'}
        }
    ]
    
    # 텍스트 데이터 준비
    text_data = [
        {
            'text': 'Cats are independent animals that make great pets. They are known for their hunting skills and agility.',
            'source': 'animal_encyclopedia',
            'metadata': {'category': 'animals', 'subcategory': 'cats'}
        },
        {
            'text': 'Dogs are loyal companions and come in many breeds. They require regular exercise and social interaction.',
            'source': 'pet_guide',
            'metadata': {'category': 'animals', 'subcategory': 'dogs'}
        },
        {
            'text': 'Ocean sunsets create spectacular views with their vibrant colors reflected on the water surface.',
            'source': 'nature_guide',
            'metadata': {'category': 'nature', 'subcategory': 'ocean'}
        }
    ]
    
    # 인덱스 구축
    print("Building indexes...")
    rag_system.build_image_index(image_data)
    rag_system.build_text_index(text_data)
    
    # 멀티모달 검색 및 답변 생성
    queries = [
        "Tell me about cats and show me cat pictures",
        "What makes dogs good pets?",
        "Show me beautiful nature scenes"
    ]
    
    for query in queries:
        print(f"\n{'='*50}")
        print(f"Query: {query}")
        print(f"{'='*50}")
        
        # RAG 검색
        rag_results = rag_system.multimodal_rag(query)
        
        # 결과 출력
        print(f"\nFound {len(rag_results['images'])} relevant images:")
        for i, img in enumerate(rag_results['images']):
            print(f"  {i+1}. {img['caption']} (score: {img['score']:.3f})")
        
        print(f"\nFound {len(rag_results['texts'])} relevant texts:")
        for i, text in enumerate(rag_results['texts']):
            print(f"  {i+1}. {text['text'][:100]}... (score: {text['score']:.3f})")
        
        # 답변 생성
        response = rag_system.generate_multimodal_response(query, rag_results)
        print(f"\nGenerated Response:\n{response}")

# 실행
if __name__ == "__main__":
    demo_multimodal_rag()

고급 이미지-텍스트 RAG 기법

1. Image Captioning 통합

자동 이미지 캡션 생성으로 검색 품질 향상

from transformers import BlipProcessor, BlipForConditionalGeneration

class EnhancedImageRAG(MultimodalRAGSystem):
    def __init__(self):
        super().__init__()
        # BLIP 모델 로드 (이미지 캡셔닝)
        self.caption_processor = BlipProcessor.from_pretrained(
            "Salesforce/blip-image-captioning-base"
        )
        self.caption_model = BlipForConditionalGeneration.from_pretrained(
            "Salesforce/blip-image-captioning-base"
        )
    
    def generate_caption(self, image_path: str) -> str:
        image = Image.open(image_path)
        inputs = self.caption_processor(image, return_tensors="pt")
        out = self.caption_model.generate(**inputs, max_length=50)
        caption = self.caption_processor.decode(out[0], skip_special_tokens=True)
        return caption

2. Visual Question Answering

이미지에 대한 구체적 질문 답변

from transformers import ViltProcessor, ViltForQuestionAnswering

def visual_qa(image_path: str, question: str):
    processor = ViltProcessor.from_pretrained("dandelin/vilt-b32-finetuned-vqa")
    model = ViltForQuestionAnswering.from_pretrained("dandelin/vilt-b32-finetuned-vqa")
    
    image = Image.open(image_path)
    inputs = processor(image, question, return_tensors="pt")
    
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
        idx = logits.argmax(-1).item()
        answer = model.config.id2label[idx]
    
    return answer

5.2 비디오 검색 및 요약

비디오 콘텐츠 분석과 시간 기반 검색

비디오 RAG 아키텍처

비디오 데이터는 시각, 청각, 시간적 정보를 모두 포함하므로 복합적인 처리가 필요합니다.

🎬 프레임 분석

  • • 키프레임 추출
  • • 객체 감지
  • • 장면 전환 감지
  • • 시간 기반 인덱싱

🎵 오디오 처리

  • • 음성 텍스트 변환
  • • 화자 구분
  • • 음악/효과음 분석
  • • 감정 분석

📊 통합 검색

  • • 멀티모달 임베딩
  • • 시간 구간 검색
  • • 요약 생성
  • • 하이라이트 추출

비디오 RAG 구현 예시

import cv2
import whisper
from moviepy.editor import VideoFileClip
import torch
from transformers import AutoProcessor, AutoModel
import numpy as np

class VideoRAGSystem:
    def __init__(self):
        self.whisper_model = whisper.load_model("base")
        self.vision_model = AutoModel.from_pretrained("microsoft/DiT-base-finetuned-rvlcdip")
        self.processor = AutoProcessor.from_pretrained("microsoft/DiT-base-finetuned-rvlcdip")
        
    def extract_keyframes(self, video_path: str, interval: int = 30):
        """비디오에서 키프레임 추출 (매 30초)"""
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_interval = int(fps * interval)
        
        keyframes = []
        frame_count = 0
        
        while True:
            ret, frame = cap.read()
            if not ret:
                break
                
            if frame_count % frame_interval == 0:
                timestamp = frame_count / fps
                keyframes.append({
                    'frame': frame,
                    'timestamp': timestamp,
                    'frame_number': frame_count
                })
            
            frame_count += 1
        
        cap.release()
        return keyframes
    
    def transcribe_audio(self, video_path: str):
        """비디오에서 오디오 추출 및 텍스트 변환"""
        # 오디오 추출
        video = VideoFileClip(video_path)
        audio = video.audio
        audio_path = "temp_audio.wav"
        audio.write_audiofile(audio_path, verbose=False, logger=None)
        
        # Whisper로 전사
        result = self.whisper_model.transcribe(audio_path)
        
        segments = []
        for segment in result['segments']:
            segments.append({
                'start': segment['start'],
                'end': segment['end'], 
                'text': segment['text'],
                'confidence': segment.get('avg_logprob', 0)
            })
        
        return segments
    
    def process_video(self, video_path: str):
        """비디오 전체 처리 파이프라인"""
        print(f"Processing video: {video_path}")
        
        # 1. 키프레임 추출
        keyframes = self.extract_keyframes(video_path)
        print(f"Extracted {len(keyframes)} keyframes")
        
        # 2. 음성 전사
        transcripts = self.transcribe_audio(video_path)
        print(f"Extracted {len(transcripts)} transcript segments")
        
        # 3. 비디오 메타데이터 구성
        video_data = {
            'path': video_path,
            'keyframes': keyframes,
            'transcripts': transcripts,
            'duration': keyframes[-1]['timestamp'] if keyframes else 0
        }
        
        return video_data

# 사용 예제
video_rag = VideoRAGSystem()
video_data = video_rag.process_video("sample_video.mp4")

5.3 오디오/음성 데이터 처리

음성 인식과 오디오 콘텐츠 분석

오디오 RAG 처리 파이프라인

🎙️ 음성 인식

  • • Whisper 모델 활용
  • • 다국어 지원
  • • 실시간 전사
  • • 화자 구분
  • • 노이즈 필터링

📈 오디오 분석

  • • 감정 분석
  • • 음성 품질 평가
  • • 음악/효과음 분류
  • • 침묵 구간 감지
  • • 오디오 핑거프린팅

오디오 RAG 구현

import whisper
import librosa
import torch
import numpy as np
from transformers import pipeline
from typing import List, Dict
import speech_recognition as sr

class AudioRAGSystem:
    def __init__(self):
        # Whisper 모델 로드
        self.whisper_model = whisper.load_model("base")
        
        # 감정 분석 파이프라인
        self.emotion_analyzer = pipeline(
            "text-classification",
            model="j-hartmann/emotion-english-distilroberta-base"
        )
        
        # 음성 인식기
        self.recognizer = sr.Recognizer()
    
    def transcribe_audio(self, audio_path: str) -> Dict:
        """오디오 파일 전사"""
        result = self.whisper_model.transcribe(audio_path)
        
        # 세그먼트별 처리
        processed_segments = []
        for segment in result['segments']:
            # 각 세그먼트의 감정 분석
            emotions = self.emotion_analyzer(segment['text'])
            
            processed_segments.append({
                'start': segment['start'],
                'end': segment['end'],
                'text': segment['text'],
                'confidence': segment.get('avg_logprob', 0),
                'emotions': emotions
            })
        
        return {
            'full_text': result['text'],
            'segments': processed_segments,
            'language': result.get('language', 'unknown')
        }
    
    def analyze_audio_features(self, audio_path: str) -> Dict:
        """오디오 특성 분석"""
        # 오디오 로드
        y, sr = librosa.load(audio_path)
        
        # 기본 특성 추출
        features = {
            'duration': len(y) / sr,
            'sample_rate': sr,
            'rms_energy': float(np.mean(librosa.feature.rms(y=y))),
            'spectral_centroid': float(np.mean(librosa.feature.spectral_centroid(y=y, sr=sr))),
            'zero_crossing_rate': float(np.mean(librosa.feature.zero_crossing_rate(y))),
        }
        
        # MFCC 특성
        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
        features['mfcc_mean'] = mfccs.mean(axis=1).tolist()
        
        # 침묵 구간 감지
        intervals = librosa.effects.split(y, top_db=20)
        features['speech_segments'] = len(intervals)
        features['silence_ratio'] = 1 - (len(y[intervals]) / len(y)) if len(intervals) > 0 else 1
        
        return features

# 사용 예제
audio_rag = AudioRAGSystem()
transcript = audio_rag.transcribe_audio("audio_file.wav")
features = audio_rag.analyze_audio_features("audio_file.wav")

5.4 테이블 및 구조화된 데이터 RAG

표 형태 데이터의 이해와 질의 응답

구조화된 데이터 처리

📊 테이블 파싱

  • • HTML/PDF 테이블 추출
  • • CSV/Excel 파일 처리
  • • 헤더 및 데이터 구분
  • • 계층적 테이블 처리

🔍 테이블 QA

  • • 자연어 SQL 변환
  • • 테이블 내용 요약
  • • 수치 계산 및 분석
  • • 트렌드 패턴 인식

테이블 RAG 구현

import pandas as pd
import tabula
from transformers import TapexTokenizer, BartForConditionalGeneration
import sqlite3
from typing import List, Dict, Any

class TableRAGSystem:
    def __init__(self):
        # TAPEX 모델 로드 (테이블 QA용)
        self.tokenizer = TapexTokenizer.from_pretrained("microsoft/tapex-large-finetuned-wtq")
        self.model = BartForConditionalGeneration.from_pretrained("microsoft/tapex-large-finetuned-wtq")
        
        # SQL 데이터베이스
        self.db_connection = sqlite3.connect(':memory:')
    
    def extract_tables_from_pdf(self, pdf_path: str) -> List[pd.DataFrame]:
        """PDF에서 테이블 추출"""
        try:
            tables = tabula.read_pdf(pdf_path, pages='all', multiple_tables=True)
            return [table for table in tables if not table.empty]
        except Exception as e:
            print(f"Error extracting tables from PDF: {e}")
            return []
    
    def process_table(self, df: pd.DataFrame, table_name: str) -> Dict:
        """테이블 데이터 처리 및 인덱싱"""
        # 데이터 정제
        df_clean = df.dropna(how='all').fillna('')
        
        # 메타데이터 생성
        metadata = {
            'name': table_name,
            'shape': df_clean.shape,
            'columns': list(df_clean.columns),
            'dtypes': df_clean.dtypes.to_dict(),
            'sample_rows': df_clean.head(3).to_dict('records'),
            'summary_stats': self._get_summary_stats(df_clean)
        }
        
        # SQLite에 테이블 저장
        df_clean.to_sql(table_name, self.db_connection, if_exists='replace', index=False)
        
        # 텍스트 표현 생성 (TAPEX용)
        table_text = self._table_to_text(df_clean)
        
        return {
            'metadata': metadata,
            'table_text': table_text,
            'dataframe': df_clean
        }
    
    def _get_summary_stats(self, df: pd.DataFrame) -> Dict:
        """테이블 요약 통계"""
        stats = {}
        
        # 수치형 컬럼 통계
        numeric_cols = df.select_dtypes(include=['number']).columns
        if len(numeric_cols) > 0:
            stats['numeric_summary'] = df[numeric_cols].describe().to_dict()
        
        # 범주형 컬럼 통계
        categorical_cols = df.select_dtypes(include=['object']).columns
        if len(categorical_cols) > 0:
            stats['categorical_summary'] = {}
            for col in categorical_cols:
                stats['categorical_summary'][col] = df[col].value_counts().to_dict()
        
        return stats
    
    def _table_to_text(self, df: pd.DataFrame) -> str:
        """테이블을 TAPEX 형식 텍스트로 변환"""
        # 헤더
        headers = " | ".join(df.columns)
        
        # 데이터 행들 (상위 10개만)
        rows = []
        for _, row in df.head(10).iterrows():
            row_text = " | ".join([str(val) for val in row])
            rows.append(row_text)
        
        table_text = headers + " " + " ".join(rows)
        return table_text
    
    def answer_table_question(self, table_text: str, question: str) -> str:
        """테이블에 대한 질문 답변"""
        # TAPEX 입력 형식
        inputs = self.tokenizer(
            table=table_text,
            query=question,
            return_tensors="pt",
            max_length=1024,
            truncation=True
        )
        
        # 답변 생성
        outputs = self.model.generate(**inputs, max_length=128)
        answer = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        
        return answer
    
    def execute_sql_query(self, query: str) -> pd.DataFrame:
        """SQL 쿼리 실행"""
        try:
            return pd.read_sql_query(query, self.db_connection)
        except Exception as e:
            print(f"SQL query error: {e}")
            return pd.DataFrame()

# 사용 예제
table_rag = TableRAGSystem()

# 샘플 데이터
sales_data = pd.DataFrame({
    'Product': ['A', 'B', 'C', 'A', 'B'],
    'Region': ['North', 'South', 'North', 'South', 'North'],
    'Sales': [100, 150, 200, 120, 180],
    'Quarter': ['Q1', 'Q1', 'Q2', 'Q2', 'Q3']
})

# 테이블 처리
processed_table = table_rag.process_table(sales_data, 'sales_data')

# 질문 답변
question = "What is the total sales for Product A?"
answer = table_rag.answer_table_question(
    processed_table['table_text'], 
    question
)
print(f"Question: {question}")
print(f"Answer: {answer}")

5.5 레이아웃 인식 문서 처리

문서의 시각적 구조를 이해하는 고급 처리

레이아웃 인식의 중요성

문서의 레이아웃은 정보의 의미와 맥락을 전달하는 중요한 요소입니다. 제목, 단락, 표, 이미지의 위치와 관계를 이해해야 정확한 정보 추출이 가능합니다.

📄 문서 요소

  • • 제목 및 헤딩 계층
  • • 단락과 텍스트 블록
  • • 표와 목록
  • • 이미지와 캡션
  • • 각주와 참고문헌

🔧 처리 기법

  • • OCR 후 레이아웃 복원
  • • 문서 구조 트리 생성
  • • 의미론적 청킹
  • • 관계 정보 보존
  • • 컨텍스트 인식 임베딩

레이아웃 인식 처리 구현

from transformers import LayoutLMv3Processor, LayoutLMv3ForTokenClassification
import torch
from PIL import Image
import pytesseract
import cv2
import numpy as np
from typing import List, Dict, Tuple

class LayoutAwareRAG:
    def __init__(self):
        # LayoutLMv3 모델 로드
        self.processor = LayoutLMv3Processor.from_pretrained(
            "microsoft/layoutlmv3-base-finetuned-funsd"
        )
        self.model = LayoutLMv3ForTokenClassification.from_pretrained(
            "microsoft/layoutlmv3-base-finetuned-funsd"
        )
        
        # 라벨 매핑
        self.id2label = {
            0: "O",      # Other
            1: "B-HEADER", # Beginning of header
            2: "I-HEADER", # Inside header
            3: "B-QUESTION", # Beginning of question
            4: "I-QUESTION", # Inside question
            5: "B-ANSWER",   # Beginning of answer
            6: "I-ANSWER"    # Inside answer
        }
    
    def extract_layout_info(self, image_path: str) -> Dict:
        """이미지에서 레이아웃 정보 추출"""
        # 이미지 로드
        image = Image.open(image_path).convert("RGB")
        
        # OCR로 텍스트와 바운딩 박스 추출
        ocr_data = pytesseract.image_to_data(
            image, output_type=pytesseract.Output.DICT
        )
        
        # 단어별 정보 구성
        words = []
        boxes = []
        
        for i in range(len(ocr_data['text'])):
            if int(ocr_data['conf'][i]) > 0:  # 신뢰도가 있는 텍스트만
                word = ocr_data['text'][i].strip()
                if word:
                    words.append(word)
                    
                    # 바운딩 박스 (x0, y0, x1, y1) 형식으로 변환
                    x, y, w, h = (ocr_data['left'][i], ocr_data['top'][i], 
                                ocr_data['width'][i], ocr_data['height'][i])
                    boxes.append([x, y, x + w, y + h])
        
        return {
            'image': image,
            'words': words,
            'boxes': boxes
        }
    
    def classify_layout_elements(self, layout_info: Dict) -> List[Dict]:
        """레이아웃 요소 분류"""
        image = layout_info['image']
        words = layout_info['words']
        boxes = layout_info['boxes']
        
        if not words:
            return []
        
        # LayoutLMv3 입력 준비
        encoding = self.processor(
            image, words, boxes=boxes, 
            return_tensors="pt", 
            truncation=True, padding=True
        )
        
        # 모델 추론
        with torch.no_grad():
            outputs = self.model(**encoding)
            predictions = outputs.logits.argmax(-1).squeeze().tolist()
        
        # 예측 결과를 단어에 매핑
        classified_elements = []
        for word, box, pred in zip(words, boxes, predictions):
            classified_elements.append({
                'word': word,
                'box': box,
                'label': self.id2label.get(pred, 'O'),
                'confidence': float(torch.softmax(outputs.logits, dim=-1).max())
            })
        
        return classified_elements
    
    def group_elements_by_structure(self, classified_elements: List[Dict]) -> Dict:
        """구조별로 요소 그룹화"""
        grouped = {
            'headers': [],
            'questions': [],
            'answers': [],
            'others': []
        }
        
        current_element = {'type': None, 'words': [], 'boxes': []}
        
        for element in classified_elements:
            label = element['label']
            
            # B- 태그는 새로운 요소의 시작
            if label.startswith('B-'):
                if current_element['type']:
                    # 이전 요소 저장
                    self._save_grouped_element(current_element, grouped)
                
                # 새 요소 시작
                element_type = label.split('-')[1].lower()
                current_element = {
                    'type': element_type,
                    'words': [element['word']],
                    'boxes': [element['box']]
                }
            
            # I- 태그는 현재 요소의 연속
            elif label.startswith('I-') and current_element['type']:
                current_element['words'].append(element['word'])
                current_element['boxes'].append(element['box'])
            
            # O 태그 또는 새로운 타입
            else:
                if current_element['type']:
                    self._save_grouped_element(current_element, grouped)
                    current_element = {'type': None, 'words': [], 'boxes': []}
                
                if element['word'].strip():  # 빈 문자열이 아닌 경우
                    grouped['others'].append({
                        'text': element['word'],
                        'box': element['box']
                    })
        
        # 마지막 요소 저장
        if current_element['type']:
            self._save_grouped_element(current_element, grouped)
        
        return grouped
    
    def _save_grouped_element(self, element: Dict, grouped: Dict):
        """그룹화된 요소 저장"""
        element_type = element['type']
        text = ' '.join(element['words'])
        
        # 바운딩 박스 병합 (전체 영역)
        boxes = element['boxes']
        if boxes:
            merged_box = [
                min(box[0] for box in boxes),  # min x
                min(box[1] for box in boxes),  # min y
                max(box[2] for box in boxes),  # max x
                max(box[3] for box in boxes)   # max y
            ]
        else:
            merged_box = [0, 0, 0, 0]
        
        target_list = {
            'header': grouped['headers'],
            'question': grouped['questions'],
            'answer': grouped['answers']
        }.get(element_type, grouped['others'])
        
        target_list.append({
            'text': text,
            'box': merged_box,
            'word_count': len(element['words'])
        })
    
    def create_structured_chunks(self, grouped_elements: Dict) -> List[Dict]:
        """구조화된 청크 생성"""
        chunks = []
        
        # 헤더 기반 섹션 구성
        current_section = {
            'header': '',
            'content': [],
            'qa_pairs': []
        }
        
        for header in grouped_elements['headers']:
            # 이전 섹션 저장
            if current_section['header'] or current_section['content']:
                chunks.append(current_section.copy())
            
            # 새 섹션 시작
            current_section = {
                'header': header['text'],
                'content': [],
                'qa_pairs': []
            }
        
        # Q&A 쌍 생성
        questions = grouped_elements['questions']
        answers = grouped_elements['answers']
        
        for i, question in enumerate(questions):
            if i < len(answers):
                qa_pair = {
                    'question': question['text'],
                    'answer': answers[i]['text'],
                    'question_box': question['box'],
                    'answer_box': answers[i]['box']
                }
                current_section['qa_pairs'].append(qa_pair)
        
        # 기타 콘텐츠 추가
        for other in grouped_elements['others']:
            current_section['content'].append(other['text'])
        
        # 마지막 섹션 저장
        if current_section['header'] or current_section['content'] or current_section['qa_pairs']:
            chunks.append(current_section)
        
        return chunks

# 사용 예제
layout_rag = LayoutAwareRAG()

# 문서 이미지 처리
image_path = "document_page.png"
layout_info = layout_rag.extract_layout_info(image_path)
classified_elements = layout_rag.classify_layout_elements(layout_info)
grouped_elements = layout_rag.group_elements_by_structure(classified_elements)
structured_chunks = layout_rag.create_structured_chunks(grouped_elements)

print(f"Extracted {len(structured_chunks)} structured sections")
for i, chunk in enumerate(structured_chunks):
    print(f"Section {i+1}: {chunk['header']}")
    print(f"  - Q&A pairs: {len(chunk['qa_pairs'])}")
    print(f"  - Content items: {len(chunk['content'])}")

5.6 크로스 모달 검색 전략

다양한 모달리티 간 통합 검색

크로스 모달 검색 아키텍처

서로 다른 모달리티의 데이터를 통합하여 더 풍부하고 정확한 검색 결과를 제공합니다.

🔀 모달리티 융합

  • • 조기 융합 (Early Fusion)
  • • 후기 융합 (Late Fusion)
  • • 중간 융합 (Mid-level Fusion)
  • • 적응적 융합

⚖️ 가중치 조정

  • • 쿼리 타입별 가중치
  • • 동적 가중치 학습
  • • 신뢰도 기반 조정
  • • 사용자 피드백 반영

🎯 최적화

  • • 검색 속도 최적화
  • • 메모리 효율성
  • • 품질 vs 성능 균형
  • • 확장성 고려

통합 멀티모달 RAG 시스템

import numpy as np
import torch
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import faiss

@dataclass
class MultiModalQuery:
    text: Optional[str] = None
    image_path: Optional[str] = None
    audio_path: Optional[str] = None
    metadata: Dict[str, Any] = None

@dataclass 
class SearchResult:
    content: Dict[str, Any]
    score: float
    modality: str
    source: str

class UnifiedMultimodalRAG:
    def __init__(self):
        # 각 모달리티별 인코더
        self.text_encoder = None  # SentenceTransformer
        self.image_encoder = None # CLIP
        self.audio_encoder = None # Wav2Vec2
        
        # 각 모달리티별 인덱스
        self.text_index = None
        self.image_index = None
        self.audio_index = None
        
        # 메타데이터 저장소
        self.text_metadata = []
        self.image_metadata = []
        self.audio_metadata = []
        
        # 융합 전략 설정
        self.fusion_strategy = "adaptive"  # "early", "late", "adaptive"
        self.modality_weights = {
            "text": 0.4,
            "image": 0.4, 
            "audio": 0.2
        }
    
    def unified_search(self, query: MultiModalQuery, k: int = 10) -> List[SearchResult]:
        """통합 멀티모달 검색"""
        all_results = []
        
        # 각 모달리티에서 검색
        if query.text and self.text_index is not None:
            text_results = self._search_text(query.text, k)
            all_results.extend(text_results)
        
        if query.image_path and self.image_index is not None:
            image_results = self._search_images(query.image_path, k)
            all_results.extend(image_results)
            
        if query.audio_path and self.audio_index is not None:
            audio_results = self._search_audio(query.audio_path, k)
            all_results.extend(audio_results)
        
        # 크로스 모달 검색
        if query.text:
            # 텍스트로 이미지 검색
            if self.image_index is not None:
                cross_image_results = self._search_images_by_text(query.text, k//2)
                all_results.extend(cross_image_results)
            
            # 텍스트로 오디오 검색
            if self.audio_index is not None:
                cross_audio_results = self._search_audio_by_text(query.text, k//2)
                all_results.extend(cross_audio_results)
        
        # 결과 융합 및 재랭킹
        fused_results = self._fuse_results(all_results, query)
        
        # 상위 k개 결과 반환
        return sorted(fused_results, key=lambda x: x.score, reverse=True)[:k]
    
    def _search_text(self, query: str, k: int) -> List[SearchResult]:
        """텍스트 검색"""
        # 쿼리 임베딩
        query_embedding = self.text_encoder.encode(query).astype('float32')
        
        # 검색
        scores, indices = self.text_index.search(query_embedding.reshape(1, -1), k)
        
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx != -1:
                metadata = self.text_metadata[idx]
                results.append(SearchResult(
                    content=metadata,
                    score=float(score),
                    modality="text",
                    source="text_index"
                ))
        
        return results
    
    def _search_images(self, image_path: str, k: int) -> List[SearchResult]:
        """이미지 유사도 검색"""
        # 이미지 임베딩
        query_embedding = self.image_encoder.encode_image(image_path).astype('float32')
        
        # 검색
        scores, indices = self.image_index.search(query_embedding.reshape(1, -1), k)
        
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx != -1:
                metadata = self.image_metadata[idx]
                results.append(SearchResult(
                    content=metadata,
                    score=float(score),
                    modality="image",
                    source="image_index"
                ))
        
        return results
    
    def _search_images_by_text(self, text: str, k: int) -> List[SearchResult]:
        """텍스트로 이미지 검색 (크로스 모달)"""
        # CLIP 텍스트 인코딩
        query_embedding = self.image_encoder.encode_text_with_clip(text).astype('float32')
        
        # 이미지 인덱스에서 검색
        scores, indices = self.image_index.search(query_embedding.reshape(1, -1), k)
        
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx != -1:
                metadata = self.image_metadata[idx]
                results.append(SearchResult(
                    content=metadata,
                    score=float(score) * 0.8,  # 크로스 모달 페널티
                    modality="image",
                    source="cross_modal_text_to_image"
                ))
        
        return results
    
    def _search_audio(self, audio_path: str, k: int) -> List[SearchResult]:
        """오디오 유사도 검색"""
        # 오디오 임베딩 (구현 필요)
        query_embedding = self.audio_encoder.encode_audio(audio_path).astype('float32')
        
        scores, indices = self.audio_index.search(query_embedding.reshape(1, -1), k)
        
        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx != -1:
                metadata = self.audio_metadata[idx]
                results.append(SearchResult(
                    content=metadata,
                    score=float(score),
                    modality="audio",
                    source="audio_index"
                ))
        
        return results
    
    def _search_audio_by_text(self, text: str, k: int) -> List[SearchResult]:
        """텍스트로 오디오 검색 (크로스 모달)"""
        # 텍스트를 오디오 도메인으로 매핑 (구현 필요)
        # 실제로는 CLAP (Contrastive Language-Audio Pre-training) 모델 사용
        return []  # 임시 구현
    
    def _fuse_results(self, results: List[SearchResult], query: MultiModalQuery) -> List[SearchResult]:
        """결과 융합"""
        if self.fusion_strategy == "late":
            return self._late_fusion(results, query)
        elif self.fusion_strategy == "adaptive":
            return self._adaptive_fusion(results, query)
        else:
            return results  # 기본적으로는 그대로 반환
    
    def _late_fusion(self, results: List[SearchResult], query: MultiModalQuery) -> List[SearchResult]:
        """후기 융합: 각 모달리티 점수에 가중치 적용"""
        fused_results = []
        
        for result in results:
            # 모달리티별 가중치 적용
            weighted_score = result.score * self.modality_weights.get(result.modality, 1.0)
            
            # 쿼리와의 모달리티 일치도 보너스
            if (query.text and result.modality == "text") or                (query.image_path and result.modality == "image") or                (query.audio_path and result.modality == "audio"):
                weighted_score *= 1.2  # 일치 보너스
            
            fused_result = SearchResult(
                content=result.content,
                score=weighted_score,
                modality=result.modality,
                source=result.source
            )
            fused_results.append(fused_result)
        
        return fused_results
    
    def _adaptive_fusion(self, results: List[SearchResult], query: MultiModalQuery) -> List[SearchResult]:
        """적응적 융합: 쿼리 특성에 따라 가중치 동적 조정"""
        # 쿼리 복잡도 분석
        query_modalities = []
        if query.text:
            query_modalities.append("text")
        if query.image_path:
            query_modalities.append("image")
        if query.audio_path:
            query_modalities.append("audio")
        
        # 동적 가중치 계산
        dynamic_weights = self._calculate_dynamic_weights(query_modalities, results)
        
        fused_results = []
        for result in results:
            # 동적 가중치 적용
            weighted_score = result.score * dynamic_weights.get(result.modality, 1.0)
            
            fused_result = SearchResult(
                content=result.content,
                score=weighted_score,
                modality=result.modality,
                source=result.source
            )
            fused_results.append(fused_result)
        
        return fused_results
    
    def _calculate_dynamic_weights(self, query_modalities: List[str], results: List[SearchResult]) -> Dict[str, float]:
        """동적 가중치 계산"""
        weights = self.modality_weights.copy()
        
        # 쿼리에 포함된 모달리티에 높은 가중치
        for modality in query_modalities:
            weights[modality] *= 1.5
        
        # 검색 결과의 분포에 따른 조정
        modality_counts = {}
        for result in results:
            modality_counts[result.modality] = modality_counts.get(result.modality, 0) + 1
        
        total_results = len(results)
        for modality, count in modality_counts.items():
            # 결과가 적은 모달리티에 보정 가중치
            if count < total_results * 0.1:  # 10% 미만
                weights[modality] *= 1.3
        
        return weights
    
    def generate_multimodal_response(self, query: MultiModalQuery, 
                                   search_results: List[SearchResult]) -> str:
        """멀티모달 검색 결과 기반 응답 생성"""
        # 모달리티별 결과 그룹화
        grouped_results = {}
        for result in search_results[:5]:  # 상위 5개만 사용
            modality = result.modality
            if modality not in grouped_results:
                grouped_results[modality] = []
            grouped_results[modality].append(result)
        
        # 응답 구성
        response_parts = []
        
        # 텍스트 정보
        if "text" in grouped_results:
            text_info = [r.content.get('text', '') for r in grouped_results["text"]]
            response_parts.append(f"Based on relevant documents: {' '.join(text_info[:2])}")
        
        # 이미지 정보
        if "image" in grouped_results:
            image_count = len(grouped_results["image"])
            response_parts.append(f"Supported by {image_count} relevant images")
        
        # 오디오 정보
        if "audio" in grouped_results:
            audio_count = len(grouped_results["audio"])
            response_parts.append(f"Including {audio_count} audio references")
        
        return ". ".join(response_parts) + "."

# 사용 예제
multimodal_rag = UnifiedMultimodalRAG()

# 멀티모달 쿼리
query = MultiModalQuery(
    text="Show me information about cats",
    image_path="query_cat.jpg",
    metadata={"user_preference": "visual_heavy"}
)

# 통합 검색
results = multimodal_rag.unified_search(query, k=10)

# 응답 생성  
response = multimodal_rag.generate_multimodal_response(query, results)
print(f"Generated response: {response}")

Chapter 5 요약

핵심 학습 내용

  • 1. CLIP 기반 이미지-텍스트 RAG
  • 2. 비디오 검색 및 요약 시스템
  • 3. 오디오/음성 데이터 처리
  • 4. 테이블 및 구조화된 데이터 RAG
  • 5. 레이아웃 인식 문서 처리
  • 6. 크로스 모달 검색 전략

실습 성과

  • ✅ 멀티모달 임베딩 시스템
  • ✅ 크로스 모달 검색 구현
  • ✅ 비디오/오디오 처리 파이프라인
  • ✅ 테이블 QA 시스템
  • ✅ 레이아웃 인식 문서 파서
  • ✅ 통합 멀티모달 RAG