パートナーシップ

TwelveLabsとWeaviateを活用した、RAGによるビデオ処理時間の向上

ジェームズ・リー

開発者は、Twelve LabsのEmbed APIとWeaviateを使用して、分析を実行する前に最も関連性の高いビデオセグメントのみを取得するビデオRAGシステムを構築できます。これにより、Pegasusによる回答の正確性を維持しながら、ビデオ全体をクエリする場合と比較して、処理時間を72秒から20秒に短縮できます。

開発者は、Twelve LabsのEmbed APIとWeaviateを使用して、分析を実行する前に最も関連性の高いビデオセグメントのみを取得するビデオRAGシステムを構築できます。これにより、Pegasusによる回答の正確性を維持しながら、ビデオ全体をクエリする場合と比較して、処理時間を72秒から20秒に短縮できます。

この記事の内容

No headings found on page

ニュースレターに登録する

ニュースレターに登録する

ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします

ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします

AIを活用してビデオを検索、分析、探索します。

2025/03/18

25分

記事へのリンクをコピー

草稿を確認してくれたWeaviateチームの Tuana CelikErika Cardenas に深く感謝します!



ビデオ処理は、特に長尺コンテンツを分析する場合、計算コストが高く時間がかかります。検索拡張生成(RAG)は、システムがビデオ全体ではなく最も関連性の高いビデオセグメントのみを処理できるようにすることで、この課題を解決します。この的を絞ったアプローチにより、応答の品質を維持または向上させながら、処理時間を大幅に削減できます。

この記事では、Twelve Labsのビデオ理解機能Weaviateのベクトルデータベースを組み合わせて、ビデオコンテンツ用の効率的なRAGシステムを構築する方法を探ります。ビデオをセグメント化し、埋め込みを使用して分析に最も関連する部分のみを検索することで、精度を維持または向上させながら処理時間を大幅に改善できます。

私たちのアプローチは、いくつかの主要技術を活用しています:

  • ビデオの理解と埋め込み生成のためのTwelveLabs PegasusおよびMarengoモデル

  • ビデオセグメントを効率的に保存・収集するためのWeaviateベクトルデータベース

  • ビデオ分析の比較対象としてのオープンソースの LLaVA-NeXT-Video モデル

このRAGベースのアプローチにより、最も関連性の高いセグメントのみに焦点を当てることで、ビデオ処理の計算負荷を軽減し、より長いビデオを効率的に分析できるようになることを実証します。コンテンツモデレーション、スポーツ分析、または教育コンテンツ向けのアプリケーションのいずれを構築している場合でも、このアプローチは高品質な結果を維持しながらビデオ処理能力を拡張するのに役立ちます。



1 - TwelveLabsとWeaviateのセットアップ



TwelveLabs

まだTwelve Labsに登録していない場合は、こちらから登録できます。アカウントをセットアップしたら、Playgroundに移動し、画面右上隅のユーザーアイコンをクリックしてAPI Keyにアクセスします。

お使いのノートブックの左側にある鍵のアイコンをクリックし、この値をTL_API_KEYとしてシークレットを作成します。



Weaviate

Weaviateのアカウントをお持ちでない場合は、こちらからサインアップできます。アカウント作成後、クラウドダッシュボードに移動し、新しいクラスターを作成します。クラスターのセットアップが完了したら、ノートブックのシークレットセクションに2つの値を設定する必要があります。

REST Endpointの下にあるURLをWEAVIATE_URL変数として追加します。API Keysの下にあるAdminキーをコピーし、WEAVIATE_API_KEYに保存します。



2 - GPUランタイムの選択

LLaVA-NeXT-Videoモデルを実行するにはGPUが必要です。ノートブックでランタイム > ランタイムのタイプを変更に移動し、T4 GPUを選択します。



3 - 環境のセットアップ



依存関係のインストール

まず、TwelveLabsとWeaviateのSDKをインストールする必要があります:

!python -m pip install -U -q twelvelabs
!python -m pip install -U -q "weaviate-client>=4.0.0"

次に、残りの依存関係をインストールします。

!python -m pip install torch
!python -m pip install -q av
!python -m pip install --upgrade -q accelerate 
!python -m pip install -U bitsandbytes
!python -m pip install git

!python -m pip install pillow
!python -m pip install sentencepiece
!python -m



TwelveLabsおよびWeaviate SDKのセットアップ

from google.colab import userdata

TL_API_KEY=userdata.get('TL_API_KEY')
weaviate_url = userdata.get("WEAVIATE_URL")
weaviate_api_key = userdata.get("WEAVIATE_API_KEY")

次に、TwelveLabsクライアントを初期化します。

from twelvelabs import TwelveLabs

# Initialize the Twelve Labs client
twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)

最後に、Weaviateクライアントをセットアップし、Video_Embeddingsコレクションを初期化します。

import weaviate
from weaviate.classes.init import Auth

# Connect to Weaviate Cloud
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

# Get or create collection
try:
    collection = weaviate_client.collections.get("Video_Embeddings")
except:
    collection = weaviate_client.collections.create(name="Video_Embeddings")



ビデオデータのセットアップ

次に、埋め込み用のビデオデータを取得する必要があります。このリンクを使用して、Googleドライブのフォルダ内でビデオデータを見つけることができます。基本のGoogleドライブフォルダの中に 「TwelveLabs-Weaviate」 という名前のフォルダを作成し、そこへコピーします。以下のセルを使用してドライブをマウントし、ノートブックからビデオファイルにアクセスできるようにします。

from google.colab import drive
drive.mount('/content/drive')
base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate"
raw_video_dir = base_folder_path + "/sports_videos"

upscaled_video_dir = base_folder_path + "/upscaled_videos/"
video_segments_dir = base_folder_path + "/video_segments/"



ビデオのアップスケーリング

いくつかのビデオは、解像度が低すぎて埋め込みモデルで使用できません。これらを使用する前にアップスケーリングする必要があります。

ここでアップスケーリング関数を作成します。read_video_pyavは、LLaVa-NeXT-VideoのColabノートブックから直接引用したもので、推論に適した正しいnumpy表現にビデオをフォーマットします。

import av
import numpy as np

def upscale_video(input_file, output_file, target_width=1280, target_height=720):
    input_container = av.open(input_file)
    output_container = av.open(output_file, mode='w')

    input_stream = input_container.streams.video[0]
    output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate)
    output_stream.width = target_width
    output_stream.height = target_height
    output_stream.pix_fmt = 'yuv420p'

    for frame in input_container.decode(input_stream):
        frame = frame.reformat(width=target_width, height=target_height)
        packet = output_stream.encode(frame)
        output_container.mux(packet)

    # Flush the encoder
    packet = output_stream.encode(None)
    output_container.mux(packet)

    # Close the containers
    input_container.close()
    output_container.close()

def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

raw_video_dirフォルダ内のビデオを取得し、アップスケーリングしてupscaled_video_dirフォルダに保存します。

# Create output directory if it doesn't exist
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# Iterate over all files in the raw video directory
for filename in os.listdir(raw_video_dir):
    
    # Check if the file is a video file
    if filename.endswith(".mp4"):
        print(filename)
        # Get the file name without extension
        input_file_no_ext = os.path.splitext(filename)[0]
        # Define the output file name
        output_file = f"{input_file_no_ext}_480.mp4"
        if output_file in os.listdir(upscaled_video_dir):
            continue
        # Define the full path for the input and output files
        input_file_path = os.path.join(raw_video_dir, filename)
        output_file_path = os.path.join(upscaled_video_dir, output_file)
        # Upscale the video
        upscale_video(input_file_path, output_file_path)



4 - 単一ビデオによるPegasusとLLaVa-NeXT-Videoの比較

PegasusLLaVa-NeXT-Video は、どちらもビデオを取り込んでそれについて質問できるビデオ理解モデルです。

まず、ビデオコレクションから切り倒した単一のビデオでPegasusとLLaVa-NeXT-Videoを比較することから始めます。ビデオは、ニューヨーク・ジャイアンツがニューイングランド・ペイトリオッツと対戦する、第42回スーパーボウルのシーケンスを示しています。これは「ヘルメットキャッチ」と呼ばれる有名なレセプションで、ジャイアンツのクォーターバックであるイーライ・マニングがパスを投げ、これを受けたジャイアンツのワイドレシーバー、デビッド・タイリーが試合の残り2分で自身のヘルメットにボールを押し付けながらキャッチを見事に成功させたものです。

ビデオの背景を把握したところで、「このビデオで何が起きていますか?」と質問した際に、2つのモデルがビデオをどれだけ理解できるかを判定します。



Pegasusを使用してビデオと対話する

始める前に、ビデオをロードするためのPegasusインデックスをセットアップする必要があります。

models = [
        {
            "name": "pegasus1.2",
            "options": ["visual"]
        }
    ]

index_name = "sports_videos"
indices_list = twelve_labs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelve_labs_client.index.create(
        name=index_name,
        models=models

    )
    print(f"A new index has been created: id={index.id} name={index.name} models={index.models}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} models={index.models}")

次に、インデックスにビデオをアップロードする関数を作成します。これにより、ビデオについて質問するために使用できるPegasusのビデオIDが返されます。

# Monitor the status of the video task
def on_task_update(task):
    print(f"  Status={task.status}")
    
def upload_video_to_twelve_labs_pegasus(video_path):
    task = twelve_labs_client.task.create(
        index_id=index.id,
        file = video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)

    if task.status != "ready":
      raise RuntimeError(f"Indexing failed with status {task.status}")
    print(f"The unique identifer of your video is {task.video_id}.")
    return task.video_id

ビデオをアップロードし、PegasusビデオIDをsingle_video_idに保存します。

# Define the video file path
single_video_file = upscaled_video_dir + "football_480.mp4"

single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)

Pegasusがビデオを本当に理解しているかを確認するために、「このビデオで何が起きていますか?簡潔に答えてください。」と質問します。

single_video_query = "What is going on in this video? Please be concise."

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt=single_video_query
)
print(f"{res.data}")

Pegasusから次の返答が届きました:

ビデオは、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツのアメリカンフットボールの試合における極めて重要な瞬間を紹介しています。ジャイアンツのクォーターバックであるイーライ・マニングが投げたパスを、デビッド・タイリーがアウト・オブ・バウンズに倒れ込みながら、ボールをヘルメットに固定して見事なキャッチを決めました。複数のアングルがそのキャッチをリプレイし、その難しさと正確さを強調しています。プレイ後、タイリーは短く喜びを祝い、ビデオは彼と他のチームメイトがフィールドから立ち去るシーンで終了します。

この応答から、Pegasusがビデオを深く理解していることが分かります。これがジャイアンツとペイトリオッツのフットボールの試合であることを把握しています。また、イーライ・マニングがボールを投げ、デビッド・タイリーがそれを受け止めた、試合の勝負を分ける瞬間であることも理解しています。

Pegasusからはこれがスーパーボウルであることは言及されていなかったので、確認のためにさらに質問してみます。

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt="What game is this?"
)
print(f"{res.data}")

Pegasusはこれは、第42回スーパーボウルの試合です。と答えました。これは正しい回答です。

次に、LLaVa-NeXT-Videoがどの程度ビデオを解釈できるかを見てみましょう。



LLaVa-NeXT-Videoを使用してビデオと対話する

LLaVa-NeXT-Videoでは、推論の前に特定の形式でビデオデータを準備する必要があります。モデルはビデオストリーム全体を一度に処理できないため、ビデオ全体から均一にフレームを抽出します。ここでは、各ビデオから40枚の均等に分散されたフレームを抽出するサンプリング関数を作成し、コンテンツ全体の重要な瞬間を逃さず捉えます。このサンプリング手法はLLaVA-NeXT-Videoの公式実装に準拠しています。サンプリング後、Hugging Face Hubからモデルをロードし、モデルの要件に適合するように入力を構成したうえで、質問に対する応答を生成するための推論を実行します。

def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # sample uniformly num_samples frames from the video
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
    
    sampled_frames = read_video_pyav(container, indices)
    
    return sampled_frames
    
sampled_video = sample_video(single_video_file, num_samples=40)

ビデオのサンプリングが適切に行われたら、モデルをセットアップします。

from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

次に、モデルに対してクエリを実行する関数を作成します。

def query_llava_next(query,model,processor,sampled_video):

    # Each "content" is a list of dicts and you can add image/video/text modalities
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": query},
                {"type": "video"},
                ],
        },
    ]

    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # prompt_len = len(prompt)

    inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device)

    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)

    return generated_text[0]

最後に、同様の質問をしてPegasusとの出力を比較します。

llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

次のような回答が返信されました:

USER: このビデオで何が起きていますか?簡潔に。 ASSISTANT: 進行中のフットボールの試合が映っており、フィールドには複数のプレイヤーがいます。一部の選手が着用しているジャージの番号や古いスタイルのヘルメットから判断すると、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツの間の第3回スーパーボウルであるようです。1人の選手が空中姿勢でボールをキャッチしようとしており、他の選手にタックルされかけています。また、レフェリーはファーストダウンのシグナルを送っています。コーチ陣やその他の試合関係者も...

このモデルはジャイアンツとペイトリオッツのフットボールの試合であることを認識していますが、試合を第3回スーパーボウルと誤認しています。また、ビデオの最も重要な場面、すなわちヘルメットキャッチの箇所を明確に識別できていません。

先ほどと同じ「これはどの試合ですか?」という質問をしてみます。わずかに正確な内容に近づいているものの、正解には至っていません。

llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

USER: what game is this? ASSISTANT: 提供された画像は、現在進行中のフットボールの試合、特に第41回スーパーボウルの一部です。これはニューイングランド・ペイトリオッツ対ニューヨーク・ジャイアンツの試合です。画像に写っているプレイヤーはジャイアンツとペイトリオッツの選手です。



5 - 単一ビデオにおけるセグメントレベル・クエリのためのRAG

比較結果により、Pegasusはビデオ全体を分析する際により優れた処理を提供し、処理時間を短縮しつつ、より正確で一貫性のある回答を返すことが分かりました。

しかしながら、モデルの注目範囲を最も関連性の高いビデオセグメントのみに絞り込むことで、さらにパフォーマンスを向上させる余地があります。これこそが検索拡張生成(RAG)の有用性が発揮される理由です。ビデオ全体を処理する代わりに、特定のクエリに関連する情報が格納されたセグメントのみを特定して分析することが可能となります。

このアプローチを実装するために、ビデオセグメントのセマンティック(意味的内容)を取得する高品質な埋め込みの作成を専門とする、TwelveLabsのMarengoモデルを活用します。これらの埋め込みによって以下が可能となります:

  1. 一度に、ビデオの各セグメントを個別にインデックス化する。

  2. ユーザーのクエリに最も一致する関連セグメントを特定する。

  3. 特定の処理セグメントのみをビデオ理解モデルによって処理する。

まず、ビデオを各セグメントに分割し、Marengoモデルを使用してそれぞれに対して埋め込みを生成します。これらの埋め込みにより、RAGシステムの基盤が確立されます。



Marengoを使用して、フルビデオおよびビデオクリップの埋め込みを作成する

セグメントの長さをMarengoでサポートされる最大の長さである10秒に設定します。

# Define the video segment length
segment_length = 10

次に、Marengoを使用してビデオの埋め込み処理を行います。注意:Marengoがビデオ全体の埋め込みと、ビデオ内の各10秒のクリップの埋め込みの両方を確実に返すようにするため、video_embedding_scopes=["clip", "video"]およびvideo_clip_length=segment_lengthを設定します。

task = twelve_labs_client.embed.task.create(
    model_name="Marengo-retrieval-2.7",
    video_file=single_video_file,
    video_clip_length=segment_length,
    video_embedding_scopes=["clip", "video"]
)
print(
    f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
)

# Monitor the status of the video embedding task
status = task.wait_for_done(
    sleep_interval=2,
    callback=on_task_update
)
print(f"Embedding done: {status}")

埋め込み処理が完了したら、必要に応じてそれらを抽出するためにMarengoのタスクIDを保存できます。後ほどWeaviateデータベースにデータを入力する際に使用できるよう、タスクIDをmarengo_task_idsに保持します。

single_video_task_id = task.id

marengo_task_ids = {}

single_video_file_name = single_video_file.split("/")[-1]
marengo_task_ids[single_video_file_name] = single_video_task_id



RAG用ビデオセグメントの準備

効率的なRAGパイプラインを構築するために、データベース内でPegasusビデオIDをMarengoタスクIDと関連付けます。これにより、ベクトル検索で対応するセグメントが返された際に、そのビデオセグメントについてチャットできるようになります。これを実行するために、各ビデオセグメントをインデックス構築用のPegasusにアップロードする必要があります。

まず、Pegasusにアップロードするための、ビデオを10秒に分割するsplit_video関数を作成します。同時に、各セグメントがPegasusでの最低要件である4秒を超えていることを保証する必要があります。最後のクリップが5秒未満の場合は、最後から2番目のクリップと一部を重複させることで、これを実現します。

import os
import subprocess
import json
    
def split_video(input_path, output_dir, segment_duration=10):
    """
    Split a video into segments of the specified duration.
    Regular segments will be exactly segment_duration seconds.
    The last segment will be at least 5 seconds long, potentially overlapping
    with the previous segment if needed.
    
    Args:
        input_path: Path to the input video file
        output_dir: Directory to save the output segments
        segment_duration: Duration of each segment in seconds (default: 10)
    """

    # Minimum length for the last segment
    min_last_segment_len = 5
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Get base filename without extension
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    
    # Get video duration using ffprobe
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_format", input_path
    ]
    
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(probe_result.stdout)
        duration = float(video_info["format"]["duration"])
    except Exception as e:
        print(f"Error getting video duration: {e}")
        return 0
    
    # Calculate number of full segments
    num_full_segments = int(duration / segment_duration)
    
    # Calculate remaining duration
    remaining_duration = duration - (num_full_segments * segment_duration)
    
    # Determine total number of segments and if we need to adjust the last segment
    if remaining_duration > 0:
        if remaining_duration < min_last_segment_len:
            # Last segment would be too short, so we'll adjust its start time
            num_segments = num_full_segments + 1
            needs_adjustment = True
        else:
            # Last segment is already long enough
            num_segments = num_full_segments + 1
            needs_adjustment = False
    else:
        # No remaining duration, all segments are complete
        num_segments = num_full_segments
        needs_adjustment = False
    
    print(f"Video {base_name} is {duration:.2f} seconds long")
    print(f"Creating {num_segments} segments")
    
    # Create each segment
    for i in range(num_segments):
        # For regular segments, start at the segment boundary
        if i < num_full_segments:
            start_time = i * segment_duration
            actual_duration = segment_duration
        else:
            # This is the last segment
            if needs_adjustment:
                # Start earlier to ensure it's at least min_last_segment_len seconds
                start_time = duration - min_last_segment_len
                actual_duration = min_last_segment_len
            else:
                # Last segment is already long enough
                start_time = i * segment_duration
                actual_duration = remaining_duration
        
        output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4")
        
        # For all segments, use copy mode for speed
        cmd = [
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", input_path,
            "-t", str(actual_duration),
            "-c:v", "copy",
            "-c:a", "copy",
            output_path
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"Error creating segment {i+1}: {result.stderr[:100]}...")
        else:
            end_time = start_time + actual_duration
            if i == num_segments - 1 and needs_adjustment:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)")
            else:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s")
    
    print(f"Successfully split {base_name} into {num_segments} segments")
    return num_segments

新しく作成したvideo_segments_dirにビデオを保存します。

split_video(single_video_file, video_segments_dir,segment_length)

次に、pegasus_video_ids(ファイル名とPegasusビデオIDを関連付ける辞書)を作成し、元のビデオ全体のビデオIDを追加します。

pegasus_video_ids = {}

fname = single_video_file.split("/")[-1]
pegasus_video_ids[fname] = single_video_id

続いて、分割したビデオセグメントをPegasusにアップロードし、各ビデオIDをpegasus_video_idsに割り当てます。

segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))]

# Process each video
for segment_video_file in segment_video_files:
    if segment_video_file in pegasus_video_ids:
        print("skip file",segment_video_file)
        continue
    print("processing file",segment_video_file)
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir+segment_video_file)
        pegasus_video_ids[segment_video_file] = video_id
    except:
        print("error",segment_video_file)
        continue

最後に、LLaVa-NeXT-Videoモデルで効率的に処理できるよう、すべてのビデオをサンプリングします。

sampled_video_files = {}

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

for video_file in os.listdir(upscaled_video_dir):
    print(video_file)
    sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video



Weaviateへの埋め込みデータのアップロード

Weaviateは、コレクションに登録する際にメタデータレコードと埋め込みベクトルが区別されていることを想定しています。MarengoタスクIDとPegasusビデオIDを取得し、アップロード用のrecords(レコード配列)とvectors(ベクトル配列)を準備するためのprepare_marengo_embeddings_for_weaviate関数を作成します。

def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids):

    # Prepare data for Weaviate upload
    records = []
    vectors = []

    for video_file_name in marengo_task_ids.keys():

        

        marengo_task_id = marengo_task_ids[video_file_name]

        # Retrieve marengo full video and clip embeddings
        marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id)


        #track segment number to match with fiel
        segment_number = 0

        for segment in marengo_embeddings_result.video_embedding.segments:
            # Determine if this is a video or clip segment
            is_video = segment.embedding_scope == "video"


            #Update the file name if segment
            updated_file_name = video_file_name
            if not is_video:
                updated_file_name = updated_file_name.replace(".mp4",f"_segment_{segment_number:03d}.mp4")
                segment_number += 1

            video_name = video_file_name.replace(".mp4","")
            
            pegasus_video_id = None
            if updated_file_name in pegasus_video_ids:
                pegasus_video_id = pegasus_video_ids[updated_file_name] 

            record = {
                'video_name':video_name,
                'segment_number': 0 if is_video else segment_number,
                'video_file': updated_file_name,
                'start_time': getattr(segment, 'start_offset_sec', 0),
                'end_time': getattr(segment, 'end_offset_sec', 0),
                'type': 'video' if is_video else 'clip',
                'task_id': marengo_task_id,
                'pegasus_video_id': pegasus_video_id
            }
            
            # Get the embedding vector
            embedding_vector = [float(x) for x in segment.embeddings_float]
            
            # Add to our lists
            records.append(record)
            vectors.append(embedding_vector)

    # Print summary
    print(f"Prepared {len(records)} segments for upload to Weaviate")
    print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}")
    print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}")

    return records, vectors

作成した関数を使用して、Weaviateにアップロードするレコードとベクトルを取得します。

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")



ベクトル検索のテスト

すべてのデータがコレクションに揃ったので、Weaviateでのベクトル検索が正しいビデオを返すかテストできます。Weaviateのnear_vector検索を使用しているため、ビデオのベクトルで検索した場合は、同一オブジェクトとして距離が「0」として返出されます。

今回はコレクションから5番目のベクトル(インデックス5)を取り出して検索します。これにより、対応するビデオセグメントが距離「0」で返されます。

from weaviate.classes.query import MetadataQuery, Filter

# Use a specific vector for the query
query_vector = vectors[5]

# Perform vector search
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
)

print(f"Found {len(response.objects)} results for vector search")
for obj in response.objects:
    print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}")
    if 'segment_id' in obj.properties:
        print(f"Segment: {obj.properties['segment_id']}")
    if 'text' in obj.properties and obj.properties['text']:
        print(f"Text: {obj.properties['text']}")
    print(f"Distance: {obj.metadata.distance}")
    print("-" * 50)

この出力結果は、埋め込みが正常に保存され、正しく検索可能であることを示しています。




RAG用の関連性の高いビデオセグメントの取得

RAGパイプラインの核となるのは、ユーザーの質問を最も関連性の高いビデオセグメントと一致させる能力です。このプロセスは3つの重要な手順で機能します:

  1. TwelveLabsのMarengoモデルを使用して、ユーザーのテキストクエリをベクトル埋め込みに変換します。

  2. Weaviateをスキャンし、クエリ埋め込みに最も類似しているビデオセグメントの埋め込みを特定します。

  3. 最も関連性の高いビデオセグメントが特定されたら、それに関連付けられたPegasusのビデオIDを使用して、そのセグメントに特化した正確な回答を生成します。

この対象を絞った処理方法により、ビデオコンテンツの最も関連性の高い部分だけを処理させ、効率と応答品質を両立して大幅に向上させます。

まず、Marengoを使用してテキストクエリを埋め込みベクトルに変換します。

sample_question = "What technique did David Tyree use to catch the ball?"

embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=sample_question,
    text_truncate="start",
)

query_vector = embedding.text_embedding.segments[0].embeddings_float

次に、最も関連性の高いクリップを特定します。filters=(Filter.by_property("type").equal("clip"))を使用して、ビデオ全体の埋め込みを無視し、クリップの埋め込みのみが返されるようにします。

response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
    filters=(Filter.by_property("type").equal("clip"))
)

video_file = response.objects[0].properties.get("video_file")
print(video_file)

検索結果により、4番目のクリップ(インデックス3)であるfootball_480_segment_003.mp4が返されたことが確認できます。

抽出されたクリップを確認してみましょう:

import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

video_file = response.objects[0].properties.get("video_file")
video = sampled_video_files[video_file]

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # this is required to not display the generated image

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())

この方法によって、ヘルメットキャッチが行われたまさにその瞬間が正確に特定されています。

適切なセグメントが抽出された状態となったため、PegasusとLLaVa-NeXT-Videoがこの短いクリップに対してどのような挙動を示すかを確認します。



ビデオセグメントとの対話:Pegasus 対 LLaVa-NeTX-Video

最初にPegasusがどのように応答するかを見てみましょう。

pegasus_video_id = response.objects[0].properties.get("pegasus_video_id")


print(sample_question)

res = twelve_labs_client.generate.text(
  video_id=pegasus_video_id,
  prompt=sample_question
)
print(f"{res.data}")

What technique did David Tyree use to catch the ball?

デビッド・タイリーは、ボールをヘルメットに押し付けてキャッチを確実にする技術を使用しました。これは、ニューヨーク・ジャイアンツがポゼッションを維持し、攻撃を継続することを可能にした極めて重要なプレーでした。

Pegasusが見事な回答を提供しました。ヘルメットキャッチのプレーと、それがジャイアンツにとって重要な推進力となった点に言及できています。

次に、LLaVa-NeTX-Videoが同じセグメントをスキャンした際に、より洗練された回答を返せるか確認します。

video_file = response.objects[0].properties.get("video_file")
sampled_video = sampled_video_files[video_file]
generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video)

print(generated_text)

USER: What technique did David Tyree use to catch the ball? ASSISTANT: キャッチに成功した選手は、両手で頭上に持ってくるキャッチング技術を使用しました。この技術は、ディフェンス陣に囲まれる中で確実にボールを手元に引き込むためのものです。意図した軌道からボールが偏向した場合にはリスクの高い技術ですが、混戦の中で空中のボールを鷲掴みするのに非常に効果的な方法です。デビッド・タイリーは...

頭上でのハンドキャッチという点に言及するものの、ヘルメットキャッチに成功したという詳細(最も特徴的な要素)を認識できていません。さらに、後半で同じ結論を繰り返し饒舌になっています。



6 - Marengo、Weaviate、およびPegasusを使用した複数ビデオRAG

単一のビデオから個々のクリップに対してMarengoの埋め込みがどのように動作するかを理解した上で、より実用的なRAGユースケースに向けて、複数のビデオにまたがる埋め込みを使用する方法を示します。



すべてのビデオについてMarengo埋め込みを取得する

まず、すべてのビデオ用のMarengoタスクIDを用いて、marengo_task_idsの辞書を更新します。

for video_file_name in os.listdir(upscaled_video_dir):

    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue

    print(f"processing {video_file_name}")

    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Monitor the status of the video embedding task
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    marengo_task_ids[video_file_name] = task.id



残りのビデオをセグメントに分割する

残りのビデオを以前と同様にいくつかの短いセグメントに分割します。

# Create output folder if it doesn't exist
os.makedirs(upscaled_video_dir, exist_ok=True)

# Get all video files
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]

# Process each video
for video_file in video_files:
    split_video(upscaled_video_dir + video_file,video_segments_dir,segment_length)



すべてのビデオおよびそのセグメントに対してPegasusビデオIDを取得する

次に、残りのセグメントと元動画すべてに対するPegasusビデオIDを抽出します。時間を節約するために、この処理を並行して実行します。

import concurrent.futures
import os
from tqdm import tqdm  # Use standard tqdm instead of tqdm.notebook

def process_video(video_path):
    video_file_name = video_path.split("/")[-1]
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_path)
        return video_file_name, video_id
    except Exception as e:
        print(f"Error processing {video_file_name}: {str(e)}")
        return video_file_name, None

# Filter out videos that are already processed
segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]
full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]
all_video_files = segment_video_files + full_video_files

videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids]

print(f"Processing {len(videos_to_process)} videos in parallel...")

# Use ThreadPoolExecutor for I/O-bound operations like API calls
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks and create a dictionary mapping futures to their video files
    future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process}
    
    # Process results as they complete with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)):
        video_file_name, video_id = future.result()
        if video_id:
            pegasus_video_ids[video_file_name] = video_id

print("All videos processed!")
print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")



Weaviateへのデータアップロード

残りの埋め込みデータをWeaviateにアップロードします。

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        if record["pegasus_video_id"] is None:
            continue
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")



RAGパフォーマンス評価:クリップ 対 フルビデオ

Marengoの埋め込みとPegasusのビデオIDがWeaviateに正常にインデックス化されたので、このRAGシステムの有効性を評価します。この評価は以下の2つの重要な側面に焦点を当てます:

  1. 回答品質:クリップレベルの検索とビデオ全体の検索の違いによって、質問に対するシステムの応答の正確性はどのように変化するか?

  2. 処理効率:応答時間および計算リソースの消費量に関してどのような性能差異があるか?

この比較により、これら違いを定量的に測定可能にし、特により長尺なビデオや、ビデオ内の特定の瞬間を提示する複雑な質問に対して、最も適切な箇所だけに対象を動的に絞ることで、RAGが処理効率と品質を同時に向上させる仕組みを実証します。

まず、複数の異なるスポーツジャンルをカバーし、かつビデオに含まれる特定のイベントを解釈することを必要とする、様々なテスト用質問セットを定義します。

video_questions = [
    "In the American Football Video, what are the teams playing?", 
    "What technique does David Tyree use to catch the ball?",
    "In the tennis match video, who is playing?", 
    "What foot does Messi shoot at the goal with?",
    "When does Keri Strug hurt her foot?"
]



PegasusによるマルチビデオRAG

最初に、フルビデオを取得して質問した際の回答動作を測定します:

from weaviate.classes.query import MetadataQuery, Filter
import time

# Perform query using the whole video dataset
pegasus_full_video_answers = []

start_time = time.time()

for question in video_questions:

    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_full_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 72 seconds

次に、これを10秒のセグメントクリップでの実行と比較します。

pegasus_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_clip_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 20 seconds

抽出したセグメントクリップとフルビデオそれぞれの検索により生成された回答を比較します。

for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

双方から十分に正確で一貫性の高い回答が得られたことが分かります。しかし、クリップを用いたアプローチがわずか20秒で完了したのに対して、ビデオ全体を毎回処理するアプローチは完了に72秒を要しました。




LLaVa-NeXT-VideoによるマルチビデオRAG

次に、LLaVa-NeXT-Videoモデルを使用して同様のテストを行います。ただし、まずは事前にすべてのビデオファイルをサンプリングする必要があります。

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

フルビデオの読み込みによる質問処理から開始します。

llava_full_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_full_video_answers.append([question,selected_video_name,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

次に、これをセグメントクリップと比較します。

from weaviate.classes.query import MetadataQuery

import time

llava_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_clip_video_answers.append([question,selected_video_name,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

各実行で全く同等の時間を要したことが分かります。これは、ビデオの秒数に関係なく、各項目から常に均等に40枚のフレームだけを抽出して評価するためです。

次に、LLaVa-NeXT-Videoが抽出したクリップとフルビデオに対してそれぞれ生成した返答内容を調査します。

for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

今回のケースでは、セグメントクリップを処理した際に、LLaVa-NeXT-Videoは5問中2問で正しい解を導き出しました:

  1. 最初の質問において、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツが対戦中である点を認識できており、また

  2. 3番目の質問において、テニスの試合がロジャー・フェデラーとノバク・ジョコビッチの間で行われている点を正確に見分けています。




7 - まとめ:TwelveLabsとWeaviateによる効率的なビデオ理解のためのRAGの活用

ビデオ処理における検索拡張生成(RAG)の調査は、効率と正確性の両面で大きな利点を示しました。TwelveLabsの高度なビデオ理解機能とWeaviateの強力なベクトルデータベースを組み合わせることで、ビデオ全体ではなく、最も関連性の高いビデオセグメントのみを論理的に処理するシステムを構築しました。



主な知見

  1. パフォーマンスの改善:Weaviateを統合したRAGシステムにTwelveLabsのPegasusを適応すると、元ビデオを毎回全て読み込ませる代わりに、抽出されたクリップだけをクエリ処理することで、レスポンス速度を劇的に改善できました。

  2. 正確性の強化:LLaVa-NeXT-Videoなどのオープンソースモデルにおいて、注目箇所をあらかじめ切り分けたクリップセグメントに限定することで、ビデオ全体の処理に比べて回答の正確さが大きく向上することが示されました。

  3. スケーラビリティを備えたアーキテクチャ:このRAGモデルの検証は、TwelveLabsの埋め込みモデル(Marengo)とWeaviateの高性能ベクトルデータベースの連携が、いかに強力な処理性能を提供するかを示しています。高次元の埋め込みデータを低遅延で検索して保存できるWeaviateの技術は、ビデオ理解などの複雑なタスクを実用的なアプリケーションに落とし込むための極めて重要なコアコンポーネントです。



ユースケース

TwelveLabsのビデオ理解機能とWeaviateのベクトルデータベースを組み合わせることで、多数の産業で強力なアプリケーションの構築が可能になります:

  1. メディア&エンターテインメント:クリエイターは大規模なアーカイブから目的のシーンを瞬時に検索可能になり、編集作業の迅速化やSNS向け動画クリップの自動生成、アーカイブの効率的な再活用などが可能になります。

  2. スポーツアナリティクス:コーチやアナリストは、ビデオを手動で早送り・巻き戻しして探すことなく、特定のアクション(例:『相手チームの裏に投げる短いパスのシーン』)をただテキストで定義するだけで、即座に関連クリップを全試合データから呼び出せるようになります。

  3. 小売・Eコマース:小売事業者は自社のチュートリアル動画を、ただ受動的に見せる一方向のコンテンツから、双方向で質問に答えるインタラクティブなショッピング体験にシフトさせることができます。例えば、顧客が『ストラップの調整方法を教えて』『バックパックに入れたときのサイズ感を見せて』と質問した際、説明動画の該当部分のみを自動で瞬時にカットして再生表示するといったことが容易に実現します。

TwelveLabsとWeaviateの組み合わせにより、膨大なビデオデータを扱う環境でも、情報抽出の効率化と高度な対話システムを同時に満たす、強力なVideo RAG基盤を構築できます。

草稿を確認してくれたWeaviateチームの Tuana CelikErika Cardenas に深く感謝します!



ビデオ処理は、特に長尺コンテンツを分析する場合、計算コストが高く時間がかかります。検索拡張生成(RAG)は、システムがビデオ全体ではなく最も関連性の高いビデオセグメントのみを処理できるようにすることで、この課題を解決します。この的を絞ったアプローチにより、応答の品質を維持または向上させながら、処理時間を大幅に削減できます。

この記事では、Twelve Labsのビデオ理解機能Weaviateのベクトルデータベースを組み合わせて、ビデオコンテンツ用の効率的なRAGシステムを構築する方法を探ります。ビデオをセグメント化し、埋め込みを使用して分析に最も関連する部分のみを検索することで、精度を維持または向上させながら処理時間を大幅に改善できます。

私たちのアプローチは、いくつかの主要技術を活用しています:

  • ビデオの理解と埋め込み生成のためのTwelveLabs PegasusおよびMarengoモデル

  • ビデオセグメントを効率的に保存・収集するためのWeaviateベクトルデータベース

  • ビデオ分析の比較対象としてのオープンソースの LLaVA-NeXT-Video モデル

このRAGベースのアプローチにより、最も関連性の高いセグメントのみに焦点を当てることで、ビデオ処理の計算負荷を軽減し、より長いビデオを効率的に分析できるようになることを実証します。コンテンツモデレーション、スポーツ分析、または教育コンテンツ向けのアプリケーションのいずれを構築している場合でも、このアプローチは高品質な結果を維持しながらビデオ処理能力を拡張するのに役立ちます。



1 - TwelveLabsとWeaviateのセットアップ



TwelveLabs

まだTwelve Labsに登録していない場合は、こちらから登録できます。アカウントをセットアップしたら、Playgroundに移動し、画面右上隅のユーザーアイコンをクリックしてAPI Keyにアクセスします。

お使いのノートブックの左側にある鍵のアイコンをクリックし、この値をTL_API_KEYとしてシークレットを作成します。



Weaviate

Weaviateのアカウントをお持ちでない場合は、こちらからサインアップできます。アカウント作成後、クラウドダッシュボードに移動し、新しいクラスターを作成します。クラスターのセットアップが完了したら、ノートブックのシークレットセクションに2つの値を設定する必要があります。

REST Endpointの下にあるURLをWEAVIATE_URL変数として追加します。API Keysの下にあるAdminキーをコピーし、WEAVIATE_API_KEYに保存します。



2 - GPUランタイムの選択

LLaVA-NeXT-Videoモデルを実行するにはGPUが必要です。ノートブックでランタイム > ランタイムのタイプを変更に移動し、T4 GPUを選択します。



3 - 環境のセットアップ



依存関係のインストール

まず、TwelveLabsとWeaviateのSDKをインストールする必要があります:

!python -m pip install -U -q twelvelabs
!python -m pip install -U -q "weaviate-client>=4.0.0"

次に、残りの依存関係をインストールします。

!python -m pip install torch
!python -m pip install -q av
!python -m pip install --upgrade -q accelerate 
!python -m pip install -U bitsandbytes
!python -m pip install git

!python -m pip install pillow
!python -m pip install sentencepiece
!python -m



TwelveLabsおよびWeaviate SDKのセットアップ

from google.colab import userdata

TL_API_KEY=userdata.get('TL_API_KEY')
weaviate_url = userdata.get("WEAVIATE_URL")
weaviate_api_key = userdata.get("WEAVIATE_API_KEY")

次に、TwelveLabsクライアントを初期化します。

from twelvelabs import TwelveLabs

# Initialize the Twelve Labs client
twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)

最後に、Weaviateクライアントをセットアップし、Video_Embeddingsコレクションを初期化します。

import weaviate
from weaviate.classes.init import Auth

# Connect to Weaviate Cloud
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

# Get or create collection
try:
    collection = weaviate_client.collections.get("Video_Embeddings")
except:
    collection = weaviate_client.collections.create(name="Video_Embeddings")



ビデオデータのセットアップ

次に、埋め込み用のビデオデータを取得する必要があります。このリンクを使用して、Googleドライブのフォルダ内でビデオデータを見つけることができます。基本のGoogleドライブフォルダの中に 「TwelveLabs-Weaviate」 という名前のフォルダを作成し、そこへコピーします。以下のセルを使用してドライブをマウントし、ノートブックからビデオファイルにアクセスできるようにします。

from google.colab import drive
drive.mount('/content/drive')
base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate"
raw_video_dir = base_folder_path + "/sports_videos"

upscaled_video_dir = base_folder_path + "/upscaled_videos/"
video_segments_dir = base_folder_path + "/video_segments/"



ビデオのアップスケーリング

いくつかのビデオは、解像度が低すぎて埋め込みモデルで使用できません。これらを使用する前にアップスケーリングする必要があります。

ここでアップスケーリング関数を作成します。read_video_pyavは、LLaVa-NeXT-VideoのColabノートブックから直接引用したもので、推論に適した正しいnumpy表現にビデオをフォーマットします。

import av
import numpy as np

def upscale_video(input_file, output_file, target_width=1280, target_height=720):
    input_container = av.open(input_file)
    output_container = av.open(output_file, mode='w')

    input_stream = input_container.streams.video[0]
    output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate)
    output_stream.width = target_width
    output_stream.height = target_height
    output_stream.pix_fmt = 'yuv420p'

    for frame in input_container.decode(input_stream):
        frame = frame.reformat(width=target_width, height=target_height)
        packet = output_stream.encode(frame)
        output_container.mux(packet)

    # Flush the encoder
    packet = output_stream.encode(None)
    output_container.mux(packet)

    # Close the containers
    input_container.close()
    output_container.close()

def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

raw_video_dirフォルダ内のビデオを取得し、アップスケーリングしてupscaled_video_dirフォルダに保存します。

# Create output directory if it doesn't exist
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# Iterate over all files in the raw video directory
for filename in os.listdir(raw_video_dir):
    
    # Check if the file is a video file
    if filename.endswith(".mp4"):
        print(filename)
        # Get the file name without extension
        input_file_no_ext = os.path.splitext(filename)[0]
        # Define the output file name
        output_file = f"{input_file_no_ext}_480.mp4"
        if output_file in os.listdir(upscaled_video_dir):
            continue
        # Define the full path for the input and output files
        input_file_path = os.path.join(raw_video_dir, filename)
        output_file_path = os.path.join(upscaled_video_dir, output_file)
        # Upscale the video
        upscale_video(input_file_path, output_file_path)



4 - 単一ビデオによるPegasusとLLaVa-NeXT-Videoの比較

PegasusLLaVa-NeXT-Video は、どちらもビデオを取り込んでそれについて質問できるビデオ理解モデルです。

まず、ビデオコレクションから切り倒した単一のビデオでPegasusとLLaVa-NeXT-Videoを比較することから始めます。ビデオは、ニューヨーク・ジャイアンツがニューイングランド・ペイトリオッツと対戦する、第42回スーパーボウルのシーケンスを示しています。これは「ヘルメットキャッチ」と呼ばれる有名なレセプションで、ジャイアンツのクォーターバックであるイーライ・マニングがパスを投げ、これを受けたジャイアンツのワイドレシーバー、デビッド・タイリーが試合の残り2分で自身のヘルメットにボールを押し付けながらキャッチを見事に成功させたものです。

ビデオの背景を把握したところで、「このビデオで何が起きていますか?」と質問した際に、2つのモデルがビデオをどれだけ理解できるかを判定します。



Pegasusを使用してビデオと対話する

始める前に、ビデオをロードするためのPegasusインデックスをセットアップする必要があります。

models = [
        {
            "name": "pegasus1.2",
            "options": ["visual"]
        }
    ]

index_name = "sports_videos"
indices_list = twelve_labs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelve_labs_client.index.create(
        name=index_name,
        models=models

    )
    print(f"A new index has been created: id={index.id} name={index.name} models={index.models}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} models={index.models}")

次に、インデックスにビデオをアップロードする関数を作成します。これにより、ビデオについて質問するために使用できるPegasusのビデオIDが返されます。

# Monitor the status of the video task
def on_task_update(task):
    print(f"  Status={task.status}")
    
def upload_video_to_twelve_labs_pegasus(video_path):
    task = twelve_labs_client.task.create(
        index_id=index.id,
        file = video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)

    if task.status != "ready":
      raise RuntimeError(f"Indexing failed with status {task.status}")
    print(f"The unique identifer of your video is {task.video_id}.")
    return task.video_id

ビデオをアップロードし、PegasusビデオIDをsingle_video_idに保存します。

# Define the video file path
single_video_file = upscaled_video_dir + "football_480.mp4"

single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)

Pegasusがビデオを本当に理解しているかを確認するために、「このビデオで何が起きていますか?簡潔に答えてください。」と質問します。

single_video_query = "What is going on in this video? Please be concise."

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt=single_video_query
)
print(f"{res.data}")

Pegasusから次の返答が届きました:

ビデオは、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツのアメリカンフットボールの試合における極めて重要な瞬間を紹介しています。ジャイアンツのクォーターバックであるイーライ・マニングが投げたパスを、デビッド・タイリーがアウト・オブ・バウンズに倒れ込みながら、ボールをヘルメットに固定して見事なキャッチを決めました。複数のアングルがそのキャッチをリプレイし、その難しさと正確さを強調しています。プレイ後、タイリーは短く喜びを祝い、ビデオは彼と他のチームメイトがフィールドから立ち去るシーンで終了します。

この応答から、Pegasusがビデオを深く理解していることが分かります。これがジャイアンツとペイトリオッツのフットボールの試合であることを把握しています。また、イーライ・マニングがボールを投げ、デビッド・タイリーがそれを受け止めた、試合の勝負を分ける瞬間であることも理解しています。

Pegasusからはこれがスーパーボウルであることは言及されていなかったので、確認のためにさらに質問してみます。

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt="What game is this?"
)
print(f"{res.data}")

Pegasusはこれは、第42回スーパーボウルの試合です。と答えました。これは正しい回答です。

次に、LLaVa-NeXT-Videoがどの程度ビデオを解釈できるかを見てみましょう。



LLaVa-NeXT-Videoを使用してビデオと対話する

LLaVa-NeXT-Videoでは、推論の前に特定の形式でビデオデータを準備する必要があります。モデルはビデオストリーム全体を一度に処理できないため、ビデオ全体から均一にフレームを抽出します。ここでは、各ビデオから40枚の均等に分散されたフレームを抽出するサンプリング関数を作成し、コンテンツ全体の重要な瞬間を逃さず捉えます。このサンプリング手法はLLaVA-NeXT-Videoの公式実装に準拠しています。サンプリング後、Hugging Face Hubからモデルをロードし、モデルの要件に適合するように入力を構成したうえで、質問に対する応答を生成するための推論を実行します。

def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # sample uniformly num_samples frames from the video
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
    
    sampled_frames = read_video_pyav(container, indices)
    
    return sampled_frames
    
sampled_video = sample_video(single_video_file, num_samples=40)

ビデオのサンプリングが適切に行われたら、モデルをセットアップします。

from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

次に、モデルに対してクエリを実行する関数を作成します。

def query_llava_next(query,model,processor,sampled_video):

    # Each "content" is a list of dicts and you can add image/video/text modalities
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": query},
                {"type": "video"},
                ],
        },
    ]

    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # prompt_len = len(prompt)

    inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device)

    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)

    return generated_text[0]

最後に、同様の質問をしてPegasusとの出力を比較します。

llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

次のような回答が返信されました:

USER: このビデオで何が起きていますか?簡潔に。 ASSISTANT: 進行中のフットボールの試合が映っており、フィールドには複数のプレイヤーがいます。一部の選手が着用しているジャージの番号や古いスタイルのヘルメットから判断すると、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツの間の第3回スーパーボウルであるようです。1人の選手が空中姿勢でボールをキャッチしようとしており、他の選手にタックルされかけています。また、レフェリーはファーストダウンのシグナルを送っています。コーチ陣やその他の試合関係者も...

このモデルはジャイアンツとペイトリオッツのフットボールの試合であることを認識していますが、試合を第3回スーパーボウルと誤認しています。また、ビデオの最も重要な場面、すなわちヘルメットキャッチの箇所を明確に識別できていません。

先ほどと同じ「これはどの試合ですか?」という質問をしてみます。わずかに正確な内容に近づいているものの、正解には至っていません。

llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

USER: what game is this? ASSISTANT: 提供された画像は、現在進行中のフットボールの試合、特に第41回スーパーボウルの一部です。これはニューイングランド・ペイトリオッツ対ニューヨーク・ジャイアンツの試合です。画像に写っているプレイヤーはジャイアンツとペイトリオッツの選手です。



5 - 単一ビデオにおけるセグメントレベル・クエリのためのRAG

比較結果により、Pegasusはビデオ全体を分析する際により優れた処理を提供し、処理時間を短縮しつつ、より正確で一貫性のある回答を返すことが分かりました。

しかしながら、モデルの注目範囲を最も関連性の高いビデオセグメントのみに絞り込むことで、さらにパフォーマンスを向上させる余地があります。これこそが検索拡張生成(RAG)の有用性が発揮される理由です。ビデオ全体を処理する代わりに、特定のクエリに関連する情報が格納されたセグメントのみを特定して分析することが可能となります。

このアプローチを実装するために、ビデオセグメントのセマンティック(意味的内容)を取得する高品質な埋め込みの作成を専門とする、TwelveLabsのMarengoモデルを活用します。これらの埋め込みによって以下が可能となります:

  1. 一度に、ビデオの各セグメントを個別にインデックス化する。

  2. ユーザーのクエリに最も一致する関連セグメントを特定する。

  3. 特定の処理セグメントのみをビデオ理解モデルによって処理する。

まず、ビデオを各セグメントに分割し、Marengoモデルを使用してそれぞれに対して埋め込みを生成します。これらの埋め込みにより、RAGシステムの基盤が確立されます。



Marengoを使用して、フルビデオおよびビデオクリップの埋め込みを作成する

セグメントの長さをMarengoでサポートされる最大の長さである10秒に設定します。

# Define the video segment length
segment_length = 10

次に、Marengoを使用してビデオの埋め込み処理を行います。注意:Marengoがビデオ全体の埋め込みと、ビデオ内の各10秒のクリップの埋め込みの両方を確実に返すようにするため、video_embedding_scopes=["clip", "video"]およびvideo_clip_length=segment_lengthを設定します。

task = twelve_labs_client.embed.task.create(
    model_name="Marengo-retrieval-2.7",
    video_file=single_video_file,
    video_clip_length=segment_length,
    video_embedding_scopes=["clip", "video"]
)
print(
    f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
)

# Monitor the status of the video embedding task
status = task.wait_for_done(
    sleep_interval=2,
    callback=on_task_update
)
print(f"Embedding done: {status}")

埋め込み処理が完了したら、必要に応じてそれらを抽出するためにMarengoのタスクIDを保存できます。後ほどWeaviateデータベースにデータを入力する際に使用できるよう、タスクIDをmarengo_task_idsに保持します。

single_video_task_id = task.id

marengo_task_ids = {}

single_video_file_name = single_video_file.split("/")[-1]
marengo_task_ids[single_video_file_name] = single_video_task_id



RAG用ビデオセグメントの準備

効率的なRAGパイプラインを構築するために、データベース内でPegasusビデオIDをMarengoタスクIDと関連付けます。これにより、ベクトル検索で対応するセグメントが返された際に、そのビデオセグメントについてチャットできるようになります。これを実行するために、各ビデオセグメントをインデックス構築用のPegasusにアップロードする必要があります。

まず、Pegasusにアップロードするための、ビデオを10秒に分割するsplit_video関数を作成します。同時に、各セグメントがPegasusでの最低要件である4秒を超えていることを保証する必要があります。最後のクリップが5秒未満の場合は、最後から2番目のクリップと一部を重複させることで、これを実現します。

import os
import subprocess
import json
    
def split_video(input_path, output_dir, segment_duration=10):
    """
    Split a video into segments of the specified duration.
    Regular segments will be exactly segment_duration seconds.
    The last segment will be at least 5 seconds long, potentially overlapping
    with the previous segment if needed.
    
    Args:
        input_path: Path to the input video file
        output_dir: Directory to save the output segments
        segment_duration: Duration of each segment in seconds (default: 10)
    """

    # Minimum length for the last segment
    min_last_segment_len = 5
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Get base filename without extension
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    
    # Get video duration using ffprobe
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_format", input_path
    ]
    
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(probe_result.stdout)
        duration = float(video_info["format"]["duration"])
    except Exception as e:
        print(f"Error getting video duration: {e}")
        return 0
    
    # Calculate number of full segments
    num_full_segments = int(duration / segment_duration)
    
    # Calculate remaining duration
    remaining_duration = duration - (num_full_segments * segment_duration)
    
    # Determine total number of segments and if we need to adjust the last segment
    if remaining_duration > 0:
        if remaining_duration < min_last_segment_len:
            # Last segment would be too short, so we'll adjust its start time
            num_segments = num_full_segments + 1
            needs_adjustment = True
        else:
            # Last segment is already long enough
            num_segments = num_full_segments + 1
            needs_adjustment = False
    else:
        # No remaining duration, all segments are complete
        num_segments = num_full_segments
        needs_adjustment = False
    
    print(f"Video {base_name} is {duration:.2f} seconds long")
    print(f"Creating {num_segments} segments")
    
    # Create each segment
    for i in range(num_segments):
        # For regular segments, start at the segment boundary
        if i < num_full_segments:
            start_time = i * segment_duration
            actual_duration = segment_duration
        else:
            # This is the last segment
            if needs_adjustment:
                # Start earlier to ensure it's at least min_last_segment_len seconds
                start_time = duration - min_last_segment_len
                actual_duration = min_last_segment_len
            else:
                # Last segment is already long enough
                start_time = i * segment_duration
                actual_duration = remaining_duration
        
        output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4")
        
        # For all segments, use copy mode for speed
        cmd = [
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", input_path,
            "-t", str(actual_duration),
            "-c:v", "copy",
            "-c:a", "copy",
            output_path
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"Error creating segment {i+1}: {result.stderr[:100]}...")
        else:
            end_time = start_time + actual_duration
            if i == num_segments - 1 and needs_adjustment:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)")
            else:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s")
    
    print(f"Successfully split {base_name} into {num_segments} segments")
    return num_segments

新しく作成したvideo_segments_dirにビデオを保存します。

split_video(single_video_file, video_segments_dir,segment_length)

次に、pegasus_video_ids(ファイル名とPegasusビデオIDを関連付ける辞書)を作成し、元のビデオ全体のビデオIDを追加します。

pegasus_video_ids = {}

fname = single_video_file.split("/")[-1]
pegasus_video_ids[fname] = single_video_id

続いて、分割したビデオセグメントをPegasusにアップロードし、各ビデオIDをpegasus_video_idsに割り当てます。

segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))]

# Process each video
for segment_video_file in segment_video_files:
    if segment_video_file in pegasus_video_ids:
        print("skip file",segment_video_file)
        continue
    print("processing file",segment_video_file)
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir+segment_video_file)
        pegasus_video_ids[segment_video_file] = video_id
    except:
        print("error",segment_video_file)
        continue

最後に、LLaVa-NeXT-Videoモデルで効率的に処理できるよう、すべてのビデオをサンプリングします。

sampled_video_files = {}

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

for video_file in os.listdir(upscaled_video_dir):
    print(video_file)
    sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video



Weaviateへの埋め込みデータのアップロード

Weaviateは、コレクションに登録する際にメタデータレコードと埋め込みベクトルが区別されていることを想定しています。MarengoタスクIDとPegasusビデオIDを取得し、アップロード用のrecords(レコード配列)とvectors(ベクトル配列)を準備するためのprepare_marengo_embeddings_for_weaviate関数を作成します。

def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids):

    # Prepare data for Weaviate upload
    records = []
    vectors = []

    for video_file_name in marengo_task_ids.keys():

        

        marengo_task_id = marengo_task_ids[video_file_name]

        # Retrieve marengo full video and clip embeddings
        marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id)


        #track segment number to match with fiel
        segment_number = 0

        for segment in marengo_embeddings_result.video_embedding.segments:
            # Determine if this is a video or clip segment
            is_video = segment.embedding_scope == "video"


            #Update the file name if segment
            updated_file_name = video_file_name
            if not is_video:
                updated_file_name = updated_file_name.replace(".mp4",f"_segment_{segment_number:03d}.mp4")
                segment_number += 1

            video_name = video_file_name.replace(".mp4","")
            
            pegasus_video_id = None
            if updated_file_name in pegasus_video_ids:
                pegasus_video_id = pegasus_video_ids[updated_file_name] 

            record = {
                'video_name':video_name,
                'segment_number': 0 if is_video else segment_number,
                'video_file': updated_file_name,
                'start_time': getattr(segment, 'start_offset_sec', 0),
                'end_time': getattr(segment, 'end_offset_sec', 0),
                'type': 'video' if is_video else 'clip',
                'task_id': marengo_task_id,
                'pegasus_video_id': pegasus_video_id
            }
            
            # Get the embedding vector
            embedding_vector = [float(x) for x in segment.embeddings_float]
            
            # Add to our lists
            records.append(record)
            vectors.append(embedding_vector)

    # Print summary
    print(f"Prepared {len(records)} segments for upload to Weaviate")
    print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}")
    print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}")

    return records, vectors

作成した関数を使用して、Weaviateにアップロードするレコードとベクトルを取得します。

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")



ベクトル検索のテスト

すべてのデータがコレクションに揃ったので、Weaviateでのベクトル検索が正しいビデオを返すかテストできます。Weaviateのnear_vector検索を使用しているため、ビデオのベクトルで検索した場合は、同一オブジェクトとして距離が「0」として返出されます。

今回はコレクションから5番目のベクトル(インデックス5)を取り出して検索します。これにより、対応するビデオセグメントが距離「0」で返されます。

from weaviate.classes.query import MetadataQuery, Filter

# Use a specific vector for the query
query_vector = vectors[5]

# Perform vector search
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
)

print(f"Found {len(response.objects)} results for vector search")
for obj in response.objects:
    print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}")
    if 'segment_id' in obj.properties:
        print(f"Segment: {obj.properties['segment_id']}")
    if 'text' in obj.properties and obj.properties['text']:
        print(f"Text: {obj.properties['text']}")
    print(f"Distance: {obj.metadata.distance}")
    print("-" * 50)

この出力結果は、埋め込みが正常に保存され、正しく検索可能であることを示しています。




RAG用の関連性の高いビデオセグメントの取得

RAGパイプラインの核となるのは、ユーザーの質問を最も関連性の高いビデオセグメントと一致させる能力です。このプロセスは3つの重要な手順で機能します:

  1. TwelveLabsのMarengoモデルを使用して、ユーザーのテキストクエリをベクトル埋め込みに変換します。

  2. Weaviateをスキャンし、クエリ埋め込みに最も類似しているビデオセグメントの埋め込みを特定します。

  3. 最も関連性の高いビデオセグメントが特定されたら、それに関連付けられたPegasusのビデオIDを使用して、そのセグメントに特化した正確な回答を生成します。

この対象を絞った処理方法により、ビデオコンテンツの最も関連性の高い部分だけを処理させ、効率と応答品質を両立して大幅に向上させます。

まず、Marengoを使用してテキストクエリを埋め込みベクトルに変換します。

sample_question = "What technique did David Tyree use to catch the ball?"

embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=sample_question,
    text_truncate="start",
)

query_vector = embedding.text_embedding.segments[0].embeddings_float

次に、最も関連性の高いクリップを特定します。filters=(Filter.by_property("type").equal("clip"))を使用して、ビデオ全体の埋め込みを無視し、クリップの埋め込みのみが返されるようにします。

response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
    filters=(Filter.by_property("type").equal("clip"))
)

video_file = response.objects[0].properties.get("video_file")
print(video_file)

検索結果により、4番目のクリップ(インデックス3)であるfootball_480_segment_003.mp4が返されたことが確認できます。

抽出されたクリップを確認してみましょう:

import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

video_file = response.objects[0].properties.get("video_file")
video = sampled_video_files[video_file]

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # this is required to not display the generated image

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())

この方法によって、ヘルメットキャッチが行われたまさにその瞬間が正確に特定されています。

適切なセグメントが抽出された状態となったため、PegasusとLLaVa-NeXT-Videoがこの短いクリップに対してどのような挙動を示すかを確認します。



ビデオセグメントとの対話:Pegasus 対 LLaVa-NeTX-Video

最初にPegasusがどのように応答するかを見てみましょう。

pegasus_video_id = response.objects[0].properties.get("pegasus_video_id")


print(sample_question)

res = twelve_labs_client.generate.text(
  video_id=pegasus_video_id,
  prompt=sample_question
)
print(f"{res.data}")

What technique did David Tyree use to catch the ball?

デビッド・タイリーは、ボールをヘルメットに押し付けてキャッチを確実にする技術を使用しました。これは、ニューヨーク・ジャイアンツがポゼッションを維持し、攻撃を継続することを可能にした極めて重要なプレーでした。

Pegasusが見事な回答を提供しました。ヘルメットキャッチのプレーと、それがジャイアンツにとって重要な推進力となった点に言及できています。

次に、LLaVa-NeTX-Videoが同じセグメントをスキャンした際に、より洗練された回答を返せるか確認します。

video_file = response.objects[0].properties.get("video_file")
sampled_video = sampled_video_files[video_file]
generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video)

print(generated_text)

USER: What technique did David Tyree use to catch the ball? ASSISTANT: キャッチに成功した選手は、両手で頭上に持ってくるキャッチング技術を使用しました。この技術は、ディフェンス陣に囲まれる中で確実にボールを手元に引き込むためのものです。意図した軌道からボールが偏向した場合にはリスクの高い技術ですが、混戦の中で空中のボールを鷲掴みするのに非常に効果的な方法です。デビッド・タイリーは...

頭上でのハンドキャッチという点に言及するものの、ヘルメットキャッチに成功したという詳細(最も特徴的な要素)を認識できていません。さらに、後半で同じ結論を繰り返し饒舌になっています。



6 - Marengo、Weaviate、およびPegasusを使用した複数ビデオRAG

単一のビデオから個々のクリップに対してMarengoの埋め込みがどのように動作するかを理解した上で、より実用的なRAGユースケースに向けて、複数のビデオにまたがる埋め込みを使用する方法を示します。



すべてのビデオについてMarengo埋め込みを取得する

まず、すべてのビデオ用のMarengoタスクIDを用いて、marengo_task_idsの辞書を更新します。

for video_file_name in os.listdir(upscaled_video_dir):

    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue

    print(f"processing {video_file_name}")

    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Monitor the status of the video embedding task
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    marengo_task_ids[video_file_name] = task.id



残りのビデオをセグメントに分割する

残りのビデオを以前と同様にいくつかの短いセグメントに分割します。

# Create output folder if it doesn't exist
os.makedirs(upscaled_video_dir, exist_ok=True)

# Get all video files
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]

# Process each video
for video_file in video_files:
    split_video(upscaled_video_dir + video_file,video_segments_dir,segment_length)



すべてのビデオおよびそのセグメントに対してPegasusビデオIDを取得する

次に、残りのセグメントと元動画すべてに対するPegasusビデオIDを抽出します。時間を節約するために、この処理を並行して実行します。

import concurrent.futures
import os
from tqdm import tqdm  # Use standard tqdm instead of tqdm.notebook

def process_video(video_path):
    video_file_name = video_path.split("/")[-1]
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_path)
        return video_file_name, video_id
    except Exception as e:
        print(f"Error processing {video_file_name}: {str(e)}")
        return video_file_name, None

# Filter out videos that are already processed
segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]
full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]
all_video_files = segment_video_files + full_video_files

videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids]

print(f"Processing {len(videos_to_process)} videos in parallel...")

# Use ThreadPoolExecutor for I/O-bound operations like API calls
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks and create a dictionary mapping futures to their video files
    future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process}
    
    # Process results as they complete with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)):
        video_file_name, video_id = future.result()
        if video_id:
            pegasus_video_ids[video_file_name] = video_id

print("All videos processed!")
print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")



Weaviateへのデータアップロード

残りの埋め込みデータをWeaviateにアップロードします。

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        if record["pegasus_video_id"] is None:
            continue
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")



RAGパフォーマンス評価:クリップ 対 フルビデオ

Marengoの埋め込みとPegasusのビデオIDがWeaviateに正常にインデックス化されたので、このRAGシステムの有効性を評価します。この評価は以下の2つの重要な側面に焦点を当てます:

  1. 回答品質:クリップレベルの検索とビデオ全体の検索の違いによって、質問に対するシステムの応答の正確性はどのように変化するか?

  2. 処理効率:応答時間および計算リソースの消費量に関してどのような性能差異があるか?

この比較により、これら違いを定量的に測定可能にし、特により長尺なビデオや、ビデオ内の特定の瞬間を提示する複雑な質問に対して、最も適切な箇所だけに対象を動的に絞ることで、RAGが処理効率と品質を同時に向上させる仕組みを実証します。

まず、複数の異なるスポーツジャンルをカバーし、かつビデオに含まれる特定のイベントを解釈することを必要とする、様々なテスト用質問セットを定義します。

video_questions = [
    "In the American Football Video, what are the teams playing?", 
    "What technique does David Tyree use to catch the ball?",
    "In the tennis match video, who is playing?", 
    "What foot does Messi shoot at the goal with?",
    "When does Keri Strug hurt her foot?"
]



PegasusによるマルチビデオRAG

最初に、フルビデオを取得して質問した際の回答動作を測定します:

from weaviate.classes.query import MetadataQuery, Filter
import time

# Perform query using the whole video dataset
pegasus_full_video_answers = []

start_time = time.time()

for question in video_questions:

    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_full_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 72 seconds

次に、これを10秒のセグメントクリップでの実行と比較します。

pegasus_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_clip_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 20 seconds

抽出したセグメントクリップとフルビデオそれぞれの検索により生成された回答を比較します。

for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

双方から十分に正確で一貫性の高い回答が得られたことが分かります。しかし、クリップを用いたアプローチがわずか20秒で完了したのに対して、ビデオ全体を毎回処理するアプローチは完了に72秒を要しました。




LLaVa-NeXT-VideoによるマルチビデオRAG

次に、LLaVa-NeXT-Videoモデルを使用して同様のテストを行います。ただし、まずは事前にすべてのビデオファイルをサンプリングする必要があります。

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

フルビデオの読み込みによる質問処理から開始します。

llava_full_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_full_video_answers.append([question,selected_video_name,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

次に、これをセグメントクリップと比較します。

from weaviate.classes.query import MetadataQuery

import time

llava_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_clip_video_answers.append([question,selected_video_name,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

各実行で全く同等の時間を要したことが分かります。これは、ビデオの秒数に関係なく、各項目から常に均等に40枚のフレームだけを抽出して評価するためです。

次に、LLaVa-NeXT-Videoが抽出したクリップとフルビデオに対してそれぞれ生成した返答内容を調査します。

for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

今回のケースでは、セグメントクリップを処理した際に、LLaVa-NeXT-Videoは5問中2問で正しい解を導き出しました:

  1. 最初の質問において、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツが対戦中である点を認識できており、また

  2. 3番目の質問において、テニスの試合がロジャー・フェデラーとノバク・ジョコビッチの間で行われている点を正確に見分けています。




7 - まとめ:TwelveLabsとWeaviateによる効率的なビデオ理解のためのRAGの活用

ビデオ処理における検索拡張生成(RAG)の調査は、効率と正確性の両面で大きな利点を示しました。TwelveLabsの高度なビデオ理解機能とWeaviateの強力なベクトルデータベースを組み合わせることで、ビデオ全体ではなく、最も関連性の高いビデオセグメントのみを論理的に処理するシステムを構築しました。



主な知見

  1. パフォーマンスの改善:Weaviateを統合したRAGシステムにTwelveLabsのPegasusを適応すると、元ビデオを毎回全て読み込ませる代わりに、抽出されたクリップだけをクエリ処理することで、レスポンス速度を劇的に改善できました。

  2. 正確性の強化:LLaVa-NeXT-Videoなどのオープンソースモデルにおいて、注目箇所をあらかじめ切り分けたクリップセグメントに限定することで、ビデオ全体の処理に比べて回答の正確さが大きく向上することが示されました。

  3. スケーラビリティを備えたアーキテクチャ:このRAGモデルの検証は、TwelveLabsの埋め込みモデル(Marengo)とWeaviateの高性能ベクトルデータベースの連携が、いかに強力な処理性能を提供するかを示しています。高次元の埋め込みデータを低遅延で検索して保存できるWeaviateの技術は、ビデオ理解などの複雑なタスクを実用的なアプリケーションに落とし込むための極めて重要なコアコンポーネントです。



ユースケース

TwelveLabsのビデオ理解機能とWeaviateのベクトルデータベースを組み合わせることで、多数の産業で強力なアプリケーションの構築が可能になります:

  1. メディア&エンターテインメント:クリエイターは大規模なアーカイブから目的のシーンを瞬時に検索可能になり、編集作業の迅速化やSNS向け動画クリップの自動生成、アーカイブの効率的な再活用などが可能になります。

  2. スポーツアナリティクス:コーチやアナリストは、ビデオを手動で早送り・巻き戻しして探すことなく、特定のアクション(例:『相手チームの裏に投げる短いパスのシーン』)をただテキストで定義するだけで、即座に関連クリップを全試合データから呼び出せるようになります。

  3. 小売・Eコマース:小売事業者は自社のチュートリアル動画を、ただ受動的に見せる一方向のコンテンツから、双方向で質問に答えるインタラクティブなショッピング体験にシフトさせることができます。例えば、顧客が『ストラップの調整方法を教えて』『バックパックに入れたときのサイズ感を見せて』と質問した際、説明動画の該当部分のみを自動で瞬時にカットして再生表示するといったことが容易に実現します。

TwelveLabsとWeaviateの組み合わせにより、膨大なビデオデータを扱う環境でも、情報抽出の効率化と高度な対話システムを同時に満たす、強力なVideo RAG基盤を構築できます。

草稿を確認してくれたWeaviateチームの Tuana CelikErika Cardenas に深く感謝します!



ビデオ処理は、特に長尺コンテンツを分析する場合、計算コストが高く時間がかかります。検索拡張生成(RAG)は、システムがビデオ全体ではなく最も関連性の高いビデオセグメントのみを処理できるようにすることで、この課題を解決します。この的を絞ったアプローチにより、応答の品質を維持または向上させながら、処理時間を大幅に削減できます。

この記事では、Twelve Labsのビデオ理解機能Weaviateのベクトルデータベースを組み合わせて、ビデオコンテンツ用の効率的なRAGシステムを構築する方法を探ります。ビデオをセグメント化し、埋め込みを使用して分析に最も関連する部分のみを検索することで、精度を維持または向上させながら処理時間を大幅に改善できます。

私たちのアプローチは、いくつかの主要技術を活用しています:

  • ビデオの理解と埋め込み生成のためのTwelveLabs PegasusおよびMarengoモデル

  • ビデオセグメントを効率的に保存・収集するためのWeaviateベクトルデータベース

  • ビデオ分析の比較対象としてのオープンソースの LLaVA-NeXT-Video モデル

このRAGベースのアプローチにより、最も関連性の高いセグメントのみに焦点を当てることで、ビデオ処理の計算負荷を軽減し、より長いビデオを効率的に分析できるようになることを実証します。コンテンツモデレーション、スポーツ分析、または教育コンテンツ向けのアプリケーションのいずれを構築している場合でも、このアプローチは高品質な結果を維持しながらビデオ処理能力を拡張するのに役立ちます。



1 - TwelveLabsとWeaviateのセットアップ



TwelveLabs

まだTwelve Labsに登録していない場合は、こちらから登録できます。アカウントをセットアップしたら、Playgroundに移動し、画面右上隅のユーザーアイコンをクリックしてAPI Keyにアクセスします。

お使いのノートブックの左側にある鍵のアイコンをクリックし、この値をTL_API_KEYとしてシークレットを作成します。



Weaviate

Weaviateのアカウントをお持ちでない場合は、こちらからサインアップできます。アカウント作成後、クラウドダッシュボードに移動し、新しいクラスターを作成します。クラスターのセットアップが完了したら、ノートブックのシークレットセクションに2つの値を設定する必要があります。

REST Endpointの下にあるURLをWEAVIATE_URL変数として追加します。API Keysの下にあるAdminキーをコピーし、WEAVIATE_API_KEYに保存します。



2 - GPUランタイムの選択

LLaVA-NeXT-Videoモデルを実行するにはGPUが必要です。ノートブックでランタイム > ランタイムのタイプを変更に移動し、T4 GPUを選択します。



3 - 環境のセットアップ



依存関係のインストール

まず、TwelveLabsとWeaviateのSDKをインストールする必要があります:

!python -m pip install -U -q twelvelabs
!python -m pip install -U -q "weaviate-client>=4.0.0"

次に、残りの依存関係をインストールします。

!python -m pip install torch
!python -m pip install -q av
!python -m pip install --upgrade -q accelerate 
!python -m pip install -U bitsandbytes
!python -m pip install git

!python -m pip install pillow
!python -m pip install sentencepiece
!python -m



TwelveLabsおよびWeaviate SDKのセットアップ

from google.colab import userdata

TL_API_KEY=userdata.get('TL_API_KEY')
weaviate_url = userdata.get("WEAVIATE_URL")
weaviate_api_key = userdata.get("WEAVIATE_API_KEY")

次に、TwelveLabsクライアントを初期化します。

from twelvelabs import TwelveLabs

# Initialize the Twelve Labs client
twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)

最後に、Weaviateクライアントをセットアップし、Video_Embeddingsコレクションを初期化します。

import weaviate
from weaviate.classes.init import Auth

# Connect to Weaviate Cloud
weaviate_client = weaviate.connect_to_weaviate_cloud(
    cluster_url=weaviate_url,
    auth_credentials=Auth.api_key(weaviate_api_key),
)

# Get or create collection
try:
    collection = weaviate_client.collections.get("Video_Embeddings")
except:
    collection = weaviate_client.collections.create(name="Video_Embeddings")



ビデオデータのセットアップ

次に、埋め込み用のビデオデータを取得する必要があります。このリンクを使用して、Googleドライブのフォルダ内でビデオデータを見つけることができます。基本のGoogleドライブフォルダの中に 「TwelveLabs-Weaviate」 という名前のフォルダを作成し、そこへコピーします。以下のセルを使用してドライブをマウントし、ノートブックからビデオファイルにアクセスできるようにします。

from google.colab import drive
drive.mount('/content/drive')
base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate"
raw_video_dir = base_folder_path + "/sports_videos"

upscaled_video_dir = base_folder_path + "/upscaled_videos/"
video_segments_dir = base_folder_path + "/video_segments/"



ビデオのアップスケーリング

いくつかのビデオは、解像度が低すぎて埋め込みモデルで使用できません。これらを使用する前にアップスケーリングする必要があります。

ここでアップスケーリング関数を作成します。read_video_pyavは、LLaVa-NeXT-VideoのColabノートブックから直接引用したもので、推論に適した正しいnumpy表現にビデオをフォーマットします。

import av
import numpy as np

def upscale_video(input_file, output_file, target_width=1280, target_height=720):
    input_container = av.open(input_file)
    output_container = av.open(output_file, mode='w')

    input_stream = input_container.streams.video[0]
    output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate)
    output_stream.width = target_width
    output_stream.height = target_height
    output_stream.pix_fmt = 'yuv420p'

    for frame in input_container.decode(input_stream):
        frame = frame.reformat(width=target_width, height=target_height)
        packet = output_stream.encode(frame)
        output_container.mux(packet)

    # Flush the encoder
    packet = output_stream.encode(None)
    output_container.mux(packet)

    # Close the containers
    input_container.close()
    output_container.close()

def read_video_pyav(container, indices):
    '''
    Decode the video with PyAV decoder.

    Args:
        container (av.container.input.InputContainer): PyAV container.
        indices (List[int]): List of frame indices to decode.

    Returns:
        np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3).
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

raw_video_dirフォルダ内のビデオを取得し、アップスケーリングしてupscaled_video_dirフォルダに保存します。

# Create output directory if it doesn't exist
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# Iterate over all files in the raw video directory
for filename in os.listdir(raw_video_dir):
    
    # Check if the file is a video file
    if filename.endswith(".mp4"):
        print(filename)
        # Get the file name without extension
        input_file_no_ext = os.path.splitext(filename)[0]
        # Define the output file name
        output_file = f"{input_file_no_ext}_480.mp4"
        if output_file in os.listdir(upscaled_video_dir):
            continue
        # Define the full path for the input and output files
        input_file_path = os.path.join(raw_video_dir, filename)
        output_file_path = os.path.join(upscaled_video_dir, output_file)
        # Upscale the video
        upscale_video(input_file_path, output_file_path)



4 - 単一ビデオによるPegasusとLLaVa-NeXT-Videoの比較

PegasusLLaVa-NeXT-Video は、どちらもビデオを取り込んでそれについて質問できるビデオ理解モデルです。

まず、ビデオコレクションから切り倒した単一のビデオでPegasusとLLaVa-NeXT-Videoを比較することから始めます。ビデオは、ニューヨーク・ジャイアンツがニューイングランド・ペイトリオッツと対戦する、第42回スーパーボウルのシーケンスを示しています。これは「ヘルメットキャッチ」と呼ばれる有名なレセプションで、ジャイアンツのクォーターバックであるイーライ・マニングがパスを投げ、これを受けたジャイアンツのワイドレシーバー、デビッド・タイリーが試合の残り2分で自身のヘルメットにボールを押し付けながらキャッチを見事に成功させたものです。

ビデオの背景を把握したところで、「このビデオで何が起きていますか?」と質問した際に、2つのモデルがビデオをどれだけ理解できるかを判定します。



Pegasusを使用してビデオと対話する

始める前に、ビデオをロードするためのPegasusインデックスをセットアップする必要があります。

models = [
        {
            "name": "pegasus1.2",
            "options": ["visual"]
        }
    ]

index_name = "sports_videos"
indices_list = twelve_labs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelve_labs_client.index.create(
        name=index_name,
        models=models

    )
    print(f"A new index has been created: id={index.id} name={index.name} models={index.models}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} models={index.models}")

次に、インデックスにビデオをアップロードする関数を作成します。これにより、ビデオについて質問するために使用できるPegasusのビデオIDが返されます。

# Monitor the status of the video task
def on_task_update(task):
    print(f"  Status={task.status}")
    
def upload_video_to_twelve_labs_pegasus(video_path):
    task = twelve_labs_client.task.create(
        index_id=index.id,
        file = video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)

    if task.status != "ready":
      raise RuntimeError(f"Indexing failed with status {task.status}")
    print(f"The unique identifer of your video is {task.video_id}.")
    return task.video_id

ビデオをアップロードし、PegasusビデオIDをsingle_video_idに保存します。

# Define the video file path
single_video_file = upscaled_video_dir + "football_480.mp4"

single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)

Pegasusがビデオを本当に理解しているかを確認するために、「このビデオで何が起きていますか?簡潔に答えてください。」と質問します。

single_video_query = "What is going on in this video? Please be concise."

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt=single_video_query
)
print(f"{res.data}")

Pegasusから次の返答が届きました:

ビデオは、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツのアメリカンフットボールの試合における極めて重要な瞬間を紹介しています。ジャイアンツのクォーターバックであるイーライ・マニングが投げたパスを、デビッド・タイリーがアウト・オブ・バウンズに倒れ込みながら、ボールをヘルメットに固定して見事なキャッチを決めました。複数のアングルがそのキャッチをリプレイし、その難しさと正確さを強調しています。プレイ後、タイリーは短く喜びを祝い、ビデオは彼と他のチームメイトがフィールドから立ち去るシーンで終了します。

この応答から、Pegasusがビデオを深く理解していることが分かります。これがジャイアンツとペイトリオッツのフットボールの試合であることを把握しています。また、イーライ・マニングがボールを投げ、デビッド・タイリーがそれを受け止めた、試合の勝負を分ける瞬間であることも理解しています。

Pegasusからはこれがスーパーボウルであることは言及されていなかったので、確認のためにさらに質問してみます。

res = twelve_labs_client.generate.text(
  video_id=single_video_id,
  prompt="What game is this?"
)
print(f"{res.data}")

Pegasusはこれは、第42回スーパーボウルの試合です。と答えました。これは正しい回答です。

次に、LLaVa-NeXT-Videoがどの程度ビデオを解釈できるかを見てみましょう。



LLaVa-NeXT-Videoを使用してビデオと対話する

LLaVa-NeXT-Videoでは、推論の前に特定の形式でビデオデータを準備する必要があります。モデルはビデオストリーム全体を一度に処理できないため、ビデオ全体から均一にフレームを抽出します。ここでは、各ビデオから40枚の均等に分散されたフレームを抽出するサンプリング関数を作成し、コンテンツ全体の重要な瞬間を逃さず捉えます。このサンプリング手法はLLaVA-NeXT-Videoの公式実装に準拠しています。サンプリング後、Hugging Face Hubからモデルをロードし、モデルの要件に適合するように入力を構成したうえで、質問に対する応答を生成するための推論を実行します。

def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # sample uniformly num_samples frames from the video
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)
    
    sampled_frames = read_video_pyav(container, indices)
    
    return sampled_frames
    
sampled_video = sample_video(single_video_file, num_samples=40)

ビデオのサンプリングが適切に行われたら、モデルをセットアップします。

from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

次に、モデルに対してクエリを実行する関数を作成します。

def query_llava_next(query,model,processor,sampled_video):

    # Each "content" is a list of dicts and you can add image/video/text modalities
    conversation = [
        {
            "role": "user",
            "content": [
                {"type": "text", "text": query},
                {"type": "video"},
                ],
        },
    ]

    prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
    # prompt_len = len(prompt)

    inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device)

    generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

    output = model.generate(**inputs, **generate_kwargs)
    generated_text = processor.batch_decode(output, skip_special_tokens=True)

    return generated_text[0]

最後に、同様の質問をしてPegasusとの出力を比較します。

llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

次のような回答が返信されました:

USER: このビデオで何が起きていますか?簡潔に。 ASSISTANT: 進行中のフットボールの試合が映っており、フィールドには複数のプレイヤーがいます。一部の選手が着用しているジャージの番号や古いスタイルのヘルメットから判断すると、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツの間の第3回スーパーボウルであるようです。1人の選手が空中姿勢でボールをキャッチしようとしており、他の選手にタックルされかけています。また、レフェリーはファーストダウンのシグナルを送っています。コーチ陣やその他の試合関係者も...

このモデルはジャイアンツとペイトリオッツのフットボールの試合であることを認識していますが、試合を第3回スーパーボウルと誤認しています。また、ビデオの最も重要な場面、すなわちヘルメットキャッチの箇所を明確に識別できていません。

先ほどと同じ「これはどの試合ですか?」という質問をしてみます。わずかに正確な内容に近づいているものの、正解には至っていません。

llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video)
print(llava_next_result)

USER: what game is this? ASSISTANT: 提供された画像は、現在進行中のフットボールの試合、特に第41回スーパーボウルの一部です。これはニューイングランド・ペイトリオッツ対ニューヨーク・ジャイアンツの試合です。画像に写っているプレイヤーはジャイアンツとペイトリオッツの選手です。



5 - 単一ビデオにおけるセグメントレベル・クエリのためのRAG

比較結果により、Pegasusはビデオ全体を分析する際により優れた処理を提供し、処理時間を短縮しつつ、より正確で一貫性のある回答を返すことが分かりました。

しかしながら、モデルの注目範囲を最も関連性の高いビデオセグメントのみに絞り込むことで、さらにパフォーマンスを向上させる余地があります。これこそが検索拡張生成(RAG)の有用性が発揮される理由です。ビデオ全体を処理する代わりに、特定のクエリに関連する情報が格納されたセグメントのみを特定して分析することが可能となります。

このアプローチを実装するために、ビデオセグメントのセマンティック(意味的内容)を取得する高品質な埋め込みの作成を専門とする、TwelveLabsのMarengoモデルを活用します。これらの埋め込みによって以下が可能となります:

  1. 一度に、ビデオの各セグメントを個別にインデックス化する。

  2. ユーザーのクエリに最も一致する関連セグメントを特定する。

  3. 特定の処理セグメントのみをビデオ理解モデルによって処理する。

まず、ビデオを各セグメントに分割し、Marengoモデルを使用してそれぞれに対して埋め込みを生成します。これらの埋め込みにより、RAGシステムの基盤が確立されます。



Marengoを使用して、フルビデオおよびビデオクリップの埋め込みを作成する

セグメントの長さをMarengoでサポートされる最大の長さである10秒に設定します。

# Define the video segment length
segment_length = 10

次に、Marengoを使用してビデオの埋め込み処理を行います。注意:Marengoがビデオ全体の埋め込みと、ビデオ内の各10秒のクリップの埋め込みの両方を確実に返すようにするため、video_embedding_scopes=["clip", "video"]およびvideo_clip_length=segment_lengthを設定します。

task = twelve_labs_client.embed.task.create(
    model_name="Marengo-retrieval-2.7",
    video_file=single_video_file,
    video_clip_length=segment_length,
    video_embedding_scopes=["clip", "video"]
)
print(
    f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
)

# Monitor the status of the video embedding task
status = task.wait_for_done(
    sleep_interval=2,
    callback=on_task_update
)
print(f"Embedding done: {status}")

埋め込み処理が完了したら、必要に応じてそれらを抽出するためにMarengoのタスクIDを保存できます。後ほどWeaviateデータベースにデータを入力する際に使用できるよう、タスクIDをmarengo_task_idsに保持します。

single_video_task_id = task.id

marengo_task_ids = {}

single_video_file_name = single_video_file.split("/")[-1]
marengo_task_ids[single_video_file_name] = single_video_task_id



RAG用ビデオセグメントの準備

効率的なRAGパイプラインを構築するために、データベース内でPegasusビデオIDをMarengoタスクIDと関連付けます。これにより、ベクトル検索で対応するセグメントが返された際に、そのビデオセグメントについてチャットできるようになります。これを実行するために、各ビデオセグメントをインデックス構築用のPegasusにアップロードする必要があります。

まず、Pegasusにアップロードするための、ビデオを10秒に分割するsplit_video関数を作成します。同時に、各セグメントがPegasusでの最低要件である4秒を超えていることを保証する必要があります。最後のクリップが5秒未満の場合は、最後から2番目のクリップと一部を重複させることで、これを実現します。

import os
import subprocess
import json
    
def split_video(input_path, output_dir, segment_duration=10):
    """
    Split a video into segments of the specified duration.
    Regular segments will be exactly segment_duration seconds.
    The last segment will be at least 5 seconds long, potentially overlapping
    with the previous segment if needed.
    
    Args:
        input_path: Path to the input video file
        output_dir: Directory to save the output segments
        segment_duration: Duration of each segment in seconds (default: 10)
    """

    # Minimum length for the last segment
    min_last_segment_len = 5
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Get base filename without extension
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    
    # Get video duration using ffprobe
    probe_cmd = [
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_format", input_path
    ]
    
    try:
        probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        video_info = json.loads(probe_result.stdout)
        duration = float(video_info["format"]["duration"])
    except Exception as e:
        print(f"Error getting video duration: {e}")
        return 0
    
    # Calculate number of full segments
    num_full_segments = int(duration / segment_duration)
    
    # Calculate remaining duration
    remaining_duration = duration - (num_full_segments * segment_duration)
    
    # Determine total number of segments and if we need to adjust the last segment
    if remaining_duration > 0:
        if remaining_duration < min_last_segment_len:
            # Last segment would be too short, so we'll adjust its start time
            num_segments = num_full_segments + 1
            needs_adjustment = True
        else:
            # Last segment is already long enough
            num_segments = num_full_segments + 1
            needs_adjustment = False
    else:
        # No remaining duration, all segments are complete
        num_segments = num_full_segments
        needs_adjustment = False
    
    print(f"Video {base_name} is {duration:.2f} seconds long")
    print(f"Creating {num_segments} segments")
    
    # Create each segment
    for i in range(num_segments):
        # For regular segments, start at the segment boundary
        if i < num_full_segments:
            start_time = i * segment_duration
            actual_duration = segment_duration
        else:
            # This is the last segment
            if needs_adjustment:
                # Start earlier to ensure it's at least min_last_segment_len seconds
                start_time = duration - min_last_segment_len
                actual_duration = min_last_segment_len
            else:
                # Last segment is already long enough
                start_time = i * segment_duration
                actual_duration = remaining_duration
        
        output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4")
        
        # For all segments, use copy mode for speed
        cmd = [
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-i", input_path,
            "-t", str(actual_duration),
            "-c:v", "copy",
            "-c:a", "copy",
            output_path
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"Error creating segment {i+1}: {result.stderr[:100]}...")
        else:
            end_time = start_time + actual_duration
            if i == num_segments - 1 and needs_adjustment:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)")
            else:
                print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s")
    
    print(f"Successfully split {base_name} into {num_segments} segments")
    return num_segments

新しく作成したvideo_segments_dirにビデオを保存します。

split_video(single_video_file, video_segments_dir,segment_length)

次に、pegasus_video_ids(ファイル名とPegasusビデオIDを関連付ける辞書)を作成し、元のビデオ全体のビデオIDを追加します。

pegasus_video_ids = {}

fname = single_video_file.split("/")[-1]
pegasus_video_ids[fname] = single_video_id

続いて、分割したビデオセグメントをPegasusにアップロードし、各ビデオIDをpegasus_video_idsに割り当てます。

segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))]

# Process each video
for segment_video_file in segment_video_files:
    if segment_video_file in pegasus_video_ids:
        print("skip file",segment_video_file)
        continue
    print("processing file",segment_video_file)
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir+segment_video_file)
        pegasus_video_ids[segment_video_file] = video_id
    except:
        print("error",segment_video_file)
        continue

最後に、LLaVa-NeXT-Videoモデルで効率的に処理できるよう、すべてのビデオをサンプリングします。

sampled_video_files = {}

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

for video_file in os.listdir(upscaled_video_dir):
    print(video_file)
    sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video



Weaviateへの埋め込みデータのアップロード

Weaviateは、コレクションに登録する際にメタデータレコードと埋め込みベクトルが区別されていることを想定しています。MarengoタスクIDとPegasusビデオIDを取得し、アップロード用のrecords(レコード配列)とvectors(ベクトル配列)を準備するためのprepare_marengo_embeddings_for_weaviate関数を作成します。

def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids):

    # Prepare data for Weaviate upload
    records = []
    vectors = []

    for video_file_name in marengo_task_ids.keys():

        

        marengo_task_id = marengo_task_ids[video_file_name]

        # Retrieve marengo full video and clip embeddings
        marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id)


        #track segment number to match with fiel
        segment_number = 0

        for segment in marengo_embeddings_result.video_embedding.segments:
            # Determine if this is a video or clip segment
            is_video = segment.embedding_scope == "video"


            #Update the file name if segment
            updated_file_name = video_file_name
            if not is_video:
                updated_file_name = updated_file_name.replace(".mp4",f"_segment_{segment_number:03d}.mp4")
                segment_number += 1

            video_name = video_file_name.replace(".mp4","")
            
            pegasus_video_id = None
            if updated_file_name in pegasus_video_ids:
                pegasus_video_id = pegasus_video_ids[updated_file_name] 

            record = {
                'video_name':video_name,
                'segment_number': 0 if is_video else segment_number,
                'video_file': updated_file_name,
                'start_time': getattr(segment, 'start_offset_sec', 0),
                'end_time': getattr(segment, 'end_offset_sec', 0),
                'type': 'video' if is_video else 'clip',
                'task_id': marengo_task_id,
                'pegasus_video_id': pegasus_video_id
            }
            
            # Get the embedding vector
            embedding_vector = [float(x) for x in segment.embeddings_float]
            
            # Add to our lists
            records.append(record)
            vectors.append(embedding_vector)

    # Print summary
    print(f"Prepared {len(records)} segments for upload to Weaviate")
    print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}")
    print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}")

    return records, vectors

作成した関数を使用して、Weaviateにアップロードするレコードとベクトルを取得します。

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")



ベクトル検索のテスト

すべてのデータがコレクションに揃ったので、Weaviateでのベクトル検索が正しいビデオを返すかテストできます。Weaviateのnear_vector検索を使用しているため、ビデオのベクトルで検索した場合は、同一オブジェクトとして距離が「0」として返出されます。

今回はコレクションから5番目のベクトル(インデックス5)を取り出して検索します。これにより、対応するビデオセグメントが距離「0」で返されます。

from weaviate.classes.query import MetadataQuery, Filter

# Use a specific vector for the query
query_vector = vectors[5]

# Perform vector search
response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
)

print(f"Found {len(response.objects)} results for vector search")
for obj in response.objects:
    print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}")
    if 'segment_id' in obj.properties:
        print(f"Segment: {obj.properties['segment_id']}")
    if 'text' in obj.properties and obj.properties['text']:
        print(f"Text: {obj.properties['text']}")
    print(f"Distance: {obj.metadata.distance}")
    print("-" * 50)

この出力結果は、埋め込みが正常に保存され、正しく検索可能であることを示しています。




RAG用の関連性の高いビデオセグメントの取得

RAGパイプラインの核となるのは、ユーザーの質問を最も関連性の高いビデオセグメントと一致させる能力です。このプロセスは3つの重要な手順で機能します:

  1. TwelveLabsのMarengoモデルを使用して、ユーザーのテキストクエリをベクトル埋め込みに変換します。

  2. Weaviateをスキャンし、クエリ埋め込みに最も類似しているビデオセグメントの埋め込みを特定します。

  3. 最も関連性の高いビデオセグメントが特定されたら、それに関連付けられたPegasusのビデオIDを使用して、そのセグメントに特化した正確な回答を生成します。

この対象を絞った処理方法により、ビデオコンテンツの最も関連性の高い部分だけを処理させ、効率と応答品質を両立して大幅に向上させます。

まず、Marengoを使用してテキストクエリを埋め込みベクトルに変換します。

sample_question = "What technique did David Tyree use to catch the ball?"

embedding = twelve_labs_client.embed.create(
    model_name="Marengo-retrieval-2.7",
    text=sample_question,
    text_truncate="start",
)

query_vector = embedding.text_embedding.segments[0].embeddings_float

次に、最も関連性の高いクリップを特定します。filters=(Filter.by_property("type").equal("clip"))を使用して、ビデオ全体の埋め込みを無視し、クリップの埋め込みのみが返されるようにします。

response = collection.query.near_vector(
    near_vector=query_vector,
    limit=1,
    return_metadata=MetadataQuery(distance=True),
    filters=(Filter.by_property("type").equal("clip"))
)

video_file = response.objects[0].properties.get("video_file")
print(video_file)

検索結果により、4番目のクリップ(インデックス3)であるfootball_480_segment_003.mp4が返されたことが確認できます。

抽出されたクリップを確認してみましょう:

import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

video_file = response.objects[0].properties.get("video_file")
video = sampled_video_files[video_file]

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # this is required to not display the generated image

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())

この方法によって、ヘルメットキャッチが行われたまさにその瞬間が正確に特定されています。

適切なセグメントが抽出された状態となったため、PegasusとLLaVa-NeXT-Videoがこの短いクリップに対してどのような挙動を示すかを確認します。



ビデオセグメントとの対話:Pegasus 対 LLaVa-NeTX-Video

最初にPegasusがどのように応答するかを見てみましょう。

pegasus_video_id = response.objects[0].properties.get("pegasus_video_id")


print(sample_question)

res = twelve_labs_client.generate.text(
  video_id=pegasus_video_id,
  prompt=sample_question
)
print(f"{res.data}")

What technique did David Tyree use to catch the ball?

デビッド・タイリーは、ボールをヘルメットに押し付けてキャッチを確実にする技術を使用しました。これは、ニューヨーク・ジャイアンツがポゼッションを維持し、攻撃を継続することを可能にした極めて重要なプレーでした。

Pegasusが見事な回答を提供しました。ヘルメットキャッチのプレーと、それがジャイアンツにとって重要な推進力となった点に言及できています。

次に、LLaVa-NeTX-Videoが同じセグメントをスキャンした際に、より洗練された回答を返せるか確認します。

video_file = response.objects[0].properties.get("video_file")
sampled_video = sampled_video_files[video_file]
generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video)

print(generated_text)

USER: What technique did David Tyree use to catch the ball? ASSISTANT: キャッチに成功した選手は、両手で頭上に持ってくるキャッチング技術を使用しました。この技術は、ディフェンス陣に囲まれる中で確実にボールを手元に引き込むためのものです。意図した軌道からボールが偏向した場合にはリスクの高い技術ですが、混戦の中で空中のボールを鷲掴みするのに非常に効果的な方法です。デビッド・タイリーは...

頭上でのハンドキャッチという点に言及するものの、ヘルメットキャッチに成功したという詳細(最も特徴的な要素)を認識できていません。さらに、後半で同じ結論を繰り返し饒舌になっています。



6 - Marengo、Weaviate、およびPegasusを使用した複数ビデオRAG

単一のビデオから個々のクリップに対してMarengoの埋め込みがどのように動作するかを理解した上で、より実用的なRAGユースケースに向けて、複数のビデオにまたがる埋め込みを使用する方法を示します。



すべてのビデオについてMarengo埋め込みを取得する

まず、すべてのビデオ用のMarengoタスクIDを用いて、marengo_task_idsの辞書を更新します。

for video_file_name in os.listdir(upscaled_video_dir):

    if video_file_name in marengo_task_ids:
        print(f"skipping {video_file_name} because embeddings already exist")
        continue

    print(f"processing {video_file_name}")

    file_path = os.path.join(upscaled_video_dir, video_file_name)

    task = twelve_labs_client.embed.task.create(
        model_name="Marengo-retrieval-2.7",
        video_file=file_path,
        video_clip_length=segment_length,
        video_embedding_scopes=["clip", "video"]
    )
    print(
        f"Created task: id={task.id} model_name={task.model_name} status={task.status}"
    )

    # Monitor the status of the video embedding task
    status = task.wait_for_done(
        sleep_interval=2,
        callback=on_task_update
    )
    print(f"Embedding done: {status}")

    marengo_task_ids[video_file_name] = task.id



残りのビデオをセグメントに分割する

残りのビデオを以前と同様にいくつかの短いセグメントに分割します。

# Create output folder if it doesn't exist
os.makedirs(upscaled_video_dir, exist_ok=True)

# Get all video files
video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))]

# Process each video
for video_file in video_files:
    split_video(upscaled_video_dir + video_file,video_segments_dir,segment_length)



すべてのビデオおよびそのセグメントに対してPegasusビデオIDを取得する

次に、残りのセグメントと元動画すべてに対するPegasusビデオIDを抽出します。時間を節約するために、この処理を並行して実行します。

import concurrent.futures
import os
from tqdm import tqdm  # Use standard tqdm instead of tqdm.notebook

def process_video(video_path):
    video_file_name = video_path.split("/")[-1]
    try:
        video_id = upload_video_to_twelve_labs_pegasus(video_path)
        return video_file_name, video_id
    except Exception as e:
        print(f"Error processing {video_file_name}: {str(e)}")
        return video_file_name, None

# Filter out videos that are already processed
segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')]
full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')]
all_video_files = segment_video_files + full_video_files

videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids]

print(f"Processing {len(videos_to_process)} videos in parallel...")

# Use ThreadPoolExecutor for I/O-bound operations like API calls
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # Submit all tasks and create a dictionary mapping futures to their video files
    future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process}
    
    # Process results as they complete with a progress bar
    for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)):
        video_file_name, video_id = future.result()
        if video_id:
            pegasus_video_ids[video_file_name] = video_id

print("All videos processed!")
print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")



Weaviateへのデータアップロード

残りの埋め込みデータをWeaviateにアップロードします。

records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids)

with collection.batch.dynamic() as batch:
    for i, record in enumerate(records):
        if record["pegasus_video_id"] is None:
            continue
        batch.add_object(
            properties=record,
            vector=vectors[i]
        )

print(f"Added {len(records)} embeddings to Weaviate")



RAGパフォーマンス評価:クリップ 対 フルビデオ

Marengoの埋め込みとPegasusのビデオIDがWeaviateに正常にインデックス化されたので、このRAGシステムの有効性を評価します。この評価は以下の2つの重要な側面に焦点を当てます:

  1. 回答品質:クリップレベルの検索とビデオ全体の検索の違いによって、質問に対するシステムの応答の正確性はどのように変化するか?

  2. 処理効率:応答時間および計算リソースの消費量に関してどのような性能差異があるか?

この比較により、これら違いを定量的に測定可能にし、特により長尺なビデオや、ビデオ内の特定の瞬間を提示する複雑な質問に対して、最も適切な箇所だけに対象を動的に絞ることで、RAGが処理効率と品質を同時に向上させる仕組みを実証します。

まず、複数の異なるスポーツジャンルをカバーし、かつビデオに含まれる特定のイベントを解釈することを必要とする、様々なテスト用質問セットを定義します。

video_questions = [
    "In the American Football Video, what are the teams playing?", 
    "What technique does David Tyree use to catch the ball?",
    "In the tennis match video, who is playing?", 
    "What foot does Messi shoot at the goal with?",
    "When does Keri Strug hurt her foot?"
]



PegasusによるマルチビデオRAG

最初に、フルビデオを取得して質問した際の回答動作を測定します:

from weaviate.classes.query import MetadataQuery, Filter
import time

# Perform query using the whole video dataset
pegasus_full_video_answers = []

start_time = time.time()

for question in video_questions:

    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_full_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 72 seconds

次に、これを10秒のセグメントクリップでの実行と比較します。

pegasus_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start",
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_name = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    res = twelve_labs_client.generate.text(
        video_id=selected_video_id,
        prompt=question
    )

    pegasus_clip_video_answers.append([question,selected_video_name,res.data])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 20 seconds

抽出したセグメントクリップとフルビデオそれぞれの検索により生成された回答を比較します。

for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

双方から十分に正確で一貫性の高い回答が得られたことが分かります。しかし、クリップを用いたアプローチがわずか20秒で完了したのに対して、ビデオ全体を毎回処理するアプローチは完了に72秒を要しました。




LLaVa-NeXT-VideoによるマルチビデオRAG

次に、LLaVa-NeXT-Videoモデルを使用して同様のテストを行います。ただし、まずは事前にすべてのビデオファイルをサンプリングする必要があります。

for video_file in os.listdir(video_segments_dir):
    print(video_file)
    sampled_video = sample_video(video_segments_dir + video_file,num_samples=40)
    sampled_video_files[video_file] = sampled_video

フルビデオの読み込みによる質問処理から開始します。

llava_full_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("video"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_full_video_answers.append([question,selected_video_name,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

次に、これをセグメントクリップと比較します。

from weaviate.classes.query import MetadataQuery

import time

llava_clip_video_answers = []

start_time = time.time()

for question in video_questions:
    embedding = twelve_labs_client.embed.create(
        model_name="Marengo-retrieval-2.7",
        text=question,
        text_truncate="start"
    )

    query_vector = embedding.text_embedding.segments[0].embeddings_float

    response = collection.query.near_vector(
        near_vector=query_vector,
        limit=1,
        return_metadata=MetadataQuery(distance=True),
        filters=(Filter.by_property("type").equal("clip"))
    )

    selected_video_file = response.objects[0].properties["video_file"]
    selected_video_id = response.objects[0].properties["pegasus_video_id"]

    sampled_video = sampled_video_files[selected_video_file]
    generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video)

    llava_clip_video_answers.append([question,selected_video_name,generated_text])

end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {int(execution_time)} seconds")

Execution time: 24 seconds

各実行で全く同等の時間を要したことが分かります。これは、ビデオの秒数に関係なく、各項目から常に均等に40枚のフレームだけを抽出して評価するためです。

次に、LLaVa-NeXT-Videoが抽出したクリップとフルビデオに対してそれぞれ生成した返答内容を調査します。

for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers):

    print("question",clip_answer[0])
    print("clip:  ",clip_answer[2])
    print("full:  ",full_answer[2])
    print("\n")

今回のケースでは、セグメントクリップを処理した際に、LLaVa-NeXT-Videoは5問中2問で正しい解を導き出しました:

  1. 最初の質問において、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツが対戦中である点を認識できており、また

  2. 3番目の質問において、テニスの試合がロジャー・フェデラーとノバク・ジョコビッチの間で行われている点を正確に見分けています。




7 - まとめ:TwelveLabsとWeaviateによる効率的なビデオ理解のためのRAGの活用

ビデオ処理における検索拡張生成(RAG)の調査は、効率と正確性の両面で大きな利点を示しました。TwelveLabsの高度なビデオ理解機能とWeaviateの強力なベクトルデータベースを組み合わせることで、ビデオ全体ではなく、最も関連性の高いビデオセグメントのみを論理的に処理するシステムを構築しました。



主な知見

  1. パフォーマンスの改善:Weaviateを統合したRAGシステムにTwelveLabsのPegasusを適応すると、元ビデオを毎回全て読み込ませる代わりに、抽出されたクリップだけをクエリ処理することで、レスポンス速度を劇的に改善できました。

  2. 正確性の強化:LLaVa-NeXT-Videoなどのオープンソースモデルにおいて、注目箇所をあらかじめ切り分けたクリップセグメントに限定することで、ビデオ全体の処理に比べて回答の正確さが大きく向上することが示されました。

  3. スケーラビリティを備えたアーキテクチャ:このRAGモデルの検証は、TwelveLabsの埋め込みモデル(Marengo)とWeaviateの高性能ベクトルデータベースの連携が、いかに強力な処理性能を提供するかを示しています。高次元の埋め込みデータを低遅延で検索して保存できるWeaviateの技術は、ビデオ理解などの複雑なタスクを実用的なアプリケーションに落とし込むための極めて重要なコアコンポーネントです。



ユースケース

TwelveLabsのビデオ理解機能とWeaviateのベクトルデータベースを組み合わせることで、多数の産業で強力なアプリケーションの構築が可能になります:

  1. メディア&エンターテインメント:クリエイターは大規模なアーカイブから目的のシーンを瞬時に検索可能になり、編集作業の迅速化やSNS向け動画クリップの自動生成、アーカイブの効率的な再活用などが可能になります。

  2. スポーツアナリティクス:コーチやアナリストは、ビデオを手動で早送り・巻き戻しして探すことなく、特定のアクション(例:『相手チームの裏に投げる短いパスのシーン』)をただテキストで定義するだけで、即座に関連クリップを全試合データから呼び出せるようになります。

  3. 小売・Eコマース:小売事業者は自社のチュートリアル動画を、ただ受動的に見せる一方向のコンテンツから、双方向で質問に答えるインタラクティブなショッピング体験にシフトさせることができます。例えば、顧客が『ストラップの調整方法を教えて』『バックパックに入れたときのサイズ感を見せて』と質問した際、説明動画の該当部分のみを自動で瞬時にカットして再生表示するといったことが容易に実現します。

TwelveLabsとWeaviateの組み合わせにより、膨大なビデオデータを扱う環境でも、情報抽出の効率化と高度な対話システムを同時に満たす、強力なVideo RAG基盤を構築できます。