파트너십

TwelveLabs와 Weaviate의 RAG 라이브러리를 활용한 비디오 처리 시간 단축

제임스 러

개발자는 Twelve Labs의 Embed API와 Weaviate를 활용하여 비디오 RAG 시스템을 구축할 수 있습니다. 이를 통해 비디오 전체를 쿼리할 때와 비교하여, Pegasus를 통한 답변의 정확도는 그대로 유지하면서 분석 실행 전에 가장 관련성이 높은 비디오 세그먼트만 검색함으로써 처리 시간을 72초에서 20초로 단축할 수 있습니다.

개발자는 Twelve Labs의 Embed API와 Weaviate를 활용하여 비디오 RAG 시스템을 구축할 수 있습니다. 이를 통해 비디오 전체를 쿼리할 때와 비교하여, Pegasus를 통한 답변의 정확도는 그대로 유지하면서 분석 실행 전에 가장 관련성이 높은 비디오 세그먼트만 검색함으로써 처리 시간을 72초에서 20초로 단축할 수 있습니다.

목차

No headings found on page

뉴스레터 구독하기

뉴스레터 구독하기

영상 이해 분야의 최신 기술 업데이트, 튜토리얼 및 인사이트를 받아보세요.

영상 이해 분야의 최신 기술 업데이트, 튜토리얼 및 인사이트를 받아보세요.

AI로 영상을 검색하고, 분석하고, 탐색하세요.

2025. 3. 18.

25분

링크 복사하기

초안을 검토해 주신 Weaviate 팀의 Tuana Celik 님과 Erika Cardenas 님께 깊은 감사를 드립니다!



비디오 처리는 연산 비용이 많이 들고 시간이 오래 걸리는 작업입니다. 특히 장편 콘텐츠를 분석할 때는 더욱 그렇습니다. 검색 증강 생성(RAG)은 시스템이 전체 비디오가 아닌 가장 관련성 높은 비디오 세그먼트만 처리하도록 함으로써 이 문제를 해결합니다. 이러한 타겟팅된 접근 방식은 응답 품질을 유지하거나 향상시키면서도 처리 시간을 크게 단축합니다.

이 포스트에서는 Twelve Labs의 비디오 이해 기능Weaviate의 벡터 데이터베이스를 결합하여 비디오 콘텐츠를 위한 효율적인 RAG 시스템을 구축하는 방법을 살펴보겠습니다. 비디오를 세그먼트로 나누고 임베딩을 사용하여 분석에 가장 관련성 높은 부분만 검색함으로써, 정확도를 유지하거나 오히려 향상시키면서도 처리 시간을 크게 줄일 수 있습니다.

우리의 접근 방식은 몇 가지 핵심 기술을 활용합니다:

  • 비디오 이해 및 임베딩 생성을 위한 TwelveLabs Pegasus 및 Marengo 모델

  • 비디오 세그먼트의 효율적인 저장 및 검색을 위한 Weaviate 벡터 데이터베이스

  • 비디오 분석의 비교군 역할을 할 오픈 소스 LLaVA-NeXT-Video 모델

이 RAG 기반 접근 방식이 가장 관련성 높은 세그먼트에만 집중하여 비디오 처리의 연산 부하를 어떻게 줄일 수 있는지 보여드릴 것입니다. 이를 통해 더 긴 비디오를 한층 효율적으로 분석할 수 있게 됩니다. 콘텐츠 중재, 스포츠 분석, 교육용 콘텐츠 중 어떤 애플리케이션을 개발하든 관계없이, 이 접근 방식은 고품질의 결과를 유지하면서 비디오 처리 기능을 확장하는 데 도움을 줄 것입니다.



1 - TwelveLabs 및 Weaviate 설정하기



TwelveLabs

아직 Twelve Labs에 가입하지 않으셨다면 여기에서 가입하실 수 있습니다. 계정 설정이 완료되면 Playground로 이동하여 화면 오른쪽 상단의 사용자 아이콘을 클릭한 뒤 API Key 페이지로 이동하세요.

작업 중인 노트북의 왼쪽에서 열쇠 아이콘을 클릭하고, 이 값을 TL_API_KEY라는 이름의 시크릿(Secret)으로 생성합니다.



Weaviate

Weaviate 계정이 없다면 여기에서 가입하실 수 있습니다. 계정이 생성되면 클라우드 대시보드로 이동하여 새 클러스터를 생성하세요. 클러스터 설정이 완료되면 노트북 시크릿 섹션에 두 가지 값을 입력해야 합니다.

REST Endpoint 아래의 URL을 WEAVIATE_URL 변수에 추가합니다. API Keys 아래의 Admin 키를 복사하여 WEAVIATE_API_KEY에 저장합니다.



2 - GPU 런타임 선택하기

LLaVA-NeXT-Video 모델을 실행하려면 GPU가 필요합니다. 노트북에서 런타임 > 런타임 유형 변경으로 이동하여 T4 GPU를 선택하세요.



3 - 환경 설정하기



의존성 라이브러리 설치

먼저 TwelveLabs와 Weaviate SDK를 설치해야 합니다:

!python -m pip install -U -q twelvelabs
!python -m pip install -U -q "weaviate-client>=4.0.0"

그런 다음 나머지 필요한 패키지들을 설치합니다.

!python -m pip install torch
!python -m pip install -q av
!python -m pip install --upgrade -q accelerate 
!python -m pip install -U bitsandbytes
!python -m pip install git

!python -m pip install pillow
!python -m pip install sentencepiece
!python -m



TwelveLabs 및 Weaviate SDK 설정

from google.colab import userdata

TL_API_KEY=userdata.get('TL_API_KEY')
weaviate_url = userdata.get("WEAVIATE_URL")
weaviate_api_key = userdata.get("WEAVIATE_API_KEY")

그 다음 TwelveLabs 클라이언트를 초기화합니다.

from twelvelabs import TwelveLabs

# Initialize the Twelve Labs client
twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)

마지막으로, Weaviate 클라이언트를 설정하고 Video_Embeddings 컬렉션을 초기화합니다.

import weaviate
from weaviate.classes.init import Auth

# Connect to Weaviate Cloud
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

# Get or create collection
try:
    collection = weaviate_client.collections.get("Video_Embeddings")
except:
    collection = weaviate_client.collections.create(name="Video_Embeddings")



비디오 데이터 설정하기

이제 임베딩을 위한 비디오 데이터를 준비해야 합니다. 비디오 데이터는 이 링크의 구글 드라이브 폴더에서 확인할 수 있습니다. 이 파일들을 사용자의 구글 드라이브 기본 폴더에 "TwelveLabs-Weaviate"라는 이름의 폴더를 만들어 복사하세요. 아래 셀을 실행하면 구글 드라이브를 마운트하고 노트북에서 비디오 파일에 접근할 수 있게 됩니다.

from google.colab import drive
drive.mount('/content/drive')
base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate"
raw_video_dir = base_folder_path + "/sports_videos"

upscaled_video_dir = base_folder_path + "/upscaled_videos/"
video_segments_dir = base_folder_path + "/video_segments/"



비디오 업스케일링

보유한 비디오 중 일부는 임베딩 모델에 사용하기에 해상도가 너무 낮습니다. 따라서 사용하기 전에 해상도를 높여주는(업스케일링) 작업이 필요합니다.

여기에 업스케일링 함수를 생성해 줍니다. read_video_pyav 함수는 LLaVa-NeXT-Video 콜랩 노트북에서 가져온 것으로, 비디오를 추론에 적합한 형태의 numpy 배열로 포맷팅해 줍니다.

import av
import numpy as np

def upscale_video(input_file, output_file, target_width=1280, target_height=720):
    input_container = av.open(input_file)
    output_container = av.open(output_file, mode='w')

    input_stream = input_container.streams.video[0]
    output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate)
    output_stream.width = target_width
    output_stream.height = target_height
    output_stream.pix_fmt = 'yuv420p'

    for frame in input_container.decode(input_stream):
        frame = frame.reformat(width=target_width, height=target_height)
        packet = output_stream.encode(frame)
        output_container.mux(packet)

    # Flush the encoder
    packet = output_stream.encode(None)
    output_container.mux(packet)

    # Close the containers
    input_container.close()
    output_container.close()

def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

raw_video_dir에 있는 비디오들을 가져와 업스케일링한 뒤 upscaled_video_dir에 저장합니다.

# Create output directory if it doesn't exist
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# Iterate over all files in the raw video directory
for filename in os.listdir(raw_video_dir):
    
    # Check if the file is a video file
    if filename.endswith(".mp4"):
        print(filename)
        # Get the file name without extension
        input_file_no_ext = os.path.splitext(filename)[0]
        # Define the output file name
        output_file = f"{input_file_no_ext}_480.mp4"
        if output_file in os.listdir(upscaled_video_dir):
            continue
        # Define the full path for the input and output files
        input_file_path = os.path.join(raw_video_dir, filename)
        output_file_path = os.path.join(upscaled_video_dir, output_file)
        # Upscale the video
        upscale_video(input_file_path, output_file_path)



4 - 단일 비디오에서 Pegasus와 LLaVa-NeXT-Video 비교하기

PegasusLLaVa-NeXT-Video는 비디오를 입력하고 관련 질문을 던질 수 있는 비디오 이해(Video Understanding) 모델입니다.

먼저 우리가 가진 비디오 컬렉션 중 단일 비디오 하나를 사용하여 Pegasus와 LLaVa-NeXT-Video의 성능을 비교해 보겠습니다. 이 비디오는 뉴욕 자이언츠와 뉴잉글랜드 패트리어츠가 맞붙은 제42회 슈퍼볼의 한 장면을 담고 있습니다. 경기 종료 2분을 남겨두고 자이언츠의 쿼터백 일라이 매닝이 던진 패스를 리시버 데이비드 타이리가 자신의 헬멧에 공을 밀착시켜 극적으로 잡아낸, 일명 '헬멧 캐치(Helmet Catch)'로 불리는 역사적인 순간을 보여주는 영상입니다.

비디오에 대한 맥락을 파악했으니, 이제 두 모델에게 "이 비디오에서 무슨 일이 일어나고 있나요?"라는 질문을 던져 비디오를 얼마나 잘 이해하는지 비교 확인해 보겠습니다.



Pegasus로 비디오와 대화하기

시작하기 앞서, 비디오를 저장할 Pegasus 인덱스를 하나 생성해야 합니다.

models = [
        {
            "name": "pegasus1.2",
            "options": ["visual"]
        }
    ]

index_name = "sports_videos"
indices_list = twelve_labs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelve_labs_client.index.create(
        name=index_name,
        models=models

    )
    print(f"A new index has been created: id={index.id} name={index.name} models={index.models}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} models={index.models}")

그 다음 해당 인덱스에 비디오를 업로드하는 함수를 작성합니다. 업로드가 완료되면 질문을 할 때 사용할 Pegasus 비디오 ID가 반환됩니다.

# Monitor the status of the video task
def on_task_update(task):
    print(f"  Status={task.status}")
    
def upload_video_to_twelve_labs_pegasus(video_path):
    task = twelve_labs_client.task.create(
        index_id=index.id,
        file = video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)

    if task.status != "ready":
      raise RuntimeError(f"Indexing failed with status {task.status}")
    print(f"The unique identifer of your video is {task.video_id}.")
    return task.video_id

비디오를 업로드하고 Pegasus 비디오 ID를 single_video_id 변수에 저장해 둡니다.

# Define the video file path
single_video_file = upscaled_video_dir + "football_480.mp4"

single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)

Pegasus가 비디오를 제대로 이해했는지 확인하기 위해, "이 비디오에서 무슨 일이 일어나고 있나요? 간결하게 답변해 주세요."라고 질문해 보겠습니다.

single_video_query = "What is going on in this video? Please be concise."

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt=single_video_query
)
print(f"{res.data}")

Pegasus는 다음과 같이 답변했습니다:

비디오는 뉴욕 자이언츠와 뉴잉글랜드 패트리어츠 간의 미시축구 경기에서 결정적인 순간을 보여줍니다. 자이언츠의 쿼터백 일라이 매닝이 던진 패스를 데이비드 타이리가 경기장 밖으로 쓰러지면서 공을 자신의 헬멧에 고정한 채 아주 환상적으로 받아냅니다. 다양한 앵글로 이 캐치 장면이 리플레이되면서 극적인 순간의 난이도와 정밀함이 강조됩니다. 타이리는 플레이 직후 짧게 세레머니를 하고, 비디오는 타이리와 다른 선수들이 경기장을 벗어나는 모습으로 끝이 납니다.

답변을 통해 Pegasus가 비디오를 매우 깊이 있게 이해하고 있음을 알 수 있습니다. 이 경기가 자이언츠와 패트리어츠 간의 풋볼 경기라는 점을 파악했고, 일라이 매닝이 패스를 던졌으며 데이비드 타이리가 이를 받아낸 장면이 경기의 결정적인 순간이었음을 정확히 짚어냈습니다.

Pegasus가 이 경기가 슈퍼볼 경기라는 것을 직접 언급하지 않았으므로, 확인차 관련 질문을 추가로 해보겠습니다.

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt="What game is this?"
)
print(f"{res.data}")

그러자 Pegasus는 정확하게 이 경기는 제42회 슈퍼볼(Super Bowl XLII) 경기입니다.라고 답했습니다.

이제 LLaVa-NeXT-Video 모델은 이 비디오를 얼마나 잘 이해하는지 비교 확인해 보겠습니다.



LLaVa-NeXT-Video로 비디오와 대화하기

LLaVa-NeXT-Video의 경우, 추론을 실행하기 전에 비디오 데이터를 특정 규격의 포맷으로 준비해 주어야 합니다. 이 모델은 전체 비디오 스트림을 한 번에 처리하지 않기 때문에 비디오 전반에서 균일하게 프레임을 추출(샘플링)하는 과정이 필요합니다. 각 비디오에서 40개의 프레임을 고르게 추출하는 샘플링 함수를 제작하여 비디오 전체의 핵심 순간들을 빠짐없이 포착할 수 있도록 하겠습니다. 이 샘플링 기법은 LLaVA-NeXT-Video 공식 구현 코드를 응용하여 만들었습니다. 샘플링이 완료되면 허깅페이스 허브(Hugging Face Hub)에서 모델을 불러온 뒤, 형식에 맞게 입력을 변환하여 추론을 실행하고 응답을 생성해 보겠습니다.

def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # sample uniformly num_samples frames from the video
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
    
    sampled_frames = read_video_pyav(container, indices)
    
    return sampled_frames
    
sampled_video = sample_video(single_video_file, num_samples=40)

비디오 샘플링이 올바르게 완료되면, 추론을 위해 모델을 구성합니다.

from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

그 다음, 모델에 유저 질문을 전송해 추론하는 헬퍼 함수를 구현합니다.

def query_llava_next(query,model,processor,sampled_video):

    # Each "content" is a list of dicts and you can add image/video/text modalities
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": query},
                {"type": "video"},
                ],
        },
    ]

    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # prompt_len = len(prompt)

    inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device)

    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)

    return generated_text[0]

마지막으로, Pegasus와 직접 비교를 해보기 위해 동일한 질문을 던져보겠습니다.

llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

그 결과, 다음 응답을 얻었습니다:

What is happening in this video? Be concise ASSISTANT: 비디오는 경기장에 여러 선수들이 있는 풋볼 경기를 보여주고 있습니다. 저지 번호와 일부 선수들이 착용한 옛날식 헬멧으로 판단하건대 뉴욕 자이언츠와 뉴잉글랜드 패트리어츠 간의 '제3회 슈퍼볼(Super Bowl III)' 경기인 것 같습니다. 한 선수가 동작 중에 공을 잡고 다른 선수에게 태클을 당하고 있으며, 심판은 퍼스트 다운 사인을 보내고 있습니다. 코치와 다른 경기 관계자들도 보입니다.

이 모델도 자이언츠와 패트리어츠 간에 미식축구 경기가 열리고 있다는 사실은 감지해 냈으나, 뜬금없이 이 경기를 '제3회 슈퍼볼'로 비정상 지목해 버렸습니다. 또한 이 비디오의 핵심이자 눈여겨볼 장면인 '헬멧 캐치'에 대한 내용은 깔끔하게 누락했습니다.

다시 한 번 "이 경기는 무슨 경기인가요?"라는 질문을 던져 보겠습니다. Pegasus에 근접하기는 했지만 여전히 잘못된 오류 답변을 출력합니다.

llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

USER: what game is this? ASSISTANT: 제공해주신 비디오는 현재 진행 중인 풋볼 경기의 영상으로, 구체적으로는 제41회 슈퍼볼(Super Bowl XLI)의 한 장면입니다. 뉴잉글랜드 패트리어츠 대 뉴욕 자이언츠의 대결입니다. 화면 속의 선수들은 자이언츠와 패트리어츠 소속입니다.



5 - 단일 비디오에서 세그먼트 단위 쿼리를 위한 RAG 적용하기

전체 비디오 분석에 있어서는 Pegasus가 LLaVa-NeXT-Video보다 더 빠르고 일관된 호환성이 높은 답변을 출력하며, 품질 측면에서도 훨씬 압도적인 퍼포먼스를 보여줍니다.

하지만 비디오 분석의 초점을 가장 밀접한 일부 세그먼트 영역으로만 좁혀주는 구조가 마련된다면, 분석 모델들의 전반적인 품질을 끌어올릴 수 있습니다. 여기서 RAG(검색 증강 생성)의 가치가 유용하게 빛을 발합니다. 비디오 전체를 무식하게 다 다루기보단, 사용자가 질문한 내용에 대한 힌트 정보가 포함된 세그먼트만 귀신같이 찾아내 국소적으로 분석을 제공하는 것입니다.

이 파이프라인을 구현하기 위해 우리는 Marengo 모델을 빌려 쓸 것입니다. 이는 비디오 세그먼트의 의미 단위 데이터를 래핑하여 완성도 높은 고품질 벡터 임베딩으로 뽑아내 주는 전용 특화 모델입니다. 이 임베딩을 구성하면 다음 플로우가 가능해집니다:

  1. 전체 긴 비디오에서 조각난 각 세그먼트별로 독립적인 임베딩 인덱스를 만듭니다.

  2. 유저 질문(문장) 정보를 파싱해 그것과 극도로 유사한 특정 비디오 세그먼트를 매칭해 냅니다.

  3. 적중률이 가장 높은 타겟 세그먼트 한두 개만 콕 집어 우리의 최종 분석 모델에 먹여 연산합니다.

그럼 이제 비디오를 일정한 구간 세그먼트로 잘라 조각내고, Marengo 모델을 사용해 각 조각들의 임베딩 벡터를 뽑아보겠습니다. 이 임베딩 세트가 우리의 RAG 시스템의 든든한 뼈대가 될 것입니다.



Marengo를 활용한 전체 비디오 및 비디오 컷 임베딩 생성

Marengo 모델이 잘라 낼 수 있는 최대 길이 제한인 10초 단위로 세그먼트 길이를 지정하겠습니다.

# Define the video segment length
segment_length = 10

그 다음 Marengo를 작동시켜 임베딩을 생성합니다. 분석할 때 video_embedding_scopes=["clip", "video"]video_clip_length=segment_length 옵션을 부여하는 것에 유의해 주시기 바랍니다. 이렇게 세팅해야 Marengo가 전체 비디오에 대칭되는 통합 임베딩과 더불어 자정해 둔 10초 단위 마이크로 클립들을 일괄 연산하여 각각의 개별 임베딩 세트로 반환해 줍니다.

task = twelve_labs_client.embed.task.create(
    model_name="Marengo-retrieval-2.7",
    video_file=single_video_file,
    video_clip_length=segment_length,
    video_embedding_scopes=["clip", "video"]
)
print(
    f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
)

# Monitor the status of the video embedding task
status = task.wait_for_done(
    sleep_interval=2,
    callback=on_task_update
)
print(f"Embedding done: {status}")

임베딩 태스크 처리가 끝나면 나중에 해당 임베딩을 언제든 꺼내올 수 있도록 Marengo Task ID를 로컬에 보관해 두어야 합니다. 추후 Weaviate 데이터베이스에 업로드할 때 매칭할 수 있도록 marengo_task_ids 사전에 이를 매핑 정리해 두겠습니다.

single_video_task_id = task.id

marengo_task_ids = {}

single_video_file_name = single_video_file.split("/")[-1]
marengo_task_ids[single_video_file_name] = single_video_task_id



RAG용 비디오 세그먼트 가공 환경 설계

에러 없는 유기적인 RAG 파이프라인 처리를 완성하려면, 데이터베이스 내부에서 Pegasus가 발급한 Video ID와 방금 만든 Marengo의 Task ID를 정확히 엮어 두어야 합니다. 이렇게 한 묶음으로 설계해야 벡터 데이터베이스 탐색에서 매칭된 세그먼트에 대해 Pegasus 모델에 바로 질문을 이어 던질 수 있습니다. 이를 구축하기 위해 조각낸 미니 세그먼트 비디오 파일들을 Pegasus 인덱서로도 빠짐없이 전부 업로드해 줍니다.

가장 먼저 비디오를 10초 규격의 개별 조각 파일들로 균일하게 분할해 내는 split_video 유틸 함수를 준비합니다. 이때 Pegasus 인덱서에 업로드할 수 있는 최소 비디오 조건인 '4초 이상' 스펙을 어기지 않도록 각별히 방어 조치를 해야 합니다. 비디오를 다 잘라내고 남은 자투리 맨 마지막 조각이 혹시라도 5초 미만으로 비정상 짧은 경우엔, 그 직전 세그먼트 영역과 서로 겹치게 프레임을 넉넉히 교차 결합해 주어 비디오 분할이 안전하게 5초 이상 길이를 갖추도록 세이브 기믹을 적용해 구현했습니다.

import os
import subprocess
import json
    
def split_video(input_path, output_dir, segment_duration=10):
    """
    Split a video into segments of the specified duration.
    Regular segments will be exactly segment_duration seconds.
    The last segment will be at least 5 seconds long, potentially overlapping
    with the previous segment if needed.
    
    Args:
        input_path: Path to the input video file
        output_dir: Directory to save the output segments
        segment_duration: Duration of each segment in seconds (default: 10)
    """

    # Minimum length for the last segment
    min_last_segment_len = 5
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Get base filename without extension
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    
    # Get video duration using ffprobe
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_format", input_path
    ]
    
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(probe_result.stdout)
        duration = float(video_info["format"]["duration"])
    except Exception as e:
        print(f"Error getting video duration: {e}")
        return 0
    
    # Calculate number of full segments
    num_full_segments = int(duration / segment_duration)
    
    # Calculate remaining duration
    remaining_duration = duration - (num_full_segments * segment_duration)
    
    # Determine total number of segments and if we need to adjust the last segment
    if remaining_duration > 0:
        if remaining_duration < min_last_segment_len:
            # Last segment would be too short, so we'll adjust its start time
            num_segments = num_full_segments + 1
            needs_adjustment = True
        else:
            # Last segment is already long enough
            num_segments = num_full_segments + 1
            needs_adjustment = False
    else:
        # No remaining duration, all segments are complete
        num_segments = num_full_segments
        needs_adjustment = False
    
    print(f"Video {base_name} is {duration:.2f} seconds long")
    print(f"Creating {num_segments} segments")
    
    # Create each segment
    for i in range(num_segments):
        # For regular segments, start at the segment boundary
        if i < num_full_segments:
            start_time = i * segment_duration
            actual_duration = segment_duration
        else:
            # This is the last segment
            if needs_adjustment:
                # Start earlier to ensure it's at least min_last_segment_len seconds
                start_time = duration - min_last_segment_len
                actual_duration = min_last_segment_len
            else:
                # Last segment is already long enough
                start_time = i * segment_duration
                actual_duration = remaining_duration
        
        output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4")
        
        # For all segments, use copy mode for speed
        cmd = [
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", input_path,
            "-t", str(actual_duration),
            "-c:v", "copy",
            "-c:a", "copy",
            output_path
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"Error creating segment {i+1}: {result.stderr[:100]}...")
        else:
            end_time = start_time + actual_duration
            if i == num_segments - 1 and needs_adjustment:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)")
            else:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s")
    
    print(f"Successfully split {base_name} into {num_segments} segments")
    return num_segments

지정해 둔 전용 세그먼트 데이터 저장 폴더인 video_segments_dir 경로에 분할 비디오들을 적재시킵니다.

split_video(single_video_file, video_segments_dir,segment_length)

파일명과 매칭되는 Pegasus Video ID 값을 담아 둘 매핑용 빈 사전인 pegasus_video_ids를 초기 선언하고 먼저 원본 전체 비디오의 고유 ID 정보부터 삽입해 놓겠습니다.

pegasus_video_ids = {}

fname = single_video_file.split("/")[-1]
pegasus_video_ids[fname] = single_video_id

이제 준비된 미니 세그먼트용 비디오 파일들을 차례대로 Pegasus 데이터 인덱서로 업로드 전송 처리하여 리턴된 고유 ID 값들로 pegasus_video_ids 매핑용 데이터 사전을 완성해 줍니다.

segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))]

# Process each video
for segment_video_file in segment_video_files:
    if segment_video_file in pegasus_video_ids:
        print("skip file",segment_video_file)
        continue
    print("processing file",segment_video_file)
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir+segment_video_file)
        pegasus_video_ids[segment_video_file] = video_id
    except:
        print("error",segment_video_file)
        continue

마지막으로, 비교 모델인 LLaVa-NeXT-Video에서도 분석이 매끄럽게 수행될 수 있도록 전체 세그먼트 영상들을 미리 균일 프레임 샘플링 구조로 컨버전(배열 변환) 적용해 놓는 사전 로드 빌드 작업을 처리해 둡니다.

sampled_video_files = {}

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

for video_file in os.listdir(upscaled_video_dir):
    print(video_file)
    sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video



Weaviate에 임베딩 업로드하기

Weaviate는 컬렉션으로 다룰 때 메타데이터 객체와 실제 탐색에 쓰일 임베딩 벡터 리스트를 분리해서 처리하는 방식을 권장합니다. 이에 대칭되는 Weaviate 업로드 셋을 만들기 위해, Marengo Task ID 값과 이에 매칭되는 Pegasus Video ID 정보를 전달받아 규격화된 records 리스트와 이에 상응하는 vectors 배열 세트로 파싱 가공해 내는 prepare_marengo_embeddings_for_weaviate 유틸 변환 함수를 구성하겠습니다.

def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids):

    # Prepare data for Weaviate upload
    records = []
    vectors = []

    for video_file_name in marengo_task_ids.keys():

        

        marengo_task_id = marengo_task_ids[video_file_name]

        # Retrieve marengo full video and clip embeddings
        marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id)


        #track segment number to match with fiel
        segment_number = 0

        for segment in marengo_embeddings_result.video_embedding.segments:
            # Determine if this is a video or clip segment
            is_video = segment.embedding_scope == "video"


            #Update the file name if segment
            updated_file_name = video_file_name
            if not is_video:
                updated_file_name = updated_file_name.replace(".mp4",f"_segment_{segment_number:03d}.mp4")
                segment_number += 1

            video_name = video_file_name.replace(".mp4","")
            
            pegasus_video_id = None
            if updated_file_name in pegasus_video_ids:
                pegasus_video_id = pegasus_video_ids[updated_file_name] 

            record = {
                'video_name':video_name,
                'segment_number': 0 if is_video else segment_number,
                'video_file': updated_file_name,
                'start_time': getattr(segment, 'start_offset_sec', 0),
                'end_time': getattr(segment, 'end_offset_sec', 0),
                'type': 'video' if is_video else 'clip',
                'task_id': marengo_task_id,
                'pegasus_video_id': pegasus_video_id
            }
            
            # Get the embedding vector
            embedding_vector = [float(x) for x in segment.embeddings_float]
            
            # Add to our lists
            records.append(record)
            vectors.append(embedding_vector)

    # Print summary
    print(f"Prepared {len(records)} segments for upload to Weaviate")
    print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}")
    print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}")

    return records, vectors

그 다음 작성이 완료된 해당 파이프라인 컴포넌트 변환 동작 함수를 실행하여 도출된 정보 레코드와 임베딩 벡터 리스트들을 Weaviate 서버 대시보드로 바인딩 전송해 줍니다.

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")



벡터 검색 테스트하기

보유 중인 모든 리소스를 수집 데이터베이스 인스턴스에 집어넣었으므로, 이제 원하는 풋볼 정보 검색 쿼리에 최적화되어 대응하는지 Weaviate의 near_vector 벡터 거리가 유사하게 동작하는지 성능 테스트를 실행해 보겠습니다. 우리가 가공해 넣어둔 임베딩 세트의 임의 인텍스 벡터를 쿼리에 주입하면 본인 자신과의 벡터 연산 코사인 유클리드 거리가 완전히 0에 가깝게 딱 정방향 인출되어야 맞습니다.

임의로 타겟팅한 5번 벡터 변환 샘플값으로 가상 쿼리를 태우면 리턴된 응답 벡터 셋에서 자기 자신에 준하는 비디오 정보를 최우선 거리값 0의 레코드로 보여줄 것입니다.

from weaviate.classes.query import MetadataQuery, Filter

# Use a specific vector for the query
query_vector = vectors[5]

# Perform vector search
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
)

print(f"Found {len(response.objects)} results for vector search")
for obj in response.objects:
    print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}")
    if 'segment_id' in obj.properties:
        print(f"Segment: {obj.properties['segment_id']}")
    if 'text' in obj.properties and obj.properties['text']:
        print(f"Text: {obj.properties['text']}")
    print(f"Distance: {obj.metadata.distance}")
    print("-" * 50)

동일한 임시 쿼리 작동으로 우리 DB 내부의 고유 벡터 레코드가 거리 0인 환상의 무결점 상태로 검색되어 반환됨을 목격했습니다.




RAG 설계를 위한 타겟 세그먼트 영상 인출 및 검증

우리가 공들여 빌드한 이번 비디오 RAG 시스템의 핵심 역량은, 클라이언트에게서 인입된 자연어 질문에 반응하고 알맞은 영상 레코드를 매칭 인출해 주는 프로세스에 있습니다. 전체적인 처리는 아래 3단계로 핵심 처리됩니다.

  1. TwelveLabs의 전용 임베딩 모델인 Marengo로 고객의 질문(텍스트) 정보를 분석하고 벡터 임베딩으로 번환 처리합니다.

  2. 생성된 질의 벡터와 DB에 미리 구축해 둔 타겟 미니 영상 고유 벡터 셋들 간 최인접 유클리드 서치를 탐색해 우수한 매칭값을 가려냅니다.

  3. 최적의 거리값으로 채택된 비디오 세그먼트를 추출하고, 이 영상에 매달려 있는 Pegasus 고유 ID를 조회해 타겟 조각 구간에 대칭되는 정밀한 답변을 합성 제공하도록 모델을 실행합니다.

이러한 정밀한 설계를 사용하면 방대한 영상에서 쓸모없는 연산 낭비를 가차 없이 패스하고, 질문 내용에 매칭되는 국소 구간만 초고속 분석 처리해 답변 전달 품질과 리소스 세이빙 비용을 엄청나게 끌어올릴 수 있습니다.

첫 번째 단계로 유저의 한글 영문 질문 데이터를 임베딩 쿼리로 인코딩합니다.

sample_question = "What technique did David Tyree use to catch the ball?"

embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=sample_question,
    text_truncate="start",
)

query_vector = embedding.text_embedding.segments[0].embeddings_float

그 다음 Weaviate 내부에서 쿼리 임베딩과 가장 극단적으로 가까이에 인접 배치된 우수 타겟 세그먼트 단품 조각을 찾아 내겠습니다. filters=(Filter.by_property("type").equal("clip"))로 소조건을 세팅함으로써, 무겁고 모호한 전체 비디오 통합 임베딩들은 한꺼번에 배제하고 알맹이들만 타겟 인출되도록 한정 지어 성능을 보강합니다.

response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
    filters=(Filter.by_property("type").equal("clip"))
)

video_file = response.objects[0].properties.get("video_file")
print(video_file)

탐색 서치가 멋지게 성공하여 4번째 구간 세그먼트 데이터 파일인 football_480_segment_003.mp4 레코드를 탁월한 스펙으로 가져왔습니다.

실제 적중한 영상 세그먼트를 눈등으로 간략 매칭 시각화해 보겠습니다.

import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

video_file = response.objects[0].properties.get("video_file")
video = sampled_video_files[video_file]

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # this is required to not display the generated image

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())

확인 결과, 전체 시간 축 프레임 중 타이리 선수가 실제 헬멧을 지탱 삼아 볼을 꽉 거머죄는 절정에 달한 정확한 찰나의 캐칭 파트 세그먼트 본체임을 한눈에 가려낼 수 있습니다.

우리가 의도했던 대로 완벽한 세그먼트 조각을 타겟팅했다는 사실을 확신했으니, 이제 이 짧은 조각 구간에서 두 모델이 어느 정도의 분석 퀄리티 차이를 출력하는지 직접 배틀을 시켜 연동해 보겠습니다.



매칭된 미니 세그먼트로 비디오 대화 결과 격돌: Pegasus 대 LLaVa-NeXT-Video

우선 TwelveLabs의 고급 Pegasus 추론 모델 답변 빌드 결과를 관찰해 줍니다.

pegasus_video_id = response.objects[0].properties.get("pegasus_video_id")


print(sample_question)

res = twelve_labs_client.generate.text(
  video_id=pegasus_video_id,
  prompt=sample_question
)
print(f"{res.data}")

데이비드 타이리가 공을 잡기 위해 사용한 캐칭 기술은 무엇인가요?

데이비드 타이리는 패스된 미식축구 공을 안전하게 확보하기 위하여 자신의 머리에 씌여진 헬멧 겉면에 공 부위를 거세게 내리눌러 밀착 고정하는 독창적인 임기응변 동작 기술을 펼쳤습니다. 이 시도는 자이언츠의 패스 연결 권한을 경이롭게 계속 지속시키는 데 최고의 공을 세운 역사적 하이라이트 플레이로 세이브되었습니다.

질의 요지를 통달한 것처럼, 타이리의 헬멧 위 밀착 볼 컨트롤 동작을 예술적으로 해설해 주며 경기 흐름상 기여도까지 일타 쌍피로 엮어 기품 있는 완벽한 답변을 쏟아냈습니다.

이번에는 부하가 덜해진 짧아진 똑같은 세그먼트 조각에 대하여 오픈소스 비교 구동 모델인 LLaVa-NeXT-Video는 전보다 더 영리한 답변을 만들어 내는지 연쇄 태워보겠습니다.

video_file = response.objects[0].properties.get("video_file")
sampled_video = sampled_video_files[video_file]
generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video)

print(generated_text)

USER: What technique did David Tyree use to catch the ball? ASSISTANT: 공을 캐치하려는 모션을 보인 미식축구 선수는 양손을 사용해 머리 위 높은 영역에서 안전하게 포구하는 오버헤드 투핸드 포그립 낙하포착 기술을 활용해 볼을 제어하려 한 것으로 목격됩니다. 이 기교는 디펜더의 인터셉트 위협을 무력화하며 머리꼭대기 높이에 뜬 공을 잡아채는 유용한 포구 루트의 일종이나 밸런스를 잃고 미끄러지면 대단히 큰 패스 실패로 이어질 소지도 있습니다.

공의 비행 궤적에 따른 양손의 일반적인 물리 오버헤드 포지셔닝에 국한해 무난 무난한 답변만을 생성하는 데 그쳤습니다. 우리가 눈여겨봤던 독창적인 트레이드 마크 시그니처 동작인 '헬멧 밀착 캐치' 부분은 이번 마이크로 분석에서도 기대를 빗나가고 포착에 실패한 아쉬운 결과를 보여주었습니다.



6 - 다중 비디오 환경에서 Marengo, Weaviate 및 Pegasus 멀티 RAG 가동하기

단독 미시 단위 비디오 임베딩 구간 대칭 동작을 실컷 경험해 봤으니, 이제 서비스 레벨에서 멀티플 플레이로 가장 빈번하게 빌드 요청을 넣는 다중 비디오 분산 환경에서의 고기능성 멀티 RAG 실전 구현 단계를 보여드리겠습니다.



전체 비디오 풀에 대응하도록 Marengo 인덱싱 일괄 전개

그동안 준비해 뒀던 전체 추가 비디오 컬렉션들에 대한 Marengo 임베딩 정보를 통합 맵 marengo_task_ids 사전에 루핑 전개하여 한 세트로 관리 인덱싱 처리하겠습니다.

for video_file_name in os.listdir(upscaled_video_dir):

    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue

    print(f"processing {video_file_name}")

    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Monitor the status of the video embedding task
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    marengo_task_ids[video_file_name] = task.id



추가 확장 비디오 풀 대상 세그먼트 분할 전개

그 외 업스케일 가공 완료된 연계 스포츠 비디오들도 기존 처리했던 방식과 마찬가지로 전부 규격화된 10초 세그먼트 데이터 조각으로 쪼개 놓아 RAG 적용 자격을 부여합니다.

# Create output folder if it doesn't exist
os.makedirs(upscaled_video_dir, exist_ok=True)

# Get all video files
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]

# Process each video
for video_file in video_files:
    split_video(upscaled_video_dir + video_file,video_segments_dir,segment_length)



나머지 다중 영상 전부에 대한 Pegasus 고유 Video ID 일괄 전송 바인딩

나머지 파편화된 다중 조각 비디오 채널 전체의 Pegasus Video ID 리스트를 취득하겠습니다. 이 과정은 대기 딜레이를 많이 깎아내기 위해서 고속 멀티 락 패럴렐(병렬 스레드 처리) 호출 기법을 사용해 단번에 완성해 줍니다.

import concurrent.futures
import os
from tqdm import tqdm  # Use standard tqdm instead of tqdm.notebook

def process_video(video_path):
    video_file_name = video_path.split("/")[-1]
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_path)
        return video_file_name, video_id
    except Exception as e:
        print(f"Error processing {video_file_name}: {str(e)}")
        return video_file_name, None

# Filter out videos that are already processed
segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]
full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]
all_video_files = segment_video_files + full_video_files

videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids]

print(f"Processing {len(videos_to_process)} videos in parallel...")

# Use ThreadPoolExecutor for I/O-bound operations like API calls
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks and create a dictionary mapping futures to their video files
    future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process}
    
    # Process results as they complete with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)):
        video_file_name, video_id = future.result()
        if video_id:
            pegasus_video_ids[video_file_name] = video_id

print("All videos processed!")
print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")



최종 미싱 데이터 세트 Weaviate 일괄 데이터 바인딩

완벽하게 준비된 2단계 멀티 비디오 클립 가공 소스들을 Weaviate의 내부 클러스터 영역으로 일괄 주입 연동합니다.

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        if record["pegasus_video_id"] is None:
            continue
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")



RAG 성능 평가 지표: 마이크로 클립 검색 대 전체 비디오 직접 질문 비교

드디어 Weaviate 공간 내부에 모든 멀티 비디오에 대한 Marengo의 특징 벡터 및 Pegasus 매핑용 ID 컬렉션의 세팅을 완벽히 마쳤습니다. 이제 RAG 시스템 성능 평가를 시작해 보겠습니다. 이 테스트는 두 가지 극단적인 주요 성향 포인트를 관측할 것입니다.

  1. 답변의 핵심 적중력(Answer Quality): 클립 조각 기반의 협소한 RAG 탐색과 전체 통합 비디오를 생으로 쿼리했을 때 도출되는 응답 정보 결과의 정확함 차이가 어떻게 전개되는가?

  2. 시간 연산 절약성(Processing Efficiency): 두 방식 간에 시간 소요 격차나 리소스를 낭비하는 딜레이 측면에 어떠한 실전 마일스톤 메리트가 드러나는가?

우리는 서로 다른 성질의 질문 묶음을 던져, 각기 최단 핏을 달성하는 세그먼트 영상 컷을 유도한 케이스와 생비디오를 주입한 케이스의 성능 소요치를 지표로 얻을 것입니다. 이를 목도하면, 비디오 재생 분량이 한정 없이 늘어나도 똑똑한 RAG 구성이 불필요한 계산을 원천 배제하여 뛰어난 퍼포먼스 가치를 달성한다는 것을 자명하게 입증받을 수 있습니다.

다양한 종목의 스포츠들이 넓게 포진된 테스트용 질의응답 세트 목록을 정의해 봅니다.

video_questions = [
    "In the American Football Video, what are the teams playing?", 
    "What technique does David Tyree use to catch the ball?",
    "In the tennis match video, who is playing?", 
    "What foot does Messi shoot at the goal with?",
    "When does Keri Strug hurt her foot?"
]



TwelveLabs Pegasus를 이용한 멀티 비디오 RAG

가장 먼저, 다이렉트로 전체 비디오를 상대로 질의를 대여 처리한 케이스의 소요 수치를 검산해 봅니다.

from weaviate.classes.query import MetadataQuery, Filter
import time

pegasus_full_video_answers = []

start_time = time.time()

for question in video_questions:

    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_full_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 72 seconds

다음으로 정교하게 세그먼트 데이터 컷을 타겟팅하여 뽑아서 RAG 연산을 적용한 결과를 추적 관찰합니다.

pegasus_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_clip_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 20 seconds

조각 세그먼트 영상만을 발췌해 RAG를 전개한 방식과 단순 생비디오를 집어넣은 경우 생성된 실시간 응답 퀄리티 비교를 제공해 드리겠습니다.

for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

목격하신 것과 같이 생성해 낸 최종 정보의 퀄리티는 대단히 수려하고 서로 어깨를 나란히 할 정도로 준수한 모습을 유지합니다. 하지만 연산 시간에서 세그먼트 RAG 전략은 단 20초 만에 완수된 반면, 전체 영상을 순회 구동하면 72초라는 엄청난 리소스 지연 차이가 벌어졌습니다.




LLaVa-NeXT-Video 모형 기반 유사 다중 RAG 구동

이번 단계로는 LLaVa-NeXT-Video 오픈소스 모델에 위의 멀티 비디오 RAG 설계를 연속 태워보겠습니다. 그전에 비교 동작에 필요한 다중 컷 샘플링 프레임 생성부터 일괄 완수해 늘어놓습니다.

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

우선 통비디오 다이렉트 패싱 연동의 결과물 빌드부터 시작하겠습니다.

llava_full_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_full_video_answers.append([question,selected_video_name,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

곧이어 RAG 클립 기반 추론 응답 연산으로 전개해 수치를 확인합니다.

from weaviate.classes.query import MetadataQuery

import time

llava_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_clip_video_answers.append([question,selected_video_name,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

이 테스트 세팅에선 두 빌드 실행 시간이 완전히 동률로 뽑혔습니다. 이 현상이 나온 타당한 이유는 비디오 연장 길이 차이와 상관없이 일률적으로 40점씩 고정 프레임만 드롭 샘플링해서 GPU 메모리에 올리기 때문입니다.

이제 LLaVa-NeXT-Video 모형이 출력해 낸 조각 클립 분석본과 통비디오 다이렉트 분석본의 텍스트 답변 내용을 살펴보겠습니다.

for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

오픈소스 챗 지향 모델인 LLaVa-NeXT-Video는 클립 세그먼트 RAG를 통과시켰을 때 총 5개의 질의 중 약 2개 부분에서 올바른 과녘 정답을 타격해 냈습니다.

  1. 첫째 질문 대전 팀 정보에서, 뉴잉글랜드와 뉴욕 자이언츠가 올바르게 대결 중인 구도 맥락을 선방하여 정확히 감지했습니다.

  2. 세 번째 질판 스포츠 정보에서도, 전설적인 테니스 거장 페더러와 조코비치가 맞닥뜨린 대전 상황임을 정확히 추적 매칭했습니다.




7 - 결론: TwelveLabs 및 Weaviate 기반 고효율 비디오 RAG 구축 전략 전략과 가치 요약

오늘 마친 비디오 RAG(검색 증강 생성) 파이프라인의 실전 빌드 테스트 결과는 정확도와 시간 절약이라는 두 마리 토끼를 다 움켜잡을 수 있는 현명한 해답임을 여실히 입증했습니다. TwelveLabs의 시각 연산 분석 지배 기술과 Weaviate의 탄탄한 초고속 벡터 데이터베이스를 하이브리드로 접착 가동하면 전체 비디오 소스를 연산에서 패스시키고 유력한 부분 세그먼트만 뽑아 처리하는 영리한 엔진을 얻게 됩니다.



이 테스트를 통해 우리가 거둔 위대한 수확

  1. 비교 불가의 압도적 시간 절약(Performance Improvements): TwelveLabs의 획기적인 Pegasus 연동 구조를 Weaviate의 초당 조회 벡터 풀과 콜라보시키면, 둔중하고 긴 생 영상을 다 분석할 필요 가 없으므로 3분의 1 이상 연산 수행 속도를 전력으로 차 단하여 세이빙 시켰습니다.

  2. 오픈소스 모델의 기량 하계 극복(Enhanced Accuracy): 상대적으로 학습 한계가 명확한 LLaVa-NeXT-Video 같은 모델들마저 모호한 영상을 넓게 줄 때보다 타겟 클립으로 사유 정밀도를 높였을 때 사실 검증 정확도가 비약적으로 올라갑니다.

  3. 대단히 견고한 확장 아키텍처(Scalable Architecture): Marengo 기반 고차원 특징 추출과 Weaviate가 가진 레이턴시 없는 분산 디스크 색인 결합은 현업 비즈니스 제품군 스케일로 올리기에 최고로 안성맞춤인 락인 설계 베이스입니다.



그 외 확장 가능한 영역

이 우수한 결합 모델은 다양한 상용 마켓 도메인의 패러다임을 바꿀 킬러 무기를 제작하는 데 지대한 영향을 줄 것입니다.

  1. 엔터 및 멀티미디어 시장(Media & Entertainment): 수만 시간의 대작 미디어 저장소 아카이브 내부에서 "비 내리는 씬의 주인공 독백 신만 모아줘" 같은 마술 같은 탐색 가공 편집 환경을 초 단위로 서포트할 수 있습니다.

  2. 스포츠 마이크로 전략분석(Sports Analytics): 선수단 코칭 스태프나 경기 분석 전술가들이 훈련 비디오에서 "타자가 세 번째 스트라이크에 방망이 헛돌리는 모션만 탐색"하여 맞춤 분석 조언 서비스를 곧바로 태울 수 있도록 도울 수 있습니다.

  3. 차세대 비디오 이커머스 쇼핑(Retail & E-commerce): 무의미한 쇼츠 스타일 상품 설명 비디오들을 지대적인 대화형 쇼핑으로 개조시켜 유저가 "가방 끈 조절하는 부분만 보여줘"라고 말하면 곧바로 타겟 지점을 영상으로 피사체 하이라이트 제공 해 주는 신개념 구매 흐름을 열어젖힐 수 있습니다.

TwelveLabs의 한계가 없는 비디오 특징 분석 스택과 Weaviate의 든든한 날개 데이터베이스를 활용해 초 효율 시그널 비디오 RAG 시스템을 지금 바로 엔터프라이즈 환경에 빌드 전개해 보시기 바랍니다.

초안을 검토해 주신 Weaviate 팀의 Tuana Celik 님과 Erika Cardenas 님께 깊은 감사를 드립니다!



비디오 처리는 연산 비용이 많이 들고 시간이 오래 걸리는 작업입니다. 특히 장편 콘텐츠를 분석할 때는 더욱 그렇습니다. 검색 증강 생성(RAG)은 시스템이 전체 비디오가 아닌 가장 관련성 높은 비디오 세그먼트만 처리하도록 함으로써 이 문제를 해결합니다. 이러한 타겟팅된 접근 방식은 응답 품질을 유지하거나 향상시키면서도 처리 시간을 크게 단축합니다.

이 포스트에서는 Twelve Labs의 비디오 이해 기능Weaviate의 벡터 데이터베이스를 결합하여 비디오 콘텐츠를 위한 효율적인 RAG 시스템을 구축하는 방법을 살펴보겠습니다. 비디오를 세그먼트로 나누고 임베딩을 사용하여 분석에 가장 관련성 높은 부분만 검색함으로써, 정확도를 유지하거나 오히려 향상시키면서도 처리 시간을 크게 줄일 수 있습니다.

우리의 접근 방식은 몇 가지 핵심 기술을 활용합니다:

  • 비디오 이해 및 임베딩 생성을 위한 TwelveLabs Pegasus 및 Marengo 모델

  • 비디오 세그먼트의 효율적인 저장 및 검색을 위한 Weaviate 벡터 데이터베이스

  • 비디오 분석의 비교군 역할을 할 오픈 소스 LLaVA-NeXT-Video 모델

이 RAG 기반 접근 방식이 가장 관련성 높은 세그먼트에만 집중하여 비디오 처리의 연산 부하를 어떻게 줄일 수 있는지 보여드릴 것입니다. 이를 통해 더 긴 비디오를 한층 효율적으로 분석할 수 있게 됩니다. 콘텐츠 중재, 스포츠 분석, 교육용 콘텐츠 중 어떤 애플리케이션을 개발하든 관계없이, 이 접근 방식은 고품질의 결과를 유지하면서 비디오 처리 기능을 확장하는 데 도움을 줄 것입니다.



1 - TwelveLabs 및 Weaviate 설정하기



TwelveLabs

아직 Twelve Labs에 가입하지 않으셨다면 여기에서 가입하실 수 있습니다. 계정 설정이 완료되면 Playground로 이동하여 화면 오른쪽 상단의 사용자 아이콘을 클릭한 뒤 API Key 페이지로 이동하세요.

작업 중인 노트북의 왼쪽에서 열쇠 아이콘을 클릭하고, 이 값을 TL_API_KEY라는 이름의 시크릿(Secret)으로 생성합니다.



Weaviate

Weaviate 계정이 없다면 여기에서 가입하실 수 있습니다. 계정이 생성되면 클라우드 대시보드로 이동하여 새 클러스터를 생성하세요. 클러스터 설정이 완료되면 노트북 시크릿 섹션에 두 가지 값을 입력해야 합니다.

REST Endpoint 아래의 URL을 WEAVIATE_URL 변수에 추가합니다. API Keys 아래의 Admin 키를 복사하여 WEAVIATE_API_KEY에 저장합니다.



2 - GPU 런타임 선택하기

LLaVA-NeXT-Video 모델을 실행하려면 GPU가 필요합니다. 노트북에서 런타임 > 런타임 유형 변경으로 이동하여 T4 GPU를 선택하세요.



3 - 환경 설정하기



의존성 라이브러리 설치

먼저 TwelveLabs와 Weaviate SDK를 설치해야 합니다:

!python -m pip install -U -q twelvelabs
!python -m pip install -U -q "weaviate-client>=4.0.0"

그런 다음 나머지 필요한 패키지들을 설치합니다.

!python -m pip install torch
!python -m pip install -q av
!python -m pip install --upgrade -q accelerate 
!python -m pip install -U bitsandbytes
!python -m pip install git

!python -m pip install pillow
!python -m pip install sentencepiece
!python -m



TwelveLabs 및 Weaviate SDK 설정

from google.colab import userdata

TL_API_KEY=userdata.get('TL_API_KEY')
weaviate_url = userdata.get("WEAVIATE_URL")
weaviate_api_key = userdata.get("WEAVIATE_API_KEY")

그 다음 TwelveLabs 클라이언트를 초기화합니다.

from twelvelabs import TwelveLabs

# Initialize the Twelve Labs client
twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)

마지막으로, Weaviate 클라이언트를 설정하고 Video_Embeddings 컬렉션을 초기화합니다.

import weaviate
from weaviate.classes.init import Auth

# Connect to Weaviate Cloud
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

# Get or create collection
try:
    collection = weaviate_client.collections.get("Video_Embeddings")
except:
    collection = weaviate_client.collections.create(name="Video_Embeddings")



비디오 데이터 설정하기

이제 임베딩을 위한 비디오 데이터를 준비해야 합니다. 비디오 데이터는 이 링크의 구글 드라이브 폴더에서 확인할 수 있습니다. 이 파일들을 사용자의 구글 드라이브 기본 폴더에 "TwelveLabs-Weaviate"라는 이름의 폴더를 만들어 복사하세요. 아래 셀을 실행하면 구글 드라이브를 마운트하고 노트북에서 비디오 파일에 접근할 수 있게 됩니다.

from google.colab import drive
drive.mount('/content/drive')
base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate"
raw_video_dir = base_folder_path + "/sports_videos"

upscaled_video_dir = base_folder_path + "/upscaled_videos/"
video_segments_dir = base_folder_path + "/video_segments/"



비디오 업스케일링

보유한 비디오 중 일부는 임베딩 모델에 사용하기에 해상도가 너무 낮습니다. 따라서 사용하기 전에 해상도를 높여주는(업스케일링) 작업이 필요합니다.

여기에 업스케일링 함수를 생성해 줍니다. read_video_pyav 함수는 LLaVa-NeXT-Video 콜랩 노트북에서 가져온 것으로, 비디오를 추론에 적합한 형태의 numpy 배열로 포맷팅해 줍니다.

import av
import numpy as np

def upscale_video(input_file, output_file, target_width=1280, target_height=720):
    input_container = av.open(input_file)
    output_container = av.open(output_file, mode='w')

    input_stream = input_container.streams.video[0]
    output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate)
    output_stream.width = target_width
    output_stream.height = target_height
    output_stream.pix_fmt = 'yuv420p'

    for frame in input_container.decode(input_stream):
        frame = frame.reformat(width=target_width, height=target_height)
        packet = output_stream.encode(frame)
        output_container.mux(packet)

    # Flush the encoder
    packet = output_stream.encode(None)
    output_container.mux(packet)

    # Close the containers
    input_container.close()
    output_container.close()

def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

raw_video_dir에 있는 비디오들을 가져와 업스케일링한 뒤 upscaled_video_dir에 저장합니다.

# Create output directory if it doesn't exist
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# Iterate over all files in the raw video directory
for filename in os.listdir(raw_video_dir):
    
    # Check if the file is a video file
    if filename.endswith(".mp4"):
        print(filename)
        # Get the file name without extension
        input_file_no_ext = os.path.splitext(filename)[0]
        # Define the output file name
        output_file = f"{input_file_no_ext}_480.mp4"
        if output_file in os.listdir(upscaled_video_dir):
            continue
        # Define the full path for the input and output files
        input_file_path = os.path.join(raw_video_dir, filename)
        output_file_path = os.path.join(upscaled_video_dir, output_file)
        # Upscale the video
        upscale_video(input_file_path, output_file_path)



4 - 단일 비디오에서 Pegasus와 LLaVa-NeXT-Video 비교하기

PegasusLLaVa-NeXT-Video는 비디오를 입력하고 관련 질문을 던질 수 있는 비디오 이해(Video Understanding) 모델입니다.

먼저 우리가 가진 비디오 컬렉션 중 단일 비디오 하나를 사용하여 Pegasus와 LLaVa-NeXT-Video의 성능을 비교해 보겠습니다. 이 비디오는 뉴욕 자이언츠와 뉴잉글랜드 패트리어츠가 맞붙은 제42회 슈퍼볼의 한 장면을 담고 있습니다. 경기 종료 2분을 남겨두고 자이언츠의 쿼터백 일라이 매닝이 던진 패스를 리시버 데이비드 타이리가 자신의 헬멧에 공을 밀착시켜 극적으로 잡아낸, 일명 '헬멧 캐치(Helmet Catch)'로 불리는 역사적인 순간을 보여주는 영상입니다.

비디오에 대한 맥락을 파악했으니, 이제 두 모델에게 "이 비디오에서 무슨 일이 일어나고 있나요?"라는 질문을 던져 비디오를 얼마나 잘 이해하는지 비교 확인해 보겠습니다.



Pegasus로 비디오와 대화하기

시작하기 앞서, 비디오를 저장할 Pegasus 인덱스를 하나 생성해야 합니다.

models = [
        {
            "name": "pegasus1.2",
            "options": ["visual"]
        }
    ]

index_name = "sports_videos"
indices_list = twelve_labs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelve_labs_client.index.create(
        name=index_name,
        models=models

    )
    print(f"A new index has been created: id={index.id} name={index.name} models={index.models}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} models={index.models}")

그 다음 해당 인덱스에 비디오를 업로드하는 함수를 작성합니다. 업로드가 완료되면 질문을 할 때 사용할 Pegasus 비디오 ID가 반환됩니다.

# Monitor the status of the video task
def on_task_update(task):
    print(f"  Status={task.status}")
    
def upload_video_to_twelve_labs_pegasus(video_path):
    task = twelve_labs_client.task.create(
        index_id=index.id,
        file = video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)

    if task.status != "ready":
      raise RuntimeError(f"Indexing failed with status {task.status}")
    print(f"The unique identifer of your video is {task.video_id}.")
    return task.video_id

비디오를 업로드하고 Pegasus 비디오 ID를 single_video_id 변수에 저장해 둡니다.

# Define the video file path
single_video_file = upscaled_video_dir + "football_480.mp4"

single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)

Pegasus가 비디오를 제대로 이해했는지 확인하기 위해, "이 비디오에서 무슨 일이 일어나고 있나요? 간결하게 답변해 주세요."라고 질문해 보겠습니다.

single_video_query = "What is going on in this video? Please be concise."

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt=single_video_query
)
print(f"{res.data}")

Pegasus는 다음과 같이 답변했습니다:

비디오는 뉴욕 자이언츠와 뉴잉글랜드 패트리어츠 간의 미시축구 경기에서 결정적인 순간을 보여줍니다. 자이언츠의 쿼터백 일라이 매닝이 던진 패스를 데이비드 타이리가 경기장 밖으로 쓰러지면서 공을 자신의 헬멧에 고정한 채 아주 환상적으로 받아냅니다. 다양한 앵글로 이 캐치 장면이 리플레이되면서 극적인 순간의 난이도와 정밀함이 강조됩니다. 타이리는 플레이 직후 짧게 세레머니를 하고, 비디오는 타이리와 다른 선수들이 경기장을 벗어나는 모습으로 끝이 납니다.

답변을 통해 Pegasus가 비디오를 매우 깊이 있게 이해하고 있음을 알 수 있습니다. 이 경기가 자이언츠와 패트리어츠 간의 풋볼 경기라는 점을 파악했고, 일라이 매닝이 패스를 던졌으며 데이비드 타이리가 이를 받아낸 장면이 경기의 결정적인 순간이었음을 정확히 짚어냈습니다.

Pegasus가 이 경기가 슈퍼볼 경기라는 것을 직접 언급하지 않았으므로, 확인차 관련 질문을 추가로 해보겠습니다.

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt="What game is this?"
)
print(f"{res.data}")

그러자 Pegasus는 정확하게 이 경기는 제42회 슈퍼볼(Super Bowl XLII) 경기입니다.라고 답했습니다.

이제 LLaVa-NeXT-Video 모델은 이 비디오를 얼마나 잘 이해하는지 비교 확인해 보겠습니다.



LLaVa-NeXT-Video로 비디오와 대화하기

LLaVa-NeXT-Video의 경우, 추론을 실행하기 전에 비디오 데이터를 특정 규격의 포맷으로 준비해 주어야 합니다. 이 모델은 전체 비디오 스트림을 한 번에 처리하지 않기 때문에 비디오 전반에서 균일하게 프레임을 추출(샘플링)하는 과정이 필요합니다. 각 비디오에서 40개의 프레임을 고르게 추출하는 샘플링 함수를 제작하여 비디오 전체의 핵심 순간들을 빠짐없이 포착할 수 있도록 하겠습니다. 이 샘플링 기법은 LLaVA-NeXT-Video 공식 구현 코드를 응용하여 만들었습니다. 샘플링이 완료되면 허깅페이스 허브(Hugging Face Hub)에서 모델을 불러온 뒤, 형식에 맞게 입력을 변환하여 추론을 실행하고 응답을 생성해 보겠습니다.

def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # sample uniformly num_samples frames from the video
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
    
    sampled_frames = read_video_pyav(container, indices)
    
    return sampled_frames
    
sampled_video = sample_video(single_video_file, num_samples=40)

비디오 샘플링이 올바르게 완료되면, 추론을 위해 모델을 구성합니다.

from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

그 다음, 모델에 유저 질문을 전송해 추론하는 헬퍼 함수를 구현합니다.

def query_llava_next(query,model,processor,sampled_video):

    # Each "content" is a list of dicts and you can add image/video/text modalities
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": query},
                {"type": "video"},
                ],
        },
    ]

    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # prompt_len = len(prompt)

    inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device)

    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)

    return generated_text[0]

마지막으로, Pegasus와 직접 비교를 해보기 위해 동일한 질문을 던져보겠습니다.

llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

그 결과, 다음 응답을 얻었습니다:

What is happening in this video? Be concise ASSISTANT: 비디오는 경기장에 여러 선수들이 있는 풋볼 경기를 보여주고 있습니다. 저지 번호와 일부 선수들이 착용한 옛날식 헬멧으로 판단하건대 뉴욕 자이언츠와 뉴잉글랜드 패트리어츠 간의 '제3회 슈퍼볼(Super Bowl III)' 경기인 것 같습니다. 한 선수가 동작 중에 공을 잡고 다른 선수에게 태클을 당하고 있으며, 심판은 퍼스트 다운 사인을 보내고 있습니다. 코치와 다른 경기 관계자들도 보입니다.

이 모델도 자이언츠와 패트리어츠 간에 미식축구 경기가 열리고 있다는 사실은 감지해 냈으나, 뜬금없이 이 경기를 '제3회 슈퍼볼'로 비정상 지목해 버렸습니다. 또한 이 비디오의 핵심이자 눈여겨볼 장면인 '헬멧 캐치'에 대한 내용은 깔끔하게 누락했습니다.

다시 한 번 "이 경기는 무슨 경기인가요?"라는 질문을 던져 보겠습니다. Pegasus에 근접하기는 했지만 여전히 잘못된 오류 답변을 출력합니다.

llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

USER: what game is this? ASSISTANT: 제공해주신 비디오는 현재 진행 중인 풋볼 경기의 영상으로, 구체적으로는 제41회 슈퍼볼(Super Bowl XLI)의 한 장면입니다. 뉴잉글랜드 패트리어츠 대 뉴욕 자이언츠의 대결입니다. 화면 속의 선수들은 자이언츠와 패트리어츠 소속입니다.



5 - 단일 비디오에서 세그먼트 단위 쿼리를 위한 RAG 적용하기

전체 비디오 분석에 있어서는 Pegasus가 LLaVa-NeXT-Video보다 더 빠르고 일관된 호환성이 높은 답변을 출력하며, 품질 측면에서도 훨씬 압도적인 퍼포먼스를 보여줍니다.

하지만 비디오 분석의 초점을 가장 밀접한 일부 세그먼트 영역으로만 좁혀주는 구조가 마련된다면, 분석 모델들의 전반적인 품질을 끌어올릴 수 있습니다. 여기서 RAG(검색 증강 생성)의 가치가 유용하게 빛을 발합니다. 비디오 전체를 무식하게 다 다루기보단, 사용자가 질문한 내용에 대한 힌트 정보가 포함된 세그먼트만 귀신같이 찾아내 국소적으로 분석을 제공하는 것입니다.

이 파이프라인을 구현하기 위해 우리는 Marengo 모델을 빌려 쓸 것입니다. 이는 비디오 세그먼트의 의미 단위 데이터를 래핑하여 완성도 높은 고품질 벡터 임베딩으로 뽑아내 주는 전용 특화 모델입니다. 이 임베딩을 구성하면 다음 플로우가 가능해집니다:

  1. 전체 긴 비디오에서 조각난 각 세그먼트별로 독립적인 임베딩 인덱스를 만듭니다.

  2. 유저 질문(문장) 정보를 파싱해 그것과 극도로 유사한 특정 비디오 세그먼트를 매칭해 냅니다.

  3. 적중률이 가장 높은 타겟 세그먼트 한두 개만 콕 집어 우리의 최종 분석 모델에 먹여 연산합니다.

그럼 이제 비디오를 일정한 구간 세그먼트로 잘라 조각내고, Marengo 모델을 사용해 각 조각들의 임베딩 벡터를 뽑아보겠습니다. 이 임베딩 세트가 우리의 RAG 시스템의 든든한 뼈대가 될 것입니다.



Marengo를 활용한 전체 비디오 및 비디오 컷 임베딩 생성

Marengo 모델이 잘라 낼 수 있는 최대 길이 제한인 10초 단위로 세그먼트 길이를 지정하겠습니다.

# Define the video segment length
segment_length = 10

그 다음 Marengo를 작동시켜 임베딩을 생성합니다. 분석할 때 video_embedding_scopes=["clip", "video"]video_clip_length=segment_length 옵션을 부여하는 것에 유의해 주시기 바랍니다. 이렇게 세팅해야 Marengo가 전체 비디오에 대칭되는 통합 임베딩과 더불어 자정해 둔 10초 단위 마이크로 클립들을 일괄 연산하여 각각의 개별 임베딩 세트로 반환해 줍니다.

task = twelve_labs_client.embed.task.create(
    model_name="Marengo-retrieval-2.7",
    video_file=single_video_file,
    video_clip_length=segment_length,
    video_embedding_scopes=["clip", "video"]
)
print(
    f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
)

# Monitor the status of the video embedding task
status = task.wait_for_done(
    sleep_interval=2,
    callback=on_task_update
)
print(f"Embedding done: {status}")

임베딩 태스크 처리가 끝나면 나중에 해당 임베딩을 언제든 꺼내올 수 있도록 Marengo Task ID를 로컬에 보관해 두어야 합니다. 추후 Weaviate 데이터베이스에 업로드할 때 매칭할 수 있도록 marengo_task_ids 사전에 이를 매핑 정리해 두겠습니다.

single_video_task_id = task.id

marengo_task_ids = {}

single_video_file_name = single_video_file.split("/")[-1]
marengo_task_ids[single_video_file_name] = single_video_task_id



RAG용 비디오 세그먼트 가공 환경 설계

에러 없는 유기적인 RAG 파이프라인 처리를 완성하려면, 데이터베이스 내부에서 Pegasus가 발급한 Video ID와 방금 만든 Marengo의 Task ID를 정확히 엮어 두어야 합니다. 이렇게 한 묶음으로 설계해야 벡터 데이터베이스 탐색에서 매칭된 세그먼트에 대해 Pegasus 모델에 바로 질문을 이어 던질 수 있습니다. 이를 구축하기 위해 조각낸 미니 세그먼트 비디오 파일들을 Pegasus 인덱서로도 빠짐없이 전부 업로드해 줍니다.

가장 먼저 비디오를 10초 규격의 개별 조각 파일들로 균일하게 분할해 내는 split_video 유틸 함수를 준비합니다. 이때 Pegasus 인덱서에 업로드할 수 있는 최소 비디오 조건인 '4초 이상' 스펙을 어기지 않도록 각별히 방어 조치를 해야 합니다. 비디오를 다 잘라내고 남은 자투리 맨 마지막 조각이 혹시라도 5초 미만으로 비정상 짧은 경우엔, 그 직전 세그먼트 영역과 서로 겹치게 프레임을 넉넉히 교차 결합해 주어 비디오 분할이 안전하게 5초 이상 길이를 갖추도록 세이브 기믹을 적용해 구현했습니다.

import os
import subprocess
import json
    
def split_video(input_path, output_dir, segment_duration=10):
    """
    Split a video into segments of the specified duration.
    Regular segments will be exactly segment_duration seconds.
    The last segment will be at least 5 seconds long, potentially overlapping
    with the previous segment if needed.
    
    Args:
        input_path: Path to the input video file
        output_dir: Directory to save the output segments
        segment_duration: Duration of each segment in seconds (default: 10)
    """

    # Minimum length for the last segment
    min_last_segment_len = 5
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Get base filename without extension
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    
    # Get video duration using ffprobe
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_format", input_path
    ]
    
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(probe_result.stdout)
        duration = float(video_info["format"]["duration"])
    except Exception as e:
        print(f"Error getting video duration: {e}")
        return 0
    
    # Calculate number of full segments
    num_full_segments = int(duration / segment_duration)
    
    # Calculate remaining duration
    remaining_duration = duration - (num_full_segments * segment_duration)
    
    # Determine total number of segments and if we need to adjust the last segment
    if remaining_duration > 0:
        if remaining_duration < min_last_segment_len:
            # Last segment would be too short, so we'll adjust its start time
            num_segments = num_full_segments + 1
            needs_adjustment = True
        else:
            # Last segment is already long enough
            num_segments = num_full_segments + 1
            needs_adjustment = False
    else:
        # No remaining duration, all segments are complete
        num_segments = num_full_segments
        needs_adjustment = False
    
    print(f"Video {base_name} is {duration:.2f} seconds long")
    print(f"Creating {num_segments} segments")
    
    # Create each segment
    for i in range(num_segments):
        # For regular segments, start at the segment boundary
        if i < num_full_segments:
            start_time = i * segment_duration
            actual_duration = segment_duration
        else:
            # This is the last segment
            if needs_adjustment:
                # Start earlier to ensure it's at least min_last_segment_len seconds
                start_time = duration - min_last_segment_len
                actual_duration = min_last_segment_len
            else:
                # Last segment is already long enough
                start_time = i * segment_duration
                actual_duration = remaining_duration
        
        output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4")
        
        # For all segments, use copy mode for speed
        cmd = [
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", input_path,
            "-t", str(actual_duration),
            "-c:v", "copy",
            "-c:a", "copy",
            output_path
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"Error creating segment {i+1}: {result.stderr[:100]}...")
        else:
            end_time = start_time + actual_duration
            if i == num_segments - 1 and needs_adjustment:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)")
            else:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s")
    
    print(f"Successfully split {base_name} into {num_segments} segments")
    return num_segments

지정해 둔 전용 세그먼트 데이터 저장 폴더인 video_segments_dir 경로에 분할 비디오들을 적재시킵니다.

split_video(single_video_file, video_segments_dir,segment_length)

파일명과 매칭되는 Pegasus Video ID 값을 담아 둘 매핑용 빈 사전인 pegasus_video_ids를 초기 선언하고 먼저 원본 전체 비디오의 고유 ID 정보부터 삽입해 놓겠습니다.

pegasus_video_ids = {}

fname = single_video_file.split("/")[-1]
pegasus_video_ids[fname] = single_video_id

이제 준비된 미니 세그먼트용 비디오 파일들을 차례대로 Pegasus 데이터 인덱서로 업로드 전송 처리하여 리턴된 고유 ID 값들로 pegasus_video_ids 매핑용 데이터 사전을 완성해 줍니다.

segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))]

# Process each video
for segment_video_file in segment_video_files:
    if segment_video_file in pegasus_video_ids:
        print("skip file",segment_video_file)
        continue
    print("processing file",segment_video_file)
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir+segment_video_file)
        pegasus_video_ids[segment_video_file] = video_id
    except:
        print("error",segment_video_file)
        continue

마지막으로, 비교 모델인 LLaVa-NeXT-Video에서도 분석이 매끄럽게 수행될 수 있도록 전체 세그먼트 영상들을 미리 균일 프레임 샘플링 구조로 컨버전(배열 변환) 적용해 놓는 사전 로드 빌드 작업을 처리해 둡니다.

sampled_video_files = {}

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

for video_file in os.listdir(upscaled_video_dir):
    print(video_file)
    sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video



Weaviate에 임베딩 업로드하기

Weaviate는 컬렉션으로 다룰 때 메타데이터 객체와 실제 탐색에 쓰일 임베딩 벡터 리스트를 분리해서 처리하는 방식을 권장합니다. 이에 대칭되는 Weaviate 업로드 셋을 만들기 위해, Marengo Task ID 값과 이에 매칭되는 Pegasus Video ID 정보를 전달받아 규격화된 records 리스트와 이에 상응하는 vectors 배열 세트로 파싱 가공해 내는 prepare_marengo_embeddings_for_weaviate 유틸 변환 함수를 구성하겠습니다.

def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids):

    # Prepare data for Weaviate upload
    records = []
    vectors = []

    for video_file_name in marengo_task_ids.keys():

        

        marengo_task_id = marengo_task_ids[video_file_name]

        # Retrieve marengo full video and clip embeddings
        marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id)


        #track segment number to match with fiel
        segment_number = 0

        for segment in marengo_embeddings_result.video_embedding.segments:
            # Determine if this is a video or clip segment
            is_video = segment.embedding_scope == "video"


            #Update the file name if segment
            updated_file_name = video_file_name
            if not is_video:
                updated_file_name = updated_file_name.replace(".mp4",f"_segment_{segment_number:03d}.mp4")
                segment_number += 1

            video_name = video_file_name.replace(".mp4","")
            
            pegasus_video_id = None
            if updated_file_name in pegasus_video_ids:
                pegasus_video_id = pegasus_video_ids[updated_file_name] 

            record = {
                'video_name':video_name,
                'segment_number': 0 if is_video else segment_number,
                'video_file': updated_file_name,
                'start_time': getattr(segment, 'start_offset_sec', 0),
                'end_time': getattr(segment, 'end_offset_sec', 0),
                'type': 'video' if is_video else 'clip',
                'task_id': marengo_task_id,
                'pegasus_video_id': pegasus_video_id
            }
            
            # Get the embedding vector
            embedding_vector = [float(x) for x in segment.embeddings_float]
            
            # Add to our lists
            records.append(record)
            vectors.append(embedding_vector)

    # Print summary
    print(f"Prepared {len(records)} segments for upload to Weaviate")
    print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}")
    print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}")

    return records, vectors

그 다음 작성이 완료된 해당 파이프라인 컴포넌트 변환 동작 함수를 실행하여 도출된 정보 레코드와 임베딩 벡터 리스트들을 Weaviate 서버 대시보드로 바인딩 전송해 줍니다.

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")



벡터 검색 테스트하기

보유 중인 모든 리소스를 수집 데이터베이스 인스턴스에 집어넣었으므로, 이제 원하는 풋볼 정보 검색 쿼리에 최적화되어 대응하는지 Weaviate의 near_vector 벡터 거리가 유사하게 동작하는지 성능 테스트를 실행해 보겠습니다. 우리가 가공해 넣어둔 임베딩 세트의 임의 인텍스 벡터를 쿼리에 주입하면 본인 자신과의 벡터 연산 코사인 유클리드 거리가 완전히 0에 가깝게 딱 정방향 인출되어야 맞습니다.

임의로 타겟팅한 5번 벡터 변환 샘플값으로 가상 쿼리를 태우면 리턴된 응답 벡터 셋에서 자기 자신에 준하는 비디오 정보를 최우선 거리값 0의 레코드로 보여줄 것입니다.

from weaviate.classes.query import MetadataQuery, Filter

# Use a specific vector for the query
query_vector = vectors[5]

# Perform vector search
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
)

print(f"Found {len(response.objects)} results for vector search")
for obj in response.objects:
    print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}")
    if 'segment_id' in obj.properties:
        print(f"Segment: {obj.properties['segment_id']}")
    if 'text' in obj.properties and obj.properties['text']:
        print(f"Text: {obj.properties['text']}")
    print(f"Distance: {obj.metadata.distance}")
    print("-" * 50)

동일한 임시 쿼리 작동으로 우리 DB 내부의 고유 벡터 레코드가 거리 0인 환상의 무결점 상태로 검색되어 반환됨을 목격했습니다.




RAG 설계를 위한 타겟 세그먼트 영상 인출 및 검증

우리가 공들여 빌드한 이번 비디오 RAG 시스템의 핵심 역량은, 클라이언트에게서 인입된 자연어 질문에 반응하고 알맞은 영상 레코드를 매칭 인출해 주는 프로세스에 있습니다. 전체적인 처리는 아래 3단계로 핵심 처리됩니다.

  1. TwelveLabs의 전용 임베딩 모델인 Marengo로 고객의 질문(텍스트) 정보를 분석하고 벡터 임베딩으로 번환 처리합니다.

  2. 생성된 질의 벡터와 DB에 미리 구축해 둔 타겟 미니 영상 고유 벡터 셋들 간 최인접 유클리드 서치를 탐색해 우수한 매칭값을 가려냅니다.

  3. 최적의 거리값으로 채택된 비디오 세그먼트를 추출하고, 이 영상에 매달려 있는 Pegasus 고유 ID를 조회해 타겟 조각 구간에 대칭되는 정밀한 답변을 합성 제공하도록 모델을 실행합니다.

이러한 정밀한 설계를 사용하면 방대한 영상에서 쓸모없는 연산 낭비를 가차 없이 패스하고, 질문 내용에 매칭되는 국소 구간만 초고속 분석 처리해 답변 전달 품질과 리소스 세이빙 비용을 엄청나게 끌어올릴 수 있습니다.

첫 번째 단계로 유저의 한글 영문 질문 데이터를 임베딩 쿼리로 인코딩합니다.

sample_question = "What technique did David Tyree use to catch the ball?"

embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=sample_question,
    text_truncate="start",
)

query_vector = embedding.text_embedding.segments[0].embeddings_float

그 다음 Weaviate 내부에서 쿼리 임베딩과 가장 극단적으로 가까이에 인접 배치된 우수 타겟 세그먼트 단품 조각을 찾아 내겠습니다. filters=(Filter.by_property("type").equal("clip"))로 소조건을 세팅함으로써, 무겁고 모호한 전체 비디오 통합 임베딩들은 한꺼번에 배제하고 알맹이들만 타겟 인출되도록 한정 지어 성능을 보강합니다.

response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
    filters=(Filter.by_property("type").equal("clip"))
)

video_file = response.objects[0].properties.get("video_file")
print(video_file)

탐색 서치가 멋지게 성공하여 4번째 구간 세그먼트 데이터 파일인 football_480_segment_003.mp4 레코드를 탁월한 스펙으로 가져왔습니다.

실제 적중한 영상 세그먼트를 눈등으로 간략 매칭 시각화해 보겠습니다.

import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

video_file = response.objects[0].properties.get("video_file")
video = sampled_video_files[video_file]

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # this is required to not display the generated image

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())

확인 결과, 전체 시간 축 프레임 중 타이리 선수가 실제 헬멧을 지탱 삼아 볼을 꽉 거머죄는 절정에 달한 정확한 찰나의 캐칭 파트 세그먼트 본체임을 한눈에 가려낼 수 있습니다.

우리가 의도했던 대로 완벽한 세그먼트 조각을 타겟팅했다는 사실을 확신했으니, 이제 이 짧은 조각 구간에서 두 모델이 어느 정도의 분석 퀄리티 차이를 출력하는지 직접 배틀을 시켜 연동해 보겠습니다.



매칭된 미니 세그먼트로 비디오 대화 결과 격돌: Pegasus 대 LLaVa-NeXT-Video

우선 TwelveLabs의 고급 Pegasus 추론 모델 답변 빌드 결과를 관찰해 줍니다.

pegasus_video_id = response.objects[0].properties.get("pegasus_video_id")


print(sample_question)

res = twelve_labs_client.generate.text(
  video_id=pegasus_video_id,
  prompt=sample_question
)
print(f"{res.data}")

데이비드 타이리가 공을 잡기 위해 사용한 캐칭 기술은 무엇인가요?

데이비드 타이리는 패스된 미식축구 공을 안전하게 확보하기 위하여 자신의 머리에 씌여진 헬멧 겉면에 공 부위를 거세게 내리눌러 밀착 고정하는 독창적인 임기응변 동작 기술을 펼쳤습니다. 이 시도는 자이언츠의 패스 연결 권한을 경이롭게 계속 지속시키는 데 최고의 공을 세운 역사적 하이라이트 플레이로 세이브되었습니다.

질의 요지를 통달한 것처럼, 타이리의 헬멧 위 밀착 볼 컨트롤 동작을 예술적으로 해설해 주며 경기 흐름상 기여도까지 일타 쌍피로 엮어 기품 있는 완벽한 답변을 쏟아냈습니다.

이번에는 부하가 덜해진 짧아진 똑같은 세그먼트 조각에 대하여 오픈소스 비교 구동 모델인 LLaVa-NeXT-Video는 전보다 더 영리한 답변을 만들어 내는지 연쇄 태워보겠습니다.

video_file = response.objects[0].properties.get("video_file")
sampled_video = sampled_video_files[video_file]
generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video)

print(generated_text)

USER: What technique did David Tyree use to catch the ball? ASSISTANT: 공을 캐치하려는 모션을 보인 미식축구 선수는 양손을 사용해 머리 위 높은 영역에서 안전하게 포구하는 오버헤드 투핸드 포그립 낙하포착 기술을 활용해 볼을 제어하려 한 것으로 목격됩니다. 이 기교는 디펜더의 인터셉트 위협을 무력화하며 머리꼭대기 높이에 뜬 공을 잡아채는 유용한 포구 루트의 일종이나 밸런스를 잃고 미끄러지면 대단히 큰 패스 실패로 이어질 소지도 있습니다.

공의 비행 궤적에 따른 양손의 일반적인 물리 오버헤드 포지셔닝에 국한해 무난 무난한 답변만을 생성하는 데 그쳤습니다. 우리가 눈여겨봤던 독창적인 트레이드 마크 시그니처 동작인 '헬멧 밀착 캐치' 부분은 이번 마이크로 분석에서도 기대를 빗나가고 포착에 실패한 아쉬운 결과를 보여주었습니다.



6 - 다중 비디오 환경에서 Marengo, Weaviate 및 Pegasus 멀티 RAG 가동하기

단독 미시 단위 비디오 임베딩 구간 대칭 동작을 실컷 경험해 봤으니, 이제 서비스 레벨에서 멀티플 플레이로 가장 빈번하게 빌드 요청을 넣는 다중 비디오 분산 환경에서의 고기능성 멀티 RAG 실전 구현 단계를 보여드리겠습니다.



전체 비디오 풀에 대응하도록 Marengo 인덱싱 일괄 전개

그동안 준비해 뒀던 전체 추가 비디오 컬렉션들에 대한 Marengo 임베딩 정보를 통합 맵 marengo_task_ids 사전에 루핑 전개하여 한 세트로 관리 인덱싱 처리하겠습니다.

for video_file_name in os.listdir(upscaled_video_dir):

    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue

    print(f"processing {video_file_name}")

    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Monitor the status of the video embedding task
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    marengo_task_ids[video_file_name] = task.id



추가 확장 비디오 풀 대상 세그먼트 분할 전개

그 외 업스케일 가공 완료된 연계 스포츠 비디오들도 기존 처리했던 방식과 마찬가지로 전부 규격화된 10초 세그먼트 데이터 조각으로 쪼개 놓아 RAG 적용 자격을 부여합니다.

# Create output folder if it doesn't exist
os.makedirs(upscaled_video_dir, exist_ok=True)

# Get all video files
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]

# Process each video
for video_file in video_files:
    split_video(upscaled_video_dir + video_file,video_segments_dir,segment_length)



나머지 다중 영상 전부에 대한 Pegasus 고유 Video ID 일괄 전송 바인딩

나머지 파편화된 다중 조각 비디오 채널 전체의 Pegasus Video ID 리스트를 취득하겠습니다. 이 과정은 대기 딜레이를 많이 깎아내기 위해서 고속 멀티 락 패럴렐(병렬 스레드 처리) 호출 기법을 사용해 단번에 완성해 줍니다.

import concurrent.futures
import os
from tqdm import tqdm  # Use standard tqdm instead of tqdm.notebook

def process_video(video_path):
    video_file_name = video_path.split("/")[-1]
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_path)
        return video_file_name, video_id
    except Exception as e:
        print(f"Error processing {video_file_name}: {str(e)}")
        return video_file_name, None

# Filter out videos that are already processed
segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]
full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]
all_video_files = segment_video_files + full_video_files

videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids]

print(f"Processing {len(videos_to_process)} videos in parallel...")

# Use ThreadPoolExecutor for I/O-bound operations like API calls
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks and create a dictionary mapping futures to their video files
    future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process}
    
    # Process results as they complete with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)):
        video_file_name, video_id = future.result()
        if video_id:
            pegasus_video_ids[video_file_name] = video_id

print("All videos processed!")
print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")



최종 미싱 데이터 세트 Weaviate 일괄 데이터 바인딩

완벽하게 준비된 2단계 멀티 비디오 클립 가공 소스들을 Weaviate의 내부 클러스터 영역으로 일괄 주입 연동합니다.

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        if record["pegasus_video_id"] is None:
            continue
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")



RAG 성능 평가 지표: 마이크로 클립 검색 대 전체 비디오 직접 질문 비교

드디어 Weaviate 공간 내부에 모든 멀티 비디오에 대한 Marengo의 특징 벡터 및 Pegasus 매핑용 ID 컬렉션의 세팅을 완벽히 마쳤습니다. 이제 RAG 시스템 성능 평가를 시작해 보겠습니다. 이 테스트는 두 가지 극단적인 주요 성향 포인트를 관측할 것입니다.

  1. 답변의 핵심 적중력(Answer Quality): 클립 조각 기반의 협소한 RAG 탐색과 전체 통합 비디오를 생으로 쿼리했을 때 도출되는 응답 정보 결과의 정확함 차이가 어떻게 전개되는가?

  2. 시간 연산 절약성(Processing Efficiency): 두 방식 간에 시간 소요 격차나 리소스를 낭비하는 딜레이 측면에 어떠한 실전 마일스톤 메리트가 드러나는가?

우리는 서로 다른 성질의 질문 묶음을 던져, 각기 최단 핏을 달성하는 세그먼트 영상 컷을 유도한 케이스와 생비디오를 주입한 케이스의 성능 소요치를 지표로 얻을 것입니다. 이를 목도하면, 비디오 재생 분량이 한정 없이 늘어나도 똑똑한 RAG 구성이 불필요한 계산을 원천 배제하여 뛰어난 퍼포먼스 가치를 달성한다는 것을 자명하게 입증받을 수 있습니다.

다양한 종목의 스포츠들이 넓게 포진된 테스트용 질의응답 세트 목록을 정의해 봅니다.

video_questions = [
    "In the American Football Video, what are the teams playing?", 
    "What technique does David Tyree use to catch the ball?",
    "In the tennis match video, who is playing?", 
    "What foot does Messi shoot at the goal with?",
    "When does Keri Strug hurt her foot?"
]



TwelveLabs Pegasus를 이용한 멀티 비디오 RAG

가장 먼저, 다이렉트로 전체 비디오를 상대로 질의를 대여 처리한 케이스의 소요 수치를 검산해 봅니다.

from weaviate.classes.query import MetadataQuery, Filter
import time

pegasus_full_video_answers = []

start_time = time.time()

for question in video_questions:

    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_full_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 72 seconds

다음으로 정교하게 세그먼트 데이터 컷을 타겟팅하여 뽑아서 RAG 연산을 적용한 결과를 추적 관찰합니다.

pegasus_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_clip_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 20 seconds

조각 세그먼트 영상만을 발췌해 RAG를 전개한 방식과 단순 생비디오를 집어넣은 경우 생성된 실시간 응답 퀄리티 비교를 제공해 드리겠습니다.

for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

목격하신 것과 같이 생성해 낸 최종 정보의 퀄리티는 대단히 수려하고 서로 어깨를 나란히 할 정도로 준수한 모습을 유지합니다. 하지만 연산 시간에서 세그먼트 RAG 전략은 단 20초 만에 완수된 반면, 전체 영상을 순회 구동하면 72초라는 엄청난 리소스 지연 차이가 벌어졌습니다.




LLaVa-NeXT-Video 모형 기반 유사 다중 RAG 구동

이번 단계로는 LLaVa-NeXT-Video 오픈소스 모델에 위의 멀티 비디오 RAG 설계를 연속 태워보겠습니다. 그전에 비교 동작에 필요한 다중 컷 샘플링 프레임 생성부터 일괄 완수해 늘어놓습니다.

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

우선 통비디오 다이렉트 패싱 연동의 결과물 빌드부터 시작하겠습니다.

llava_full_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_full_video_answers.append([question,selected_video_name,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

곧이어 RAG 클립 기반 추론 응답 연산으로 전개해 수치를 확인합니다.

from weaviate.classes.query import MetadataQuery

import time

llava_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_clip_video_answers.append([question,selected_video_name,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

이 테스트 세팅에선 두 빌드 실행 시간이 완전히 동률로 뽑혔습니다. 이 현상이 나온 타당한 이유는 비디오 연장 길이 차이와 상관없이 일률적으로 40점씩 고정 프레임만 드롭 샘플링해서 GPU 메모리에 올리기 때문입니다.

이제 LLaVa-NeXT-Video 모형이 출력해 낸 조각 클립 분석본과 통비디오 다이렉트 분석본의 텍스트 답변 내용을 살펴보겠습니다.

for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

오픈소스 챗 지향 모델인 LLaVa-NeXT-Video는 클립 세그먼트 RAG를 통과시켰을 때 총 5개의 질의 중 약 2개 부분에서 올바른 과녘 정답을 타격해 냈습니다.

  1. 첫째 질문 대전 팀 정보에서, 뉴잉글랜드와 뉴욕 자이언츠가 올바르게 대결 중인 구도 맥락을 선방하여 정확히 감지했습니다.

  2. 세 번째 질판 스포츠 정보에서도, 전설적인 테니스 거장 페더러와 조코비치가 맞닥뜨린 대전 상황임을 정확히 추적 매칭했습니다.




7 - 결론: TwelveLabs 및 Weaviate 기반 고효율 비디오 RAG 구축 전략 전략과 가치 요약

오늘 마친 비디오 RAG(검색 증강 생성) 파이프라인의 실전 빌드 테스트 결과는 정확도와 시간 절약이라는 두 마리 토끼를 다 움켜잡을 수 있는 현명한 해답임을 여실히 입증했습니다. TwelveLabs의 시각 연산 분석 지배 기술과 Weaviate의 탄탄한 초고속 벡터 데이터베이스를 하이브리드로 접착 가동하면 전체 비디오 소스를 연산에서 패스시키고 유력한 부분 세그먼트만 뽑아 처리하는 영리한 엔진을 얻게 됩니다.



이 테스트를 통해 우리가 거둔 위대한 수확

  1. 비교 불가의 압도적 시간 절약(Performance Improvements): TwelveLabs의 획기적인 Pegasus 연동 구조를 Weaviate의 초당 조회 벡터 풀과 콜라보시키면, 둔중하고 긴 생 영상을 다 분석할 필요 가 없으므로 3분의 1 이상 연산 수행 속도를 전력으로 차 단하여 세이빙 시켰습니다.

  2. 오픈소스 모델의 기량 하계 극복(Enhanced Accuracy): 상대적으로 학습 한계가 명확한 LLaVa-NeXT-Video 같은 모델들마저 모호한 영상을 넓게 줄 때보다 타겟 클립으로 사유 정밀도를 높였을 때 사실 검증 정확도가 비약적으로 올라갑니다.

  3. 대단히 견고한 확장 아키텍처(Scalable Architecture): Marengo 기반 고차원 특징 추출과 Weaviate가 가진 레이턴시 없는 분산 디스크 색인 결합은 현업 비즈니스 제품군 스케일로 올리기에 최고로 안성맞춤인 락인 설계 베이스입니다.



그 외 확장 가능한 영역

이 우수한 결합 모델은 다양한 상용 마켓 도메인의 패러다임을 바꿀 킬러 무기를 제작하는 데 지대한 영향을 줄 것입니다.

  1. 엔터 및 멀티미디어 시장(Media & Entertainment): 수만 시간의 대작 미디어 저장소 아카이브 내부에서 "비 내리는 씬의 주인공 독백 신만 모아줘" 같은 마술 같은 탐색 가공 편집 환경을 초 단위로 서포트할 수 있습니다.

  2. 스포츠 마이크로 전략분석(Sports Analytics): 선수단 코칭 스태프나 경기 분석 전술가들이 훈련 비디오에서 "타자가 세 번째 스트라이크에 방망이 헛돌리는 모션만 탐색"하여 맞춤 분석 조언 서비스를 곧바로 태울 수 있도록 도울 수 있습니다.

  3. 차세대 비디오 이커머스 쇼핑(Retail & E-commerce): 무의미한 쇼츠 스타일 상품 설명 비디오들을 지대적인 대화형 쇼핑으로 개조시켜 유저가 "가방 끈 조절하는 부분만 보여줘"라고 말하면 곧바로 타겟 지점을 영상으로 피사체 하이라이트 제공 해 주는 신개념 구매 흐름을 열어젖힐 수 있습니다.

TwelveLabs의 한계가 없는 비디오 특징 분석 스택과 Weaviate의 든든한 날개 데이터베이스를 활용해 초 효율 시그널 비디오 RAG 시스템을 지금 바로 엔터프라이즈 환경에 빌드 전개해 보시기 바랍니다.

초안을 검토해 주신 Weaviate 팀의 Tuana Celik 님과 Erika Cardenas 님께 깊은 감사를 드립니다!



비디오 처리는 연산 비용이 많이 들고 시간이 오래 걸리는 작업입니다. 특히 장편 콘텐츠를 분석할 때는 더욱 그렇습니다. 검색 증강 생성(RAG)은 시스템이 전체 비디오가 아닌 가장 관련성 높은 비디오 세그먼트만 처리하도록 함으로써 이 문제를 해결합니다. 이러한 타겟팅된 접근 방식은 응답 품질을 유지하거나 향상시키면서도 처리 시간을 크게 단축합니다.

이 포스트에서는 Twelve Labs의 비디오 이해 기능Weaviate의 벡터 데이터베이스를 결합하여 비디오 콘텐츠를 위한 효율적인 RAG 시스템을 구축하는 방법을 살펴보겠습니다. 비디오를 세그먼트로 나누고 임베딩을 사용하여 분석에 가장 관련성 높은 부분만 검색함으로써, 정확도를 유지하거나 오히려 향상시키면서도 처리 시간을 크게 줄일 수 있습니다.

우리의 접근 방식은 몇 가지 핵심 기술을 활용합니다:

  • 비디오 이해 및 임베딩 생성을 위한 TwelveLabs Pegasus 및 Marengo 모델

  • 비디오 세그먼트의 효율적인 저장 및 검색을 위한 Weaviate 벡터 데이터베이스

  • 비디오 분석의 비교군 역할을 할 오픈 소스 LLaVA-NeXT-Video 모델

이 RAG 기반 접근 방식이 가장 관련성 높은 세그먼트에만 집중하여 비디오 처리의 연산 부하를 어떻게 줄일 수 있는지 보여드릴 것입니다. 이를 통해 더 긴 비디오를 한층 효율적으로 분석할 수 있게 됩니다. 콘텐츠 중재, 스포츠 분석, 교육용 콘텐츠 중 어떤 애플리케이션을 개발하든 관계없이, 이 접근 방식은 고품질의 결과를 유지하면서 비디오 처리 기능을 확장하는 데 도움을 줄 것입니다.



1 - TwelveLabs 및 Weaviate 설정하기



TwelveLabs

아직 Twelve Labs에 가입하지 않으셨다면 여기에서 가입하실 수 있습니다. 계정 설정이 완료되면 Playground로 이동하여 화면 오른쪽 상단의 사용자 아이콘을 클릭한 뒤 API Key 페이지로 이동하세요.

작업 중인 노트북의 왼쪽에서 열쇠 아이콘을 클릭하고, 이 값을 TL_API_KEY라는 이름의 시크릿(Secret)으로 생성합니다.



Weaviate

Weaviate 계정이 없다면 여기에서 가입하실 수 있습니다. 계정이 생성되면 클라우드 대시보드로 이동하여 새 클러스터를 생성하세요. 클러스터 설정이 완료되면 노트북 시크릿 섹션에 두 가지 값을 입력해야 합니다.

REST Endpoint 아래의 URL을 WEAVIATE_URL 변수에 추가합니다. API Keys 아래의 Admin 키를 복사하여 WEAVIATE_API_KEY에 저장합니다.



2 - GPU 런타임 선택하기

LLaVA-NeXT-Video 모델을 실행하려면 GPU가 필요합니다. 노트북에서 런타임 > 런타임 유형 변경으로 이동하여 T4 GPU를 선택하세요.



3 - 환경 설정하기



의존성 라이브러리 설치

먼저 TwelveLabs와 Weaviate SDK를 설치해야 합니다:

!python -m pip install -U -q twelvelabs
!python -m pip install -U -q "weaviate-client>=4.0.0"

그런 다음 나머지 필요한 패키지들을 설치합니다.

!python -m pip install torch
!python -m pip install -q av
!python -m pip install --upgrade -q accelerate 
!python -m pip install -U bitsandbytes
!python -m pip install git

!python -m pip install pillow
!python -m pip install sentencepiece
!python -m



TwelveLabs 및 Weaviate SDK 설정

from google.colab import userdata

TL_API_KEY=userdata.get('TL_API_KEY')
weaviate_url = userdata.get("WEAVIATE_URL")
weaviate_api_key = userdata.get("WEAVIATE_API_KEY")

그 다음 TwelveLabs 클라이언트를 초기화합니다.

from twelvelabs import TwelveLabs

# Initialize the Twelve Labs client
twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)

마지막으로, Weaviate 클라이언트를 설정하고 Video_Embeddings 컬렉션을 초기화합니다.

import weaviate
from weaviate.classes.init import Auth

# Connect to Weaviate Cloud
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

# Get or create collection
try:
    collection = weaviate_client.collections.get("Video_Embeddings")
except:
    collection = weaviate_client.collections.create(name="Video_Embeddings")



비디오 데이터 설정하기

이제 임베딩을 위한 비디오 데이터를 준비해야 합니다. 비디오 데이터는 이 링크의 구글 드라이브 폴더에서 확인할 수 있습니다. 이 파일들을 사용자의 구글 드라이브 기본 폴더에 "TwelveLabs-Weaviate"라는 이름의 폴더를 만들어 복사하세요. 아래 셀을 실행하면 구글 드라이브를 마운트하고 노트북에서 비디오 파일에 접근할 수 있게 됩니다.

from google.colab import drive
drive.mount('/content/drive')
base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate"
raw_video_dir = base_folder_path + "/sports_videos"

upscaled_video_dir = base_folder_path + "/upscaled_videos/"
video_segments_dir = base_folder_path + "/video_segments/"



비디오 업스케일링

보유한 비디오 중 일부는 임베딩 모델에 사용하기에 해상도가 너무 낮습니다. 따라서 사용하기 전에 해상도를 높여주는(업스케일링) 작업이 필요합니다.

여기에 업스케일링 함수를 생성해 줍니다. read_video_pyav 함수는 LLaVa-NeXT-Video 콜랩 노트북에서 가져온 것으로, 비디오를 추론에 적합한 형태의 numpy 배열로 포맷팅해 줍니다.

import av
import numpy as np

def upscale_video(input_file, output_file, target_width=1280, target_height=720):
    input_container = av.open(input_file)
    output_container = av.open(output_file, mode='w')

    input_stream = input_container.streams.video[0]
    output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate)
    output_stream.width = target_width
    output_stream.height = target_height
    output_stream.pix_fmt = 'yuv420p'

    for frame in input_container.decode(input_stream):
        frame = frame.reformat(width=target_width, height=target_height)
        packet = output_stream.encode(frame)
        output_container.mux(packet)

    # Flush the encoder
    packet = output_stream.encode(None)
    output_container.mux(packet)

    # Close the containers
    input_container.close()
    output_container.close()

def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

raw_video_dir에 있는 비디오들을 가져와 업스케일링한 뒤 upscaled_video_dir에 저장합니다.

# Create output directory if it doesn't exist
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# Iterate over all files in the raw video directory
for filename in os.listdir(raw_video_dir):
    
    # Check if the file is a video file
    if filename.endswith(".mp4"):
        print(filename)
        # Get the file name without extension
        input_file_no_ext = os.path.splitext(filename)[0]
        # Define the output file name
        output_file = f"{input_file_no_ext}_480.mp4"
        if output_file in os.listdir(upscaled_video_dir):
            continue
        # Define the full path for the input and output files
        input_file_path = os.path.join(raw_video_dir, filename)
        output_file_path = os.path.join(upscaled_video_dir, output_file)
        # Upscale the video
        upscale_video(input_file_path, output_file_path)



4 - 단일 비디오에서 Pegasus와 LLaVa-NeXT-Video 비교하기

PegasusLLaVa-NeXT-Video는 비디오를 입력하고 관련 질문을 던질 수 있는 비디오 이해(Video Understanding) 모델입니다.

먼저 우리가 가진 비디오 컬렉션 중 단일 비디오 하나를 사용하여 Pegasus와 LLaVa-NeXT-Video의 성능을 비교해 보겠습니다. 이 비디오는 뉴욕 자이언츠와 뉴잉글랜드 패트리어츠가 맞붙은 제42회 슈퍼볼의 한 장면을 담고 있습니다. 경기 종료 2분을 남겨두고 자이언츠의 쿼터백 일라이 매닝이 던진 패스를 리시버 데이비드 타이리가 자신의 헬멧에 공을 밀착시켜 극적으로 잡아낸, 일명 '헬멧 캐치(Helmet Catch)'로 불리는 역사적인 순간을 보여주는 영상입니다.

비디오에 대한 맥락을 파악했으니, 이제 두 모델에게 "이 비디오에서 무슨 일이 일어나고 있나요?"라는 질문을 던져 비디오를 얼마나 잘 이해하는지 비교 확인해 보겠습니다.



Pegasus로 비디오와 대화하기

시작하기 앞서, 비디오를 저장할 Pegasus 인덱스를 하나 생성해야 합니다.

models = [
        {
            "name": "pegasus1.2",
            "options": ["visual"]
        }
    ]

index_name = "sports_videos"
indices_list = twelve_labs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelve_labs_client.index.create(
        name=index_name,
        models=models

    )
    print(f"A new index has been created: id={index.id} name={index.name} models={index.models}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} models={index.models}")

그 다음 해당 인덱스에 비디오를 업로드하는 함수를 작성합니다. 업로드가 완료되면 질문을 할 때 사용할 Pegasus 비디오 ID가 반환됩니다.

# Monitor the status of the video task
def on_task_update(task):
    print(f"  Status={task.status}")
    
def upload_video_to_twelve_labs_pegasus(video_path):
    task = twelve_labs_client.task.create(
        index_id=index.id,
        file = video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)

    if task.status != "ready":
      raise RuntimeError(f"Indexing failed with status {task.status}")
    print(f"The unique identifer of your video is {task.video_id}.")
    return task.video_id

비디오를 업로드하고 Pegasus 비디오 ID를 single_video_id 변수에 저장해 둡니다.

# Define the video file path
single_video_file = upscaled_video_dir + "football_480.mp4"

single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)

Pegasus가 비디오를 제대로 이해했는지 확인하기 위해, "이 비디오에서 무슨 일이 일어나고 있나요? 간결하게 답변해 주세요."라고 질문해 보겠습니다.

single_video_query = "What is going on in this video? Please be concise."

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt=single_video_query
)
print(f"{res.data}")

Pegasus는 다음과 같이 답변했습니다:

비디오는 뉴욕 자이언츠와 뉴잉글랜드 패트리어츠 간의 미시축구 경기에서 결정적인 순간을 보여줍니다. 자이언츠의 쿼터백 일라이 매닝이 던진 패스를 데이비드 타이리가 경기장 밖으로 쓰러지면서 공을 자신의 헬멧에 고정한 채 아주 환상적으로 받아냅니다. 다양한 앵글로 이 캐치 장면이 리플레이되면서 극적인 순간의 난이도와 정밀함이 강조됩니다. 타이리는 플레이 직후 짧게 세레머니를 하고, 비디오는 타이리와 다른 선수들이 경기장을 벗어나는 모습으로 끝이 납니다.

답변을 통해 Pegasus가 비디오를 매우 깊이 있게 이해하고 있음을 알 수 있습니다. 이 경기가 자이언츠와 패트리어츠 간의 풋볼 경기라는 점을 파악했고, 일라이 매닝이 패스를 던졌으며 데이비드 타이리가 이를 받아낸 장면이 경기의 결정적인 순간이었음을 정확히 짚어냈습니다.

Pegasus가 이 경기가 슈퍼볼 경기라는 것을 직접 언급하지 않았으므로, 확인차 관련 질문을 추가로 해보겠습니다.

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt="What game is this?"
)
print(f"{res.data}")

그러자 Pegasus는 정확하게 이 경기는 제42회 슈퍼볼(Super Bowl XLII) 경기입니다.라고 답했습니다.

이제 LLaVa-NeXT-Video 모델은 이 비디오를 얼마나 잘 이해하는지 비교 확인해 보겠습니다.



LLaVa-NeXT-Video로 비디오와 대화하기

LLaVa-NeXT-Video의 경우, 추론을 실행하기 전에 비디오 데이터를 특정 규격의 포맷으로 준비해 주어야 합니다. 이 모델은 전체 비디오 스트림을 한 번에 처리하지 않기 때문에 비디오 전반에서 균일하게 프레임을 추출(샘플링)하는 과정이 필요합니다. 각 비디오에서 40개의 프레임을 고르게 추출하는 샘플링 함수를 제작하여 비디오 전체의 핵심 순간들을 빠짐없이 포착할 수 있도록 하겠습니다. 이 샘플링 기법은 LLaVA-NeXT-Video 공식 구현 코드를 응용하여 만들었습니다. 샘플링이 완료되면 허깅페이스 허브(Hugging Face Hub)에서 모델을 불러온 뒤, 형식에 맞게 입력을 변환하여 추론을 실행하고 응답을 생성해 보겠습니다.

def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # sample uniformly num_samples frames from the video
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
    
    sampled_frames = read_video_pyav(container, indices)
    
    return sampled_frames
    
sampled_video = sample_video(single_video_file, num_samples=40)

비디오 샘플링이 올바르게 완료되면, 추론을 위해 모델을 구성합니다.

from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

그 다음, 모델에 유저 질문을 전송해 추론하는 헬퍼 함수를 구현합니다.

def query_llava_next(query,model,processor,sampled_video):

    # Each "content" is a list of dicts and you can add image/video/text modalities
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": query},
                {"type": "video"},
                ],
        },
    ]

    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # prompt_len = len(prompt)

    inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device)

    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)

    return generated_text[0]

마지막으로, Pegasus와 직접 비교를 해보기 위해 동일한 질문을 던져보겠습니다.

llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

그 결과, 다음 응답을 얻었습니다:

What is happening in this video? Be concise ASSISTANT: 비디오는 경기장에 여러 선수들이 있는 풋볼 경기를 보여주고 있습니다. 저지 번호와 일부 선수들이 착용한 옛날식 헬멧으로 판단하건대 뉴욕 자이언츠와 뉴잉글랜드 패트리어츠 간의 '제3회 슈퍼볼(Super Bowl III)' 경기인 것 같습니다. 한 선수가 동작 중에 공을 잡고 다른 선수에게 태클을 당하고 있으며, 심판은 퍼스트 다운 사인을 보내고 있습니다. 코치와 다른 경기 관계자들도 보입니다.

이 모델도 자이언츠와 패트리어츠 간에 미식축구 경기가 열리고 있다는 사실은 감지해 냈으나, 뜬금없이 이 경기를 '제3회 슈퍼볼'로 비정상 지목해 버렸습니다. 또한 이 비디오의 핵심이자 눈여겨볼 장면인 '헬멧 캐치'에 대한 내용은 깔끔하게 누락했습니다.

다시 한 번 "이 경기는 무슨 경기인가요?"라는 질문을 던져 보겠습니다. Pegasus에 근접하기는 했지만 여전히 잘못된 오류 답변을 출력합니다.

llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

USER: what game is this? ASSISTANT: 제공해주신 비디오는 현재 진행 중인 풋볼 경기의 영상으로, 구체적으로는 제41회 슈퍼볼(Super Bowl XLI)의 한 장면입니다. 뉴잉글랜드 패트리어츠 대 뉴욕 자이언츠의 대결입니다. 화면 속의 선수들은 자이언츠와 패트리어츠 소속입니다.



5 - 단일 비디오에서 세그먼트 단위 쿼리를 위한 RAG 적용하기

전체 비디오 분석에 있어서는 Pegasus가 LLaVa-NeXT-Video보다 더 빠르고 일관된 호환성이 높은 답변을 출력하며, 품질 측면에서도 훨씬 압도적인 퍼포먼스를 보여줍니다.

하지만 비디오 분석의 초점을 가장 밀접한 일부 세그먼트 영역으로만 좁혀주는 구조가 마련된다면, 분석 모델들의 전반적인 품질을 끌어올릴 수 있습니다. 여기서 RAG(검색 증강 생성)의 가치가 유용하게 빛을 발합니다. 비디오 전체를 무식하게 다 다루기보단, 사용자가 질문한 내용에 대한 힌트 정보가 포함된 세그먼트만 귀신같이 찾아내 국소적으로 분석을 제공하는 것입니다.

이 파이프라인을 구현하기 위해 우리는 Marengo 모델을 빌려 쓸 것입니다. 이는 비디오 세그먼트의 의미 단위 데이터를 래핑하여 완성도 높은 고품질 벡터 임베딩으로 뽑아내 주는 전용 특화 모델입니다. 이 임베딩을 구성하면 다음 플로우가 가능해집니다:

  1. 전체 긴 비디오에서 조각난 각 세그먼트별로 독립적인 임베딩 인덱스를 만듭니다.

  2. 유저 질문(문장) 정보를 파싱해 그것과 극도로 유사한 특정 비디오 세그먼트를 매칭해 냅니다.

  3. 적중률이 가장 높은 타겟 세그먼트 한두 개만 콕 집어 우리의 최종 분석 모델에 먹여 연산합니다.

그럼 이제 비디오를 일정한 구간 세그먼트로 잘라 조각내고, Marengo 모델을 사용해 각 조각들의 임베딩 벡터를 뽑아보겠습니다. 이 임베딩 세트가 우리의 RAG 시스템의 든든한 뼈대가 될 것입니다.



Marengo를 활용한 전체 비디오 및 비디오 컷 임베딩 생성

Marengo 모델이 잘라 낼 수 있는 최대 길이 제한인 10초 단위로 세그먼트 길이를 지정하겠습니다.

# Define the video segment length
segment_length = 10

그 다음 Marengo를 작동시켜 임베딩을 생성합니다. 분석할 때 video_embedding_scopes=["clip", "video"]video_clip_length=segment_length 옵션을 부여하는 것에 유의해 주시기 바랍니다. 이렇게 세팅해야 Marengo가 전체 비디오에 대칭되는 통합 임베딩과 더불어 자정해 둔 10초 단위 마이크로 클립들을 일괄 연산하여 각각의 개별 임베딩 세트로 반환해 줍니다.

task = twelve_labs_client.embed.task.create(
    model_name="Marengo-retrieval-2.7",
    video_file=single_video_file,
    video_clip_length=segment_length,
    video_embedding_scopes=["clip", "video"]
)
print(
    f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
)

# Monitor the status of the video embedding task
status = task.wait_for_done(
    sleep_interval=2,
    callback=on_task_update
)
print(f"Embedding done: {status}")

임베딩 태스크 처리가 끝나면 나중에 해당 임베딩을 언제든 꺼내올 수 있도록 Marengo Task ID를 로컬에 보관해 두어야 합니다. 추후 Weaviate 데이터베이스에 업로드할 때 매칭할 수 있도록 marengo_task_ids 사전에 이를 매핑 정리해 두겠습니다.

single_video_task_id = task.id

marengo_task_ids = {}

single_video_file_name = single_video_file.split("/")[-1]
marengo_task_ids[single_video_file_name] = single_video_task_id



RAG용 비디오 세그먼트 가공 환경 설계

에러 없는 유기적인 RAG 파이프라인 처리를 완성하려면, 데이터베이스 내부에서 Pegasus가 발급한 Video ID와 방금 만든 Marengo의 Task ID를 정확히 엮어 두어야 합니다. 이렇게 한 묶음으로 설계해야 벡터 데이터베이스 탐색에서 매칭된 세그먼트에 대해 Pegasus 모델에 바로 질문을 이어 던질 수 있습니다. 이를 구축하기 위해 조각낸 미니 세그먼트 비디오 파일들을 Pegasus 인덱서로도 빠짐없이 전부 업로드해 줍니다.

가장 먼저 비디오를 10초 규격의 개별 조각 파일들로 균일하게 분할해 내는 split_video 유틸 함수를 준비합니다. 이때 Pegasus 인덱서에 업로드할 수 있는 최소 비디오 조건인 '4초 이상' 스펙을 어기지 않도록 각별히 방어 조치를 해야 합니다. 비디오를 다 잘라내고 남은 자투리 맨 마지막 조각이 혹시라도 5초 미만으로 비정상 짧은 경우엔, 그 직전 세그먼트 영역과 서로 겹치게 프레임을 넉넉히 교차 결합해 주어 비디오 분할이 안전하게 5초 이상 길이를 갖추도록 세이브 기믹을 적용해 구현했습니다.

import os
import subprocess
import json
    
def split_video(input_path, output_dir, segment_duration=10):
    """
    Split a video into segments of the specified duration.
    Regular segments will be exactly segment_duration seconds.
    The last segment will be at least 5 seconds long, potentially overlapping
    with the previous segment if needed.
    
    Args:
        input_path: Path to the input video file
        output_dir: Directory to save the output segments
        segment_duration: Duration of each segment in seconds (default: 10)
    """

    # Minimum length for the last segment
    min_last_segment_len = 5
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Get base filename without extension
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    
    # Get video duration using ffprobe
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_format", input_path
    ]
    
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(probe_result.stdout)
        duration = float(video_info["format"]["duration"])
    except Exception as e:
        print(f"Error getting video duration: {e}")
        return 0
    
    # Calculate number of full segments
    num_full_segments = int(duration / segment_duration)
    
    # Calculate remaining duration
    remaining_duration = duration - (num_full_segments * segment_duration)
    
    # Determine total number of segments and if we need to adjust the last segment
    if remaining_duration > 0:
        if remaining_duration < min_last_segment_len:
            # Last segment would be too short, so we'll adjust its start time
            num_segments = num_full_segments + 1
            needs_adjustment = True
        else:
            # Last segment is already long enough
            num_segments = num_full_segments + 1
            needs_adjustment = False
    else:
        # No remaining duration, all segments are complete
        num_segments = num_full_segments
        needs_adjustment = False
    
    print(f"Video {base_name} is {duration:.2f} seconds long")
    print(f"Creating {num_segments} segments")
    
    # Create each segment
    for i in range(num_segments):
        # For regular segments, start at the segment boundary
        if i < num_full_segments:
            start_time = i * segment_duration
            actual_duration = segment_duration
        else:
            # This is the last segment
            if needs_adjustment:
                # Start earlier to ensure it's at least min_last_segment_len seconds
                start_time = duration - min_last_segment_len
                actual_duration = min_last_segment_len
            else:
                # Last segment is already long enough
                start_time = i * segment_duration
                actual_duration = remaining_duration
        
        output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4")
        
        # For all segments, use copy mode for speed
        cmd = [
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", input_path,
            "-t", str(actual_duration),
            "-c:v", "copy",
            "-c:a", "copy",
            output_path
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"Error creating segment {i+1}: {result.stderr[:100]}...")
        else:
            end_time = start_time + actual_duration
            if i == num_segments - 1 and needs_adjustment:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)")
            else:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s")
    
    print(f"Successfully split {base_name} into {num_segments} segments")
    return num_segments

지정해 둔 전용 세그먼트 데이터 저장 폴더인 video_segments_dir 경로에 분할 비디오들을 적재시킵니다.

split_video(single_video_file, video_segments_dir,segment_length)

파일명과 매칭되는 Pegasus Video ID 값을 담아 둘 매핑용 빈 사전인 pegasus_video_ids를 초기 선언하고 먼저 원본 전체 비디오의 고유 ID 정보부터 삽입해 놓겠습니다.

pegasus_video_ids = {}

fname = single_video_file.split("/")[-1]
pegasus_video_ids[fname] = single_video_id

이제 준비된 미니 세그먼트용 비디오 파일들을 차례대로 Pegasus 데이터 인덱서로 업로드 전송 처리하여 리턴된 고유 ID 값들로 pegasus_video_ids 매핑용 데이터 사전을 완성해 줍니다.

segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))]

# Process each video
for segment_video_file in segment_video_files:
    if segment_video_file in pegasus_video_ids:
        print("skip file",segment_video_file)
        continue
    print("processing file",segment_video_file)
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir+segment_video_file)
        pegasus_video_ids[segment_video_file] = video_id
    except:
        print("error",segment_video_file)
        continue

마지막으로, 비교 모델인 LLaVa-NeXT-Video에서도 분석이 매끄럽게 수행될 수 있도록 전체 세그먼트 영상들을 미리 균일 프레임 샘플링 구조로 컨버전(배열 변환) 적용해 놓는 사전 로드 빌드 작업을 처리해 둡니다.

sampled_video_files = {}

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

for video_file in os.listdir(upscaled_video_dir):
    print(video_file)
    sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video



Weaviate에 임베딩 업로드하기

Weaviate는 컬렉션으로 다룰 때 메타데이터 객체와 실제 탐색에 쓰일 임베딩 벡터 리스트를 분리해서 처리하는 방식을 권장합니다. 이에 대칭되는 Weaviate 업로드 셋을 만들기 위해, Marengo Task ID 값과 이에 매칭되는 Pegasus Video ID 정보를 전달받아 규격화된 records 리스트와 이에 상응하는 vectors 배열 세트로 파싱 가공해 내는 prepare_marengo_embeddings_for_weaviate 유틸 변환 함수를 구성하겠습니다.

def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids):

    # Prepare data for Weaviate upload
    records = []
    vectors = []

    for video_file_name in marengo_task_ids.keys():

        

        marengo_task_id = marengo_task_ids[video_file_name]

        # Retrieve marengo full video and clip embeddings
        marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id)


        #track segment number to match with fiel
        segment_number = 0

        for segment in marengo_embeddings_result.video_embedding.segments:
            # Determine if this is a video or clip segment
            is_video = segment.embedding_scope == "video"


            #Update the file name if segment
            updated_file_name = video_file_name
            if not is_video:
                updated_file_name = updated_file_name.replace(".mp4",f"_segment_{segment_number:03d}.mp4")
                segment_number += 1

            video_name = video_file_name.replace(".mp4","")
            
            pegasus_video_id = None
            if updated_file_name in pegasus_video_ids:
                pegasus_video_id = pegasus_video_ids[updated_file_name] 

            record = {
                'video_name':video_name,
                'segment_number': 0 if is_video else segment_number,
                'video_file': updated_file_name,
                'start_time': getattr(segment, 'start_offset_sec', 0),
                'end_time': getattr(segment, 'end_offset_sec', 0),
                'type': 'video' if is_video else 'clip',
                'task_id': marengo_task_id,
                'pegasus_video_id': pegasus_video_id
            }
            
            # Get the embedding vector
            embedding_vector = [float(x) for x in segment.embeddings_float]
            
            # Add to our lists
            records.append(record)
            vectors.append(embedding_vector)

    # Print summary
    print(f"Prepared {len(records)} segments for upload to Weaviate")
    print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}")
    print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}")

    return records, vectors

그 다음 작성이 완료된 해당 파이프라인 컴포넌트 변환 동작 함수를 실행하여 도출된 정보 레코드와 임베딩 벡터 리스트들을 Weaviate 서버 대시보드로 바인딩 전송해 줍니다.

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")



벡터 검색 테스트하기

보유 중인 모든 리소스를 수집 데이터베이스 인스턴스에 집어넣었으므로, 이제 원하는 풋볼 정보 검색 쿼리에 최적화되어 대응하는지 Weaviate의 near_vector 벡터 거리가 유사하게 동작하는지 성능 테스트를 실행해 보겠습니다. 우리가 가공해 넣어둔 임베딩 세트의 임의 인텍스 벡터를 쿼리에 주입하면 본인 자신과의 벡터 연산 코사인 유클리드 거리가 완전히 0에 가깝게 딱 정방향 인출되어야 맞습니다.

임의로 타겟팅한 5번 벡터 변환 샘플값으로 가상 쿼리를 태우면 리턴된 응답 벡터 셋에서 자기 자신에 준하는 비디오 정보를 최우선 거리값 0의 레코드로 보여줄 것입니다.

from weaviate.classes.query import MetadataQuery, Filter

# Use a specific vector for the query
query_vector = vectors[5]

# Perform vector search
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
)

print(f"Found {len(response.objects)} results for vector search")
for obj in response.objects:
    print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}")
    if 'segment_id' in obj.properties:
        print(f"Segment: {obj.properties['segment_id']}")
    if 'text' in obj.properties and obj.properties['text']:
        print(f"Text: {obj.properties['text']}")
    print(f"Distance: {obj.metadata.distance}")
    print("-" * 50)

동일한 임시 쿼리 작동으로 우리 DB 내부의 고유 벡터 레코드가 거리 0인 환상의 무결점 상태로 검색되어 반환됨을 목격했습니다.




RAG 설계를 위한 타겟 세그먼트 영상 인출 및 검증

우리가 공들여 빌드한 이번 비디오 RAG 시스템의 핵심 역량은, 클라이언트에게서 인입된 자연어 질문에 반응하고 알맞은 영상 레코드를 매칭 인출해 주는 프로세스에 있습니다. 전체적인 처리는 아래 3단계로 핵심 처리됩니다.

  1. TwelveLabs의 전용 임베딩 모델인 Marengo로 고객의 질문(텍스트) 정보를 분석하고 벡터 임베딩으로 번환 처리합니다.

  2. 생성된 질의 벡터와 DB에 미리 구축해 둔 타겟 미니 영상 고유 벡터 셋들 간 최인접 유클리드 서치를 탐색해 우수한 매칭값을 가려냅니다.

  3. 최적의 거리값으로 채택된 비디오 세그먼트를 추출하고, 이 영상에 매달려 있는 Pegasus 고유 ID를 조회해 타겟 조각 구간에 대칭되는 정밀한 답변을 합성 제공하도록 모델을 실행합니다.

이러한 정밀한 설계를 사용하면 방대한 영상에서 쓸모없는 연산 낭비를 가차 없이 패스하고, 질문 내용에 매칭되는 국소 구간만 초고속 분석 처리해 답변 전달 품질과 리소스 세이빙 비용을 엄청나게 끌어올릴 수 있습니다.

첫 번째 단계로 유저의 한글 영문 질문 데이터를 임베딩 쿼리로 인코딩합니다.

sample_question = "What technique did David Tyree use to catch the ball?"

embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=sample_question,
    text_truncate="start",
)

query_vector = embedding.text_embedding.segments[0].embeddings_float

그 다음 Weaviate 내부에서 쿼리 임베딩과 가장 극단적으로 가까이에 인접 배치된 우수 타겟 세그먼트 단품 조각을 찾아 내겠습니다. filters=(Filter.by_property("type").equal("clip"))로 소조건을 세팅함으로써, 무겁고 모호한 전체 비디오 통합 임베딩들은 한꺼번에 배제하고 알맹이들만 타겟 인출되도록 한정 지어 성능을 보강합니다.

response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
    filters=(Filter.by_property("type").equal("clip"))
)

video_file = response.objects[0].properties.get("video_file")
print(video_file)

탐색 서치가 멋지게 성공하여 4번째 구간 세그먼트 데이터 파일인 football_480_segment_003.mp4 레코드를 탁월한 스펙으로 가져왔습니다.

실제 적중한 영상 세그먼트를 눈등으로 간략 매칭 시각화해 보겠습니다.

import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

video_file = response.objects[0].properties.get("video_file")
video = sampled_video_files[video_file]

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # this is required to not display the generated image

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())

확인 결과, 전체 시간 축 프레임 중 타이리 선수가 실제 헬멧을 지탱 삼아 볼을 꽉 거머죄는 절정에 달한 정확한 찰나의 캐칭 파트 세그먼트 본체임을 한눈에 가려낼 수 있습니다.

우리가 의도했던 대로 완벽한 세그먼트 조각을 타겟팅했다는 사실을 확신했으니, 이제 이 짧은 조각 구간에서 두 모델이 어느 정도의 분석 퀄리티 차이를 출력하는지 직접 배틀을 시켜 연동해 보겠습니다.



매칭된 미니 세그먼트로 비디오 대화 결과 격돌: Pegasus 대 LLaVa-NeXT-Video

우선 TwelveLabs의 고급 Pegasus 추론 모델 답변 빌드 결과를 관찰해 줍니다.

pegasus_video_id = response.objects[0].properties.get("pegasus_video_id")


print(sample_question)

res = twelve_labs_client.generate.text(
  video_id=pegasus_video_id,
  prompt=sample_question
)
print(f"{res.data}")

데이비드 타이리가 공을 잡기 위해 사용한 캐칭 기술은 무엇인가요?

데이비드 타이리는 패스된 미식축구 공을 안전하게 확보하기 위하여 자신의 머리에 씌여진 헬멧 겉면에 공 부위를 거세게 내리눌러 밀착 고정하는 독창적인 임기응변 동작 기술을 펼쳤습니다. 이 시도는 자이언츠의 패스 연결 권한을 경이롭게 계속 지속시키는 데 최고의 공을 세운 역사적 하이라이트 플레이로 세이브되었습니다.

질의 요지를 통달한 것처럼, 타이리의 헬멧 위 밀착 볼 컨트롤 동작을 예술적으로 해설해 주며 경기 흐름상 기여도까지 일타 쌍피로 엮어 기품 있는 완벽한 답변을 쏟아냈습니다.

이번에는 부하가 덜해진 짧아진 똑같은 세그먼트 조각에 대하여 오픈소스 비교 구동 모델인 LLaVa-NeXT-Video는 전보다 더 영리한 답변을 만들어 내는지 연쇄 태워보겠습니다.

video_file = response.objects[0].properties.get("video_file")
sampled_video = sampled_video_files[video_file]
generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video)

print(generated_text)

USER: What technique did David Tyree use to catch the ball? ASSISTANT: 공을 캐치하려는 모션을 보인 미식축구 선수는 양손을 사용해 머리 위 높은 영역에서 안전하게 포구하는 오버헤드 투핸드 포그립 낙하포착 기술을 활용해 볼을 제어하려 한 것으로 목격됩니다. 이 기교는 디펜더의 인터셉트 위협을 무력화하며 머리꼭대기 높이에 뜬 공을 잡아채는 유용한 포구 루트의 일종이나 밸런스를 잃고 미끄러지면 대단히 큰 패스 실패로 이어질 소지도 있습니다.

공의 비행 궤적에 따른 양손의 일반적인 물리 오버헤드 포지셔닝에 국한해 무난 무난한 답변만을 생성하는 데 그쳤습니다. 우리가 눈여겨봤던 독창적인 트레이드 마크 시그니처 동작인 '헬멧 밀착 캐치' 부분은 이번 마이크로 분석에서도 기대를 빗나가고 포착에 실패한 아쉬운 결과를 보여주었습니다.



6 - 다중 비디오 환경에서 Marengo, Weaviate 및 Pegasus 멀티 RAG 가동하기

단독 미시 단위 비디오 임베딩 구간 대칭 동작을 실컷 경험해 봤으니, 이제 서비스 레벨에서 멀티플 플레이로 가장 빈번하게 빌드 요청을 넣는 다중 비디오 분산 환경에서의 고기능성 멀티 RAG 실전 구현 단계를 보여드리겠습니다.



전체 비디오 풀에 대응하도록 Marengo 인덱싱 일괄 전개

그동안 준비해 뒀던 전체 추가 비디오 컬렉션들에 대한 Marengo 임베딩 정보를 통합 맵 marengo_task_ids 사전에 루핑 전개하여 한 세트로 관리 인덱싱 처리하겠습니다.

for video_file_name in os.listdir(upscaled_video_dir):

    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue

    print(f"processing {video_file_name}")

    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Monitor the status of the video embedding task
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    marengo_task_ids[video_file_name] = task.id



추가 확장 비디오 풀 대상 세그먼트 분할 전개

그 외 업스케일 가공 완료된 연계 스포츠 비디오들도 기존 처리했던 방식과 마찬가지로 전부 규격화된 10초 세그먼트 데이터 조각으로 쪼개 놓아 RAG 적용 자격을 부여합니다.

# Create output folder if it doesn't exist
os.makedirs(upscaled_video_dir, exist_ok=True)

# Get all video files
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]

# Process each video
for video_file in video_files:
    split_video(upscaled_video_dir + video_file,video_segments_dir,segment_length)



나머지 다중 영상 전부에 대한 Pegasus 고유 Video ID 일괄 전송 바인딩

나머지 파편화된 다중 조각 비디오 채널 전체의 Pegasus Video ID 리스트를 취득하겠습니다. 이 과정은 대기 딜레이를 많이 깎아내기 위해서 고속 멀티 락 패럴렐(병렬 스레드 처리) 호출 기법을 사용해 단번에 완성해 줍니다.

import concurrent.futures
import os
from tqdm import tqdm  # Use standard tqdm instead of tqdm.notebook

def process_video(video_path):
    video_file_name = video_path.split("/")[-1]
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_path)
        return video_file_name, video_id
    except Exception as e:
        print(f"Error processing {video_file_name}: {str(e)}")
        return video_file_name, None

# Filter out videos that are already processed
segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]
full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]
all_video_files = segment_video_files + full_video_files

videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids]

print(f"Processing {len(videos_to_process)} videos in parallel...")

# Use ThreadPoolExecutor for I/O-bound operations like API calls
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks and create a dictionary mapping futures to their video files
    future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process}
    
    # Process results as they complete with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)):
        video_file_name, video_id = future.result()
        if video_id:
            pegasus_video_ids[video_file_name] = video_id

print("All videos processed!")
print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")



최종 미싱 데이터 세트 Weaviate 일괄 데이터 바인딩

완벽하게 준비된 2단계 멀티 비디오 클립 가공 소스들을 Weaviate의 내부 클러스터 영역으로 일괄 주입 연동합니다.

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        if record["pegasus_video_id"] is None:
            continue
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")



RAG 성능 평가 지표: 마이크로 클립 검색 대 전체 비디오 직접 질문 비교

드디어 Weaviate 공간 내부에 모든 멀티 비디오에 대한 Marengo의 특징 벡터 및 Pegasus 매핑용 ID 컬렉션의 세팅을 완벽히 마쳤습니다. 이제 RAG 시스템 성능 평가를 시작해 보겠습니다. 이 테스트는 두 가지 극단적인 주요 성향 포인트를 관측할 것입니다.

  1. 답변의 핵심 적중력(Answer Quality): 클립 조각 기반의 협소한 RAG 탐색과 전체 통합 비디오를 생으로 쿼리했을 때 도출되는 응답 정보 결과의 정확함 차이가 어떻게 전개되는가?

  2. 시간 연산 절약성(Processing Efficiency): 두 방식 간에 시간 소요 격차나 리소스를 낭비하는 딜레이 측면에 어떠한 실전 마일스톤 메리트가 드러나는가?

우리는 서로 다른 성질의 질문 묶음을 던져, 각기 최단 핏을 달성하는 세그먼트 영상 컷을 유도한 케이스와 생비디오를 주입한 케이스의 성능 소요치를 지표로 얻을 것입니다. 이를 목도하면, 비디오 재생 분량이 한정 없이 늘어나도 똑똑한 RAG 구성이 불필요한 계산을 원천 배제하여 뛰어난 퍼포먼스 가치를 달성한다는 것을 자명하게 입증받을 수 있습니다.

다양한 종목의 스포츠들이 넓게 포진된 테스트용 질의응답 세트 목록을 정의해 봅니다.

video_questions = [
    "In the American Football Video, what are the teams playing?", 
    "What technique does David Tyree use to catch the ball?",
    "In the tennis match video, who is playing?", 
    "What foot does Messi shoot at the goal with?",
    "When does Keri Strug hurt her foot?"
]



TwelveLabs Pegasus를 이용한 멀티 비디오 RAG

가장 먼저, 다이렉트로 전체 비디오를 상대로 질의를 대여 처리한 케이스의 소요 수치를 검산해 봅니다.

from weaviate.classes.query import MetadataQuery, Filter
import time

pegasus_full_video_answers = []

start_time = time.time()

for question in video_questions:

    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_full_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 72 seconds

다음으로 정교하게 세그먼트 데이터 컷을 타겟팅하여 뽑아서 RAG 연산을 적용한 결과를 추적 관찰합니다.

pegasus_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_clip_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 20 seconds

조각 세그먼트 영상만을 발췌해 RAG를 전개한 방식과 단순 생비디오를 집어넣은 경우 생성된 실시간 응답 퀄리티 비교를 제공해 드리겠습니다.

for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

목격하신 것과 같이 생성해 낸 최종 정보의 퀄리티는 대단히 수려하고 서로 어깨를 나란히 할 정도로 준수한 모습을 유지합니다. 하지만 연산 시간에서 세그먼트 RAG 전략은 단 20초 만에 완수된 반면, 전체 영상을 순회 구동하면 72초라는 엄청난 리소스 지연 차이가 벌어졌습니다.




LLaVa-NeXT-Video 모형 기반 유사 다중 RAG 구동

이번 단계로는 LLaVa-NeXT-Video 오픈소스 모델에 위의 멀티 비디오 RAG 설계를 연속 태워보겠습니다. 그전에 비교 동작에 필요한 다중 컷 샘플링 프레임 생성부터 일괄 완수해 늘어놓습니다.

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

우선 통비디오 다이렉트 패싱 연동의 결과물 빌드부터 시작하겠습니다.

llava_full_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_full_video_answers.append([question,selected_video_name,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

곧이어 RAG 클립 기반 추론 응답 연산으로 전개해 수치를 확인합니다.

from weaviate.classes.query import MetadataQuery

import time

llava_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_clip_video_answers.append([question,selected_video_name,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

이 테스트 세팅에선 두 빌드 실행 시간이 완전히 동률로 뽑혔습니다. 이 현상이 나온 타당한 이유는 비디오 연장 길이 차이와 상관없이 일률적으로 40점씩 고정 프레임만 드롭 샘플링해서 GPU 메모리에 올리기 때문입니다.

이제 LLaVa-NeXT-Video 모형이 출력해 낸 조각 클립 분석본과 통비디오 다이렉트 분석본의 텍스트 답변 내용을 살펴보겠습니다.

for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

오픈소스 챗 지향 모델인 LLaVa-NeXT-Video는 클립 세그먼트 RAG를 통과시켰을 때 총 5개의 질의 중 약 2개 부분에서 올바른 과녘 정답을 타격해 냈습니다.

  1. 첫째 질문 대전 팀 정보에서, 뉴잉글랜드와 뉴욕 자이언츠가 올바르게 대결 중인 구도 맥락을 선방하여 정확히 감지했습니다.

  2. 세 번째 질판 스포츠 정보에서도, 전설적인 테니스 거장 페더러와 조코비치가 맞닥뜨린 대전 상황임을 정확히 추적 매칭했습니다.




7 - 결론: TwelveLabs 및 Weaviate 기반 고효율 비디오 RAG 구축 전략 전략과 가치 요약

오늘 마친 비디오 RAG(검색 증강 생성) 파이프라인의 실전 빌드 테스트 결과는 정확도와 시간 절약이라는 두 마리 토끼를 다 움켜잡을 수 있는 현명한 해답임을 여실히 입증했습니다. TwelveLabs의 시각 연산 분석 지배 기술과 Weaviate의 탄탄한 초고속 벡터 데이터베이스를 하이브리드로 접착 가동하면 전체 비디오 소스를 연산에서 패스시키고 유력한 부분 세그먼트만 뽑아 처리하는 영리한 엔진을 얻게 됩니다.



이 테스트를 통해 우리가 거둔 위대한 수확

  1. 비교 불가의 압도적 시간 절약(Performance Improvements): TwelveLabs의 획기적인 Pegasus 연동 구조를 Weaviate의 초당 조회 벡터 풀과 콜라보시키면, 둔중하고 긴 생 영상을 다 분석할 필요 가 없으므로 3분의 1 이상 연산 수행 속도를 전력으로 차 단하여 세이빙 시켰습니다.

  2. 오픈소스 모델의 기량 하계 극복(Enhanced Accuracy): 상대적으로 학습 한계가 명확한 LLaVa-NeXT-Video 같은 모델들마저 모호한 영상을 넓게 줄 때보다 타겟 클립으로 사유 정밀도를 높였을 때 사실 검증 정확도가 비약적으로 올라갑니다.

  3. 대단히 견고한 확장 아키텍처(Scalable Architecture): Marengo 기반 고차원 특징 추출과 Weaviate가 가진 레이턴시 없는 분산 디스크 색인 결합은 현업 비즈니스 제품군 스케일로 올리기에 최고로 안성맞춤인 락인 설계 베이스입니다.



그 외 확장 가능한 영역

이 우수한 결합 모델은 다양한 상용 마켓 도메인의 패러다임을 바꿀 킬러 무기를 제작하는 데 지대한 영향을 줄 것입니다.

  1. 엔터 및 멀티미디어 시장(Media & Entertainment): 수만 시간의 대작 미디어 저장소 아카이브 내부에서 "비 내리는 씬의 주인공 독백 신만 모아줘" 같은 마술 같은 탐색 가공 편집 환경을 초 단위로 서포트할 수 있습니다.

  2. 스포츠 마이크로 전략분석(Sports Analytics): 선수단 코칭 스태프나 경기 분석 전술가들이 훈련 비디오에서 "타자가 세 번째 스트라이크에 방망이 헛돌리는 모션만 탐색"하여 맞춤 분석 조언 서비스를 곧바로 태울 수 있도록 도울 수 있습니다.

  3. 차세대 비디오 이커머스 쇼핑(Retail & E-commerce): 무의미한 쇼츠 스타일 상품 설명 비디오들을 지대적인 대화형 쇼핑으로 개조시켜 유저가 "가방 끈 조절하는 부분만 보여줘"라고 말하면 곧바로 타겟 지점을 영상으로 피사체 하이라이트 제공 해 주는 신개념 구매 흐름을 열어젖힐 수 있습니다.

TwelveLabs의 한계가 없는 비디오 특징 분석 스택과 Weaviate의 든든한 날개 데이터베이스를 활용해 초 효율 시그널 비디오 RAG 시스템을 지금 바로 엔터프라이즈 환경에 빌드 전개해 보시기 바랍니다.