
パートナーシップ
TwelveLabsとWeaviateを活用した、RAGによるビデオ処理時間の向上

ジェームズ・リー
開発者は、Twelve LabsのEmbed APIとWeaviateを使用して、分析を実行する前に最も関連性の高いビデオセグメントのみを取得するビデオRAGシステムを構築できます。これにより、Pegasusによる回答の正確性を維持しながら、ビデオ全体をクエリする場合と比較して、処理時間を72秒から20秒に短縮できます。
開発者は、Twelve LabsのEmbed APIとWeaviateを使用して、分析を実行する前に最も関連性の高いビデオセグメントのみを取得するビデオRAGシステムを構築できます。これにより、Pegasusによる回答の正確性を維持しながら、ビデオ全体をクエリする場合と比較して、処理時間を72秒から20秒に短縮できます。

この記事の内容
No headings found on page
ニュースレターに登録する
ニュースレターに登録する
ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします
ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします
AIを活用してビデオを検索、分析、探索します。
2025/03/18
25分
記事へのリンクをコピー
草稿を確認してくれたWeaviateチームの Tuana Celik と Erika Cardenas に深く感謝します!
ビデオ処理は、特に長尺コンテンツを分析する場合、計算コストが高く時間がかかります。検索拡張生成(RAG)は、システムがビデオ全体ではなく最も関連性の高いビデオセグメントのみを処理できるようにすることで、この課題を解決します。この的を絞ったアプローチにより、応答の品質を維持または向上させながら、処理時間を大幅に削減できます。
この記事では、Twelve Labsのビデオ理解機能とWeaviateのベクトルデータベースを組み合わせて、ビデオコンテンツ用の効率的なRAGシステムを構築する方法を探ります。ビデオをセグメント化し、埋め込みを使用して分析に最も関連する部分のみを検索することで、精度を維持または向上させながら処理時間を大幅に改善できます。
私たちのアプローチは、いくつかの主要技術を活用しています:
ビデオの理解と埋め込み生成のためのTwelveLabs PegasusおよびMarengoモデル
ビデオセグメントを効率的に保存・収集するためのWeaviateベクトルデータベース
ビデオ分析の比較対象としてのオープンソースの LLaVA-NeXT-Video モデル
このRAGベースのアプローチにより、最も関連性の高いセグメントのみに焦点を当てることで、ビデオ処理の計算負荷を軽減し、より長いビデオを効率的に分析できるようになることを実証します。コンテンツモデレーション、スポーツ分析、または教育コンテンツ向けのアプリケーションのいずれを構築している場合でも、このアプローチは高品質な結果を維持しながらビデオ処理能力を拡張するのに役立ちます。
1 - TwelveLabsとWeaviateのセットアップ
TwelveLabs
まだTwelve Labsに登録していない場合は、こちらから登録できます。アカウントをセットアップしたら、Playgroundに移動し、画面右上隅のユーザーアイコンをクリックしてAPI Keyにアクセスします。
お使いのノートブックの左側にある鍵のアイコンをクリックし、この値をTL_API_KEYとしてシークレットを作成します。
Weaviate
Weaviateのアカウントをお持ちでない場合は、こちらからサインアップできます。アカウント作成後、クラウドダッシュボードに移動し、新しいクラスターを作成します。クラスターのセットアップが完了したら、ノートブックのシークレットセクションに2つの値を設定する必要があります。
REST Endpointの下にあるURLをWEAVIATE_URL変数として追加します。API Keysの下にあるAdminキーをコピーし、WEAVIATE_API_KEYに保存します。
2 - GPUランタイムの選択
LLaVA-NeXT-Videoモデルを実行するにはGPUが必要です。ノートブックでランタイム > ランタイムのタイプを変更に移動し、T4 GPUを選択します。
3 - 環境のセットアップ
依存関係のインストール
まず、TwelveLabsとWeaviateのSDKをインストールする必要があります:
!python -m pip install -U -q twelvelabs !python -m pip install -U -q "weaviate-client>=4.0.0"
次に、残りの依存関係をインストールします。
!python -m pip install torch !python -m pip install -q av !python -m pip install --upgrade -q accelerate !python -m pip install -U bitsandbytes !python -m pip install git
!python -m pip install pillow !python -m pip install sentencepiece !python -m
TwelveLabsおよびWeaviate SDKのセットアップ
from google.colab import userdata TL_API_KEY=userdata.get('TL_API_KEY') weaviate_url = userdata.get("WEAVIATE_URL") weaviate_api_key = userdata.get("WEAVIATE_API_KEY")
次に、TwelveLabsクライアントを初期化します。
from twelvelabs import TwelveLabs # Initialize the Twelve Labs client twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)
最後に、Weaviateクライアントをセットアップし、Video_Embeddingsコレクションを初期化します。
import weaviate from weaviate.classes.init import Auth # Connect to Weaviate Cloud weaviate_client = weaviate.connect_to_weaviate_cloud( cluster_url=weaviate_url, auth_credentials=Auth.api_key(weaviate_api_key), ) # Get or create collection try: collection = weaviate_client.collections.get("Video_Embeddings") except: collection = weaviate_client.collections.create(name="Video_Embeddings")
ビデオデータのセットアップ
次に、埋め込み用のビデオデータを取得する必要があります。このリンクを使用して、Googleドライブのフォルダ内でビデオデータを見つけることができます。基本のGoogleドライブフォルダの中に 「TwelveLabs-Weaviate」 という名前のフォルダを作成し、そこへコピーします。以下のセルを使用してドライブをマウントし、ノートブックからビデオファイルにアクセスできるようにします。
from google.colab import drive drive.mount('/content/drive') base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate" raw_video_dir = base_folder_path + "/sports_videos" upscaled_video_dir = base_folder_path + "/upscaled_videos/" video_segments_dir = base_folder_path + "/video_segments/"
ビデオのアップスケーリング
いくつかのビデオは、解像度が低すぎて埋め込みモデルで使用できません。これらを使用する前にアップスケーリングする必要があります。
ここでアップスケーリング関数を作成します。read_video_pyavは、LLaVa-NeXT-VideoのColabノートブックから直接引用したもので、推論に適した正しいnumpy表現にビデオをフォーマットします。
import av import numpy as np def upscale_video(input_file, output_file, target_width=1280, target_height=720): input_container = av.open(input_file) output_container = av.open(output_file, mode='w') input_stream = input_container.streams.video[0] output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate) output_stream.width = target_width output_stream.height = target_height output_stream.pix_fmt = 'yuv420p' for frame in input_container.decode(input_stream): frame = frame.reformat(width=target_width, height=target_height) packet = output_stream.encode(frame) output_container.mux(packet) # Flush the encoder packet = output_stream.encode(None) output_container.mux(packet) # Close the containers input_container.close() output_container.close() def read_video_pyav(container, indices): ''' Decode the video with PyAV decoder. Args: container (av.container.input.InputContainer): PyAV container. indices (List[int]): List of frame indices to decode. Returns: np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3). ''' frames = [] container.seek(0) start_index = indices[0] end_index = indices[-1] for i, frame in enumerate(container.decode(video=0)): if i > end_index: break if i >= start_index and i in indices: frames.append(frame) return np.stack([x.to_ndarray(format="rgb24") for x in frames])
raw_video_dirフォルダ内のビデオを取得し、アップスケーリングしてupscaled_video_dirフォルダに保存します。
# Create output directory if it doesn't exist if not os.path.exists(upscaled_video_dir): os.makedirs(upscaled_video_dir) # Iterate over all files in the raw video directory for filename in os.listdir(raw_video_dir): # Check if the file is a video file if filename.endswith(".mp4"): print(filename) # Get the file name without extension input_file_no_ext = os.path.splitext(filename)[0] # Define the output file name output_file = f"{input_file_no_ext}_480.mp4" if output_file in os.listdir(upscaled_video_dir): continue # Define the full path for the input and output files input_file_path = os.path.join(raw_video_dir, filename) output_file_path = os.path.join(upscaled_video_dir, output_file) # Upscale the video upscale_video(input_file_path, output_file_path)
4 - 単一ビデオによるPegasusとLLaVa-NeXT-Videoの比較
Pegasus と LLaVa-NeXT-Video は、どちらもビデオを取り込んでそれについて質問できるビデオ理解モデルです。
まず、ビデオコレクションから切り倒した単一のビデオでPegasusとLLaVa-NeXT-Videoを比較することから始めます。ビデオは、ニューヨーク・ジャイアンツがニューイングランド・ペイトリオッツと対戦する、第42回スーパーボウルのシーケンスを示しています。これは「ヘルメットキャッチ」と呼ばれる有名なレセプションで、ジャイアンツのクォーターバックであるイーライ・マニングがパスを投げ、これを受けたジャイアンツのワイドレシーバー、デビッド・タイリーが試合の残り2分で自身のヘルメットにボールを押し付けながらキャッチを見事に成功させたものです。
ビデオの背景を把握したところで、「このビデオで何が起きていますか?」と質問した際に、2つのモデルがビデオをどれだけ理解できるかを判定します。
Pegasusを使用してビデオと対話する
始める前に、ビデオをロードするためのPegasusインデックスをセットアップする必要があります。
models = [ { "name": "pegasus1.2", "options": ["visual"] } ] index_name = "sports_videos" indices_list = twelve_labs_client.index.list(name=index_name) if len(indices_list) == 0: index = twelve_labs_client.index.create( name=index_name, models=models ) print(f"A new index has been created: id={index.id} name={index.name} models={index.models}") else: index = indices_list[0] print(f"Index already exists: id={index.id} name={index.name} models={index.models}")
次に、インデックスにビデオをアップロードする関数を作成します。これにより、ビデオについて質問するために使用できるPegasusのビデオIDが返されます。
# Monitor the status of the video task def on_task_update(task): print(f" Status={task.status}") def upload_video_to_twelve_labs_pegasus(video_path): task = twelve_labs_client.task.create( index_id=index.id, file = video_path ) print(f"Task created: id={task.id} status={task.status}") task.wait_for_done(sleep_interval=5, callback=on_task_update) if task.status != "ready": raise RuntimeError(f"Indexing failed with status {task.status}") print(f"The unique identifer of your video is {task.video_id}.") return task.video_id
ビデオをアップロードし、PegasusビデオIDをsingle_video_idに保存します。
# Define the video file path single_video_file = upscaled_video_dir + "football_480.mp4" single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)
Pegasusがビデオを本当に理解しているかを確認するために、「このビデオで何が起きていますか?簡潔に答えてください。」と質問します。
single_video_query = "What is going on in this video? Please be concise." res = twelve_labs_client.generate.text( video_id=single_video_id, prompt=single_video_query ) print(f"{res.data}")
Pegasusから次の返答が届きました:
ビデオは、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツのアメリカンフットボールの試合における極めて重要な瞬間を紹介しています。ジャイアンツのクォーターバックであるイーライ・マニングが投げたパスを、デビッド・タイリーがアウト・オブ・バウンズに倒れ込みながら、ボールをヘルメットに固定して見事なキャッチを決めました。複数のアングルがそのキャッチをリプレイし、その難しさと正確さを強調しています。プレイ後、タイリーは短く喜びを祝い、ビデオは彼と他のチームメイトがフィールドから立ち去るシーンで終了します。
この応答から、Pegasusがビデオを深く理解していることが分かります。これがジャイアンツとペイトリオッツのフットボールの試合であることを把握しています。また、イーライ・マニングがボールを投げ、デビッド・タイリーがそれを受け止めた、試合の勝負を分ける瞬間であることも理解しています。
Pegasusからはこれがスーパーボウルであることは言及されていなかったので、確認のためにさらに質問してみます。
res = twelve_labs_client.generate.text( video_id=single_video_id, prompt="What game is this?" ) print(f"{res.data}")
Pegasusはこれは、第42回スーパーボウルの試合です。と答えました。これは正しい回答です。
次に、LLaVa-NeXT-Videoがどの程度ビデオを解釈できるかを見てみましょう。
LLaVa-NeXT-Videoを使用してビデオと対話する
LLaVa-NeXT-Videoでは、推論の前に特定の形式でビデオデータを準備する必要があります。モデルはビデオストリーム全体を一度に処理できないため、ビデオ全体から均一にフレームを抽出します。ここでは、各ビデオから40枚の均等に分散されたフレームを抽出するサンプリング関数を作成し、コンテンツ全体の重要な瞬間を逃さず捉えます。このサンプリング手法はLLaVA-NeXT-Videoの公式実装に準拠しています。サンプリング後、Hugging Face Hubからモデルをロードし、モデルの要件に適合するように入力を構成したうえで、質問に対する応答を生成するための推論を実行します。
def sample_video(video_path, num_samples=8): container = av.open(video_path) # sample uniformly num_samples frames from the video total_frames = container.streams.video[0].frames indices = np.arange(0, total_frames, total_frames / num_samples).astype(int) sampled_frames = read_video_pyav(container, indices) return sampled_frames sampled_video = sample_video(single_video_file, num_samples=40)
ビデオのサンプリングが適切に行われたら、モデルをセットアップします。
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor import torch quantization_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16 ) llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf") llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained( "llava-hf/LLaVA-NeXT-Video-7B-hf", quantization_config=quantization_config, device_map='auto' )
次に、モデルに対してクエリを実行する関数を作成します。
def query_llava_next(query,model,processor,sampled_video): # Each "content" is a list of dicts and you can add image/video/text modalities conversation = [ { "role": "user", "content": [ {"type": "text", "text": query}, {"type": "video"}, ], }, ] prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) # prompt_len = len(prompt) inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device) generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9} output = model.generate(**inputs, **generate_kwargs) generated_text = processor.batch_decode(output, skip_special_tokens=True) return generated_text[0]
最後に、同様の質問をしてPegasusとの出力を比較します。
llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video) print(llava_next_result)
次のような回答が返信されました:
USER: このビデオで何が起きていますか?簡潔に。 ASSISTANT: 進行中のフットボールの試合が映っており、フィールドには複数のプレイヤーがいます。一部の選手が着用しているジャージの番号や古いスタイルのヘルメットから判断すると、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツの間の第3回スーパーボウルであるようです。1人の選手が空中姿勢でボールをキャッチしようとしており、他の選手にタックルされかけています。また、レフェリーはファーストダウンのシグナルを送っています。コーチ陣やその他の試合関係者も...
このモデルはジャイアンツとペイトリオッツのフットボールの試合であることを認識していますが、試合を第3回スーパーボウルと誤認しています。また、ビデオの最も重要な場面、すなわちヘルメットキャッチの箇所を明確に識別できていません。
先ほどと同じ「これはどの試合ですか?」という質問をしてみます。わずかに正確な内容に近づいているものの、正解には至っていません。
llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video) print(llava_next_result)
USER: what game is this? ASSISTANT: 提供された画像は、現在進行中のフットボールの試合、特に第41回スーパーボウルの一部です。これはニューイングランド・ペイトリオッツ対ニューヨーク・ジャイアンツの試合です。画像に写っているプレイヤーはジャイアンツとペイトリオッツの選手です。
5 - 単一ビデオにおけるセグメントレベル・クエリのためのRAG
比較結果により、Pegasusはビデオ全体を分析する際により優れた処理を提供し、処理時間を短縮しつつ、より正確で一貫性のある回答を返すことが分かりました。
しかしながら、モデルの注目範囲を最も関連性の高いビデオセグメントのみに絞り込むことで、さらにパフォーマンスを向上させる余地があります。これこそが検索拡張生成(RAG)の有用性が発揮される理由です。ビデオ全体を処理する代わりに、特定のクエリに関連する情報が格納されたセグメントのみを特定して分析することが可能となります。
このアプローチを実装するために、ビデオセグメントのセマンティック(意味的内容)を取得する高品質な埋め込みの作成を専門とする、TwelveLabsのMarengoモデルを活用します。これらの埋め込みによって以下が可能となります:
一度に、ビデオの各セグメントを個別にインデックス化する。
ユーザーのクエリに最も一致する関連セグメントを特定する。
特定の処理セグメントのみをビデオ理解モデルによって処理する。
まず、ビデオを各セグメントに分割し、Marengoモデルを使用してそれぞれに対して埋め込みを生成します。これらの埋め込みにより、RAGシステムの基盤が確立されます。
Marengoを使用して、フルビデオおよびビデオクリップの埋め込みを作成する
セグメントの長さをMarengoでサポートされる最大の長さである10秒に設定します。
# Define the video segment length segment_length = 10
次に、Marengoを使用してビデオの埋め込み処理を行います。注意:Marengoがビデオ全体の埋め込みと、ビデオ内の各10秒のクリップの埋め込みの両方を確実に返すようにするため、video_embedding_scopes=["clip", "video"]およびvideo_clip_length=segment_lengthを設定します。
task = twelve_labs_client.embed.task.create( model_name="Marengo-retrieval-2.7", video_file=single_video_file, video_clip_length=segment_length, video_embedding_scopes=["clip", "video"] ) print( f"Created task: id={task.id} model_name={task.model_name} status={task.status}" ) # Monitor the status of the video embedding task status = task.wait_for_done( sleep_interval=2, callback=on_task_update ) print(f"Embedding done: {status}")
埋め込み処理が完了したら、必要に応じてそれらを抽出するためにMarengoのタスクIDを保存できます。後ほどWeaviateデータベースにデータを入力する際に使用できるよう、タスクIDをmarengo_task_idsに保持します。
single_video_task_id = task.id marengo_task_ids = {} single_video_file_name = single_video_file.split("/")[-1] marengo_task_ids[single_video_file_name] = single_video_task_id
RAG用ビデオセグメントの準備
効率的なRAGパイプラインを構築するために、データベース内でPegasusビデオIDをMarengoタスクIDと関連付けます。これにより、ベクトル検索で対応するセグメントが返された際に、そのビデオセグメントについてチャットできるようになります。これを実行するために、各ビデオセグメントをインデックス構築用のPegasusにアップロードする必要があります。
まず、Pegasusにアップロードするための、ビデオを10秒に分割するsplit_video関数を作成します。同時に、各セグメントがPegasusでの最低要件である4秒を超えていることを保証する必要があります。最後のクリップが5秒未満の場合は、最後から2番目のクリップと一部を重複させることで、これを実現します。
import os import subprocess import json def split_video(input_path, output_dir, segment_duration=10): """ Split a video into segments of the specified duration. Regular segments will be exactly segment_duration seconds. The last segment will be at least 5 seconds long, potentially overlapping with the previous segment if needed. Args: input_path: Path to the input video file output_dir: Directory to save the output segments segment_duration: Duration of each segment in seconds (default: 10) """ # Minimum length for the last segment min_last_segment_len = 5 # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Get base filename without extension base_name = os.path.splitext(os.path.basename(input_path))[0] # Get video duration using ffprobe probe_cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", input_path ] try: probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True) video_info = json.loads(probe_result.stdout) duration = float(video_info["format"]["duration"]) except Exception as e: print(f"Error getting video duration: {e}") return 0 # Calculate number of full segments num_full_segments = int(duration / segment_duration) # Calculate remaining duration remaining_duration = duration - (num_full_segments * segment_duration) # Determine total number of segments and if we need to adjust the last segment if remaining_duration > 0: if remaining_duration < min_last_segment_len: # Last segment would be too short, so we'll adjust its start time num_segments = num_full_segments + 1 needs_adjustment = True else: # Last segment is already long enough num_segments = num_full_segments + 1 needs_adjustment = False else: # No remaining duration, all segments are complete num_segments = num_full_segments needs_adjustment = False print(f"Video {base_name} is {duration:.2f} seconds long") print(f"Creating {num_segments} segments") # Create each segment for i in range(num_segments): # For regular segments, start at the segment boundary if i < num_full_segments: start_time = i * segment_duration actual_duration = segment_duration else: # This is the last segment if needs_adjustment: # Start earlier to ensure it's at least min_last_segment_len seconds start_time = duration - min_last_segment_len actual_duration = min_last_segment_len else: # Last segment is already long enough start_time = i * segment_duration actual_duration = remaining_duration output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4") # For all segments, use copy mode for speed cmd = [ "ffmpeg", "-y", "-ss", str(start_time), "-i", input_path, "-t", str(actual_duration), "-c:v", "copy", "-c:a", "copy", output_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f"Error creating segment {i+1}: {result.stderr[:100]}...") else: end_time = start_time + actual_duration if i == num_segments - 1 and needs_adjustment: print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)") else: print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s") print(f"Successfully split {base_name} into {num_segments} segments") return num_segments
新しく作成したvideo_segments_dirにビデオを保存します。
split_video(single_video_file, video_segments_dir,segment_length)
次に、pegasus_video_ids(ファイル名とPegasusビデオIDを関連付ける辞書)を作成し、元のビデオ全体のビデオIDを追加します。
pegasus_video_ids = {} fname = single_video_file.split("/")[-1] pegasus_video_ids[fname] = single_video_id
続いて、分割したビデオセグメントをPegasusにアップロードし、各ビデオIDをpegasus_video_idsに割り当てます。
segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))] # Process each video for segment_video_file in segment_video_files: if segment_video_file in pegasus_video_ids: print("skip file",segment_video_file) continue print("processing file",segment_video_file) try: video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir+segment_video_file) pegasus_video_ids[segment_video_file] = video_id except: print("error",segment_video_file) continue
最後に、LLaVa-NeXT-Videoモデルで効率的に処理できるよう、すべてのビデオをサンプリングします。
sampled_video_files = {} for video_file in os.listdir(video_segments_dir): print(video_file) sampled_video = sample_video(video_segments_dir + video_file,num_samples=40) sampled_video_files[video_file] = sampled_video for video_file in os.listdir(upscaled_video_dir): print(video_file) sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40) sampled_video_files[video_file] = sampled_video
Weaviateへの埋め込みデータのアップロード
Weaviateは、コレクションに登録する際にメタデータレコードと埋め込みベクトルが区別されていることを想定しています。MarengoタスクIDとPegasusビデオIDを取得し、アップロード用のrecords(レコード配列)とvectors(ベクトル配列)を準備するためのprepare_marengo_embeddings_for_weaviate関数を作成します。
def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids): # Prepare data for Weaviate upload records = [] vectors = [] for video_file_name in marengo_task_ids.keys(): marengo_task_id = marengo_task_ids[video_file_name] # Retrieve marengo full video and clip embeddings marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id) #track segment number to match with fiel segment_number = 0 for segment in marengo_embeddings_result.video_embedding.segments: # Determine if this is a video or clip segment is_video = segment.embedding_scope == "video" #Update the file name if segment updated_file_name = video_file_name if not is_video: updated_file_name = updated_file_name.replace(".mp4",f"_segment_{segment_number:03d}.mp4") segment_number += 1 video_name = video_file_name.replace(".mp4","") pegasus_video_id = None if updated_file_name in pegasus_video_ids: pegasus_video_id = pegasus_video_ids[updated_file_name] record = { 'video_name':video_name, 'segment_number': 0 if is_video else segment_number, 'video_file': updated_file_name, 'start_time': getattr(segment, 'start_offset_sec', 0), 'end_time': getattr(segment, 'end_offset_sec', 0), 'type': 'video' if is_video else 'clip', 'task_id': marengo_task_id, 'pegasus_video_id': pegasus_video_id } # Get the embedding vector embedding_vector = [float(x) for x in segment.embeddings_float] # Add to our lists records.append(record) vectors.append(embedding_vector) # Print summary print(f"Prepared {len(records)} segments for upload to Weaviate") print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}") print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}") return records, vectors
作成した関数を使用して、Weaviateにアップロードするレコードとベクトルを取得します。
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids) with collection.batch.dynamic() as batch: for i, record in enumerate(records): batch.add_object( properties=record, vector=vectors[i] ) print(f"Added {len(records)} embeddings to Weaviate")
ベクトル検索のテスト
すべてのデータがコレクションに揃ったので、Weaviateでのベクトル検索が正しいビデオを返すかテストできます。Weaviateのnear_vector検索を使用しているため、ビデオのベクトルで検索した場合は、同一オブジェクトとして距離が「0」として返出されます。
今回はコレクションから5番目のベクトル(インデックス5)を取り出して検索します。これにより、対応するビデオセグメントが距離「0」で返されます。
from weaviate.classes.query import MetadataQuery, Filter # Use a specific vector for the query query_vector = vectors[5] # Perform vector search response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), ) print(f"Found {len(response.objects)} results for vector search") for obj in response.objects: print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}") if 'segment_id' in obj.properties: print(f"Segment: {obj.properties['segment_id']}") if 'text' in obj.properties and obj.properties['text']: print(f"Text: {obj.properties['text']}") print(f"Distance: {obj.metadata.distance}") print("-" * 50)
この出力結果は、埋め込みが正常に保存され、正しく検索可能であることを示しています。
RAG用の関連性の高いビデオセグメントの取得
RAGパイプラインの核となるのは、ユーザーの質問を最も関連性の高いビデオセグメントと一致させる能力です。このプロセスは3つの重要な手順で機能します:
TwelveLabsのMarengoモデルを使用して、ユーザーのテキストクエリをベクトル埋め込みに変換します。
Weaviateをスキャンし、クエリ埋め込みに最も類似しているビデオセグメントの埋め込みを特定します。
最も関連性の高いビデオセグメントが特定されたら、それに関連付けられたPegasusのビデオIDを使用して、そのセグメントに特化した正確な回答を生成します。
この対象を絞った処理方法により、ビデオコンテンツの最も関連性の高い部分だけを処理させ、効率と応答品質を両立して大幅に向上させます。
まず、Marengoを使用してテキストクエリを埋め込みベクトルに変換します。
sample_question = "What technique did David Tyree use to catch the ball?" embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=sample_question, text_truncate="start", ) query_vector = embedding.text_embedding.segments[0].embeddings_float
次に、最も関連性の高いクリップを特定します。filters=(Filter.by_property("type").equal("clip"))を使用して、ビデオ全体の埋め込みを無視し、クリップの埋め込みのみが返されるようにします。
response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("clip")) ) video_file = response.objects[0].properties.get("video_file") print(video_file)
検索結果により、4番目のクリップ(インデックス3)であるfootball_480_segment_003.mp4が返されたことが確認できます。
抽出されたクリップを確認してみましょう:
import matplotlib.pyplot as plt from matplotlib import animation from IPython.display import HTML video_file = response.objects[0].properties.get("video_file") video = sampled_video_files[video_file] fig = plt.figure() im = plt.imshow(video[0,:,:,:]) plt.close() # this is required to not display the generated image def init(): im.set_data(video[0,:,:,:]) def animate(i): im.set_data(video[i,:,:,:]) return im anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0], interval=100) HTML(anim.to_html5_video())
この方法によって、ヘルメットキャッチが行われたまさにその瞬間が正確に特定されています。

適切なセグメントが抽出された状態となったため、PegasusとLLaVa-NeXT-Videoがこの短いクリップに対してどのような挙動を示すかを確認します。
ビデオセグメントとの対話:Pegasus 対 LLaVa-NeTX-Video
最初にPegasusがどのように応答するかを見てみましょう。
pegasus_video_id = response.objects[0].properties.get("pegasus_video_id") print(sample_question) res = twelve_labs_client.generate.text( video_id=pegasus_video_id, prompt=sample_question ) print(f"{res.data}")
What technique did David Tyree use to catch the ball?
デビッド・タイリーは、ボールをヘルメットに押し付けてキャッチを確実にする技術を使用しました。これは、ニューヨーク・ジャイアンツがポゼッションを維持し、攻撃を継続することを可能にした極めて重要なプレーでした。
Pegasusが見事な回答を提供しました。ヘルメットキャッチのプレーと、それがジャイアンツにとって重要な推進力となった点に言及できています。
次に、LLaVa-NeTX-Videoが同じセグメントをスキャンした際に、より洗練された回答を返せるか確認します。
video_file = response.objects[0].properties.get("video_file") sampled_video = sampled_video_files[video_file] generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video) print(generated_text)
USER: What technique did David Tyree use to catch the ball? ASSISTANT: キャッチに成功した選手は、両手で頭上に持ってくるキャッチング技術を使用しました。この技術は、ディフェンス陣に囲まれる中で確実にボールを手元に引き込むためのものです。意図した軌道からボールが偏向した場合にはリスクの高い技術ですが、混戦の中で空中のボールを鷲掴みするのに非常に効果的な方法です。デビッド・タイリーは...
頭上でのハンドキャッチという点に言及するものの、ヘルメットキャッチに成功したという詳細(最も特徴的な要素)を認識できていません。さらに、後半で同じ結論を繰り返し饒舌になっています。
6 - Marengo、Weaviate、およびPegasusを使用した複数ビデオRAG
単一のビデオから個々のクリップに対してMarengoの埋め込みがどのように動作するかを理解した上で、より実用的なRAGユースケースに向けて、複数のビデオにまたがる埋め込みを使用する方法を示します。
すべてのビデオについてMarengo埋め込みを取得する
まず、すべてのビデオ用のMarengoタスクIDを用いて、marengo_task_idsの辞書を更新します。
for video_file_name in os.listdir(upscaled_video_dir): if video_file_name in marengo_task_ids: print(f"skipping {video_file_name} because embeddings already exist") continue print(f"processing {video_file_name}") file_path = os.path.join(upscaled_video_dir, video_file_name) task = twelve_labs_client.embed.task.create( model_name="Marengo-retrieval-2.7", video_file=file_path, video_clip_length=segment_length, video_embedding_scopes=["clip", "video"] ) print( f"Created task: id={task.id} model_name={task.model_name} status={task.status}" ) # Monitor the status of the video embedding task status = task.wait_for_done( sleep_interval=2, callback=on_task_update ) print(f"Embedding done: {status}") marengo_task_ids[video_file_name] = task.id
残りのビデオをセグメントに分割する
残りのビデオを以前と同様にいくつかの短いセグメントに分割します。
# Create output folder if it doesn't exist os.makedirs(upscaled_video_dir, exist_ok=True) # Get all video files video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))] # Process each video for video_file in video_files: split_video(upscaled_video_dir + video_file,video_segments_dir,segment_length)
すべてのビデオおよびそのセグメントに対してPegasusビデオIDを取得する
次に、残りのセグメントと元動画すべてに対するPegasusビデオIDを抽出します。時間を節約するために、この処理を並行して実行します。
import concurrent.futures import os from tqdm import tqdm # Use standard tqdm instead of tqdm.notebook def process_video(video_path): video_file_name = video_path.split("/")[-1] try: video_id = upload_video_to_twelve_labs_pegasus(video_path) return video_file_name, video_id except Exception as e: print(f"Error processing {video_file_name}: {str(e)}") return video_file_name, None # Filter out videos that are already processed segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')] full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')] all_video_files = segment_video_files + full_video_files videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids] print(f"Processing {len(videos_to_process)} videos in parallel...") # Use ThreadPoolExecutor for I/O-bound operations like API calls with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: # Submit all tasks and create a dictionary mapping futures to their video files future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process} # Process results as they complete with a progress bar for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)): video_file_name, video_id = future.result() if video_id: pegasus_video_ids[video_file_name] = video_id print("All videos processed!") print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")
Weaviateへのデータアップロード
残りの埋め込みデータをWeaviateにアップロードします。
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids) with collection.batch.dynamic() as batch: for i, record in enumerate(records): if record["pegasus_video_id"] is None: continue batch.add_object( properties=record, vector=vectors[i] ) print(f"Added {len(records)} embeddings to Weaviate")
RAGパフォーマンス評価:クリップ 対 フルビデオ
Marengoの埋め込みとPegasusのビデオIDがWeaviateに正常にインデックス化されたので、このRAGシステムの有効性を評価します。この評価は以下の2つの重要な側面に焦点を当てます:
回答品質:クリップレベルの検索とビデオ全体の検索の違いによって、質問に対するシステムの応答の正確性はどのように変化するか?
処理効率:応答時間および計算リソースの消費量に関してどのような性能差異があるか?
この比較により、これら違いを定量的に測定可能にし、特により長尺なビデオや、ビデオ内の特定の瞬間を提示する複雑な質問に対して、最も適切な箇所だけに対象を動的に絞ることで、RAGが処理効率と品質を同時に向上させる仕組みを実証します。
まず、複数の異なるスポーツジャンルをカバーし、かつビデオに含まれる特定のイベントを解釈することを必要とする、様々なテスト用質問セットを定義します。
video_questions = [ "In the American Football Video, what are the teams playing?", "What technique does David Tyree use to catch the ball?", "In the tennis match video, who is playing?", "What foot does Messi shoot at the goal with?", "When does Keri Strug hurt her foot?" ]
PegasusによるマルチビデオRAG
最初に、フルビデオを取得して質問した際の回答動作を測定します:
from weaviate.classes.query import MetadataQuery, Filter import time # Perform query using the whole video dataset pegasus_full_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start", ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("video")) ) selected_video_name = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] res = twelve_labs_client.generate.text( video_id=selected_video_id, prompt=question ) pegasus_full_video_answers.append([question,selected_video_name,res.data]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 72 seconds
次に、これを10秒のセグメントクリップでの実行と比較します。
pegasus_clip_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start", ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("clip")) ) selected_video_name = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] res = twelve_labs_client.generate.text( video_id=selected_video_id, prompt=question ) pegasus_clip_video_answers.append([question,selected_video_name,res.data]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 20 seconds
抽出したセグメントクリップとフルビデオそれぞれの検索により生成された回答を比較します。
for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers): print("question",clip_answer[0]) print("clip: ",clip_answer[2]) print("full: ",full_answer[2]) print("\n")
双方から十分に正確で一貫性の高い回答が得られたことが分かります。しかし、クリップを用いたアプローチがわずか20秒で完了したのに対して、ビデオ全体を毎回処理するアプローチは完了に72秒を要しました。
LLaVa-NeXT-VideoによるマルチビデオRAG
次に、LLaVa-NeXT-Videoモデルを使用して同様のテストを行います。ただし、まずは事前にすべてのビデオファイルをサンプリングする必要があります。
for video_file in os.listdir(video_segments_dir): print(video_file) sampled_video = sample_video(video_segments_dir + video_file,num_samples=40) sampled_video_files[video_file] = sampled_video
フルビデオの読み込みによる質問処理から開始します。
llava_full_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start" ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("video")) ) selected_video_file = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] sampled_video = sampled_video_files[selected_video_file] generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video) llava_full_video_answers.append([question,selected_video_name,generated_text]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 24 seconds
次に、これをセグメントクリップと比較します。
from weaviate.classes.query import MetadataQuery import time llava_clip_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start" ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("clip")) ) selected_video_file = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] sampled_video = sampled_video_files[selected_video_file] generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video) llava_clip_video_answers.append([question,selected_video_name,generated_text]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 24 seconds
各実行で全く同等の時間を要したことが分かります。これは、ビデオの秒数に関係なく、各項目から常に均等に40枚のフレームだけを抽出して評価するためです。
次に、LLaVa-NeXT-Videoが抽出したクリップとフルビデオに対してそれぞれ生成した返答内容を調査します。
for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers): print("question",clip_answer[0]) print("clip: ",clip_answer[2]) print("full: ",full_answer[2]) print("\n")
今回のケースでは、セグメントクリップを処理した際に、LLaVa-NeXT-Videoは5問中2問で正しい解を導き出しました:
最初の質問において、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツが対戦中である点を認識できており、また
3番目の質問において、テニスの試合がロジャー・フェデラーとノバク・ジョコビッチの間で行われている点を正確に見分けています。
7 - まとめ:TwelveLabsとWeaviateによる効率的なビデオ理解のためのRAGの活用
ビデオ処理における検索拡張生成(RAG)の調査は、効率と正確性の両面で大きな利点を示しました。TwelveLabsの高度なビデオ理解機能とWeaviateの強力なベクトルデータベースを組み合わせることで、ビデオ全体ではなく、最も関連性の高いビデオセグメントのみを論理的に処理するシステムを構築しました。
主な知見
パフォーマンスの改善:Weaviateを統合したRAGシステムにTwelveLabsのPegasusを適応すると、元ビデオを毎回全て読み込ませる代わりに、抽出されたクリップだけをクエリ処理することで、レスポンス速度を劇的に改善できました。
正確性の強化:LLaVa-NeXT-Videoなどのオープンソースモデルにおいて、注目箇所をあらかじめ切り分けたクリップセグメントに限定することで、ビデオ全体の処理に比べて回答の正確さが大きく向上することが示されました。
スケーラビリティを備えたアーキテクチャ:このRAGモデルの検証は、TwelveLabsの埋め込みモデル(Marengo)とWeaviateの高性能ベクトルデータベースの連携が、いかに強力な処理性能を提供するかを示しています。高次元の埋め込みデータを低遅延で検索して保存できるWeaviateの技術は、ビデオ理解などの複雑なタスクを実用的なアプリケーションに落とし込むための極めて重要なコアコンポーネントです。
ユースケース
TwelveLabsのビデオ理解機能とWeaviateのベクトルデータベースを組み合わせることで、多数の産業で強力なアプリケーションの構築が可能になります:
メディア&エンターテインメント:クリエイターは大規模なアーカイブから目的のシーンを瞬時に検索可能になり、編集作業の迅速化やSNS向け動画クリップの自動生成、アーカイブの効率的な再活用などが可能になります。
スポーツアナリティクス:コーチやアナリストは、ビデオを手動で早送り・巻き戻しして探すことなく、特定のアクション(例:『相手チームの裏に投げる短いパスのシーン』)をただテキストで定義するだけで、即座に関連クリップを全試合データから呼び出せるようになります。
小売・Eコマース:小売事業者は自社のチュートリアル動画を、ただ受動的に見せる一方向のコンテンツから、双方向で質問に答えるインタラクティブなショッピング体験にシフトさせることができます。例えば、顧客が『ストラップの調整方法を教えて』『バックパックに入れたときのサイズ感を見せて』と質問した際、説明動画の該当部分のみを自動で瞬時にカットして再生表示するといったことが容易に実現します。
TwelveLabsとWeaviateの組み合わせにより、膨大なビデオデータを扱う環境でも、情報抽出の効率化と高度な対話システムを同時に満たす、強力なVideo RAG基盤を構築できます。
草稿を確認してくれたWeaviateチームの Tuana Celik と Erika Cardenas に深く感謝します!
ビデオ処理は、特に長尺コンテンツを分析する場合、計算コストが高く時間がかかります。検索拡張生成(RAG)は、システムがビデオ全体ではなく最も関連性の高いビデオセグメントのみを処理できるようにすることで、この課題を解決します。この的を絞ったアプローチにより、応答の品質を維持または向上させながら、処理時間を大幅に削減できます。
この記事では、Twelve Labsのビデオ理解機能とWeaviateのベクトルデータベースを組み合わせて、ビデオコンテンツ用の効率的なRAGシステムを構築する方法を探ります。ビデオをセグメント化し、埋め込みを使用して分析に最も関連する部分のみを検索することで、精度を維持または向上させながら処理時間を大幅に改善できます。
私たちのアプローチは、いくつかの主要技術を活用しています:
ビデオの理解と埋め込み生成のためのTwelveLabs PegasusおよびMarengoモデル
ビデオセグメントを効率的に保存・収集するためのWeaviateベクトルデータベース
ビデオ分析の比較対象としてのオープンソースの LLaVA-NeXT-Video モデル
このRAGベースのアプローチにより、最も関連性の高いセグメントのみに焦点を当てることで、ビデオ処理の計算負荷を軽減し、より長いビデオを効率的に分析できるようになることを実証します。コンテンツモデレーション、スポーツ分析、または教育コンテンツ向けのアプリケーションのいずれを構築している場合でも、このアプローチは高品質な結果を維持しながらビデオ処理能力を拡張するのに役立ちます。
1 - TwelveLabsとWeaviateのセットアップ
TwelveLabs
まだTwelve Labsに登録していない場合は、こちらから登録できます。アカウントをセットアップしたら、Playgroundに移動し、画面右上隅のユーザーアイコンをクリックしてAPI Keyにアクセスします。
お使いのノートブックの左側にある鍵のアイコンをクリックし、この値をTL_API_KEYとしてシークレットを作成します。
Weaviate
Weaviateのアカウントをお持ちでない場合は、こちらからサインアップできます。アカウント作成後、クラウドダッシュボードに移動し、新しいクラスターを作成します。クラスターのセットアップが完了したら、ノートブックのシークレットセクションに2つの値を設定する必要があります。
REST Endpointの下にあるURLをWEAVIATE_URL変数として追加します。API Keysの下にあるAdminキーをコピーし、WEAVIATE_API_KEYに保存します。
2 - GPUランタイムの選択
LLaVA-NeXT-Videoモデルを実行するにはGPUが必要です。ノートブックでランタイム > ランタイムのタイプを変更に移動し、T4 GPUを選択します。
3 - 環境のセットアップ
依存関係のインストール
まず、TwelveLabsとWeaviateのSDKをインストールする必要があります:
!python -m pip install -U -q twelvelabs !python -m pip install -U -q "weaviate-client>=4.0.0"
次に、残りの依存関係をインストールします。
!python -m pip install torch !python -m pip install -q av !python -m pip install --upgrade -q accelerate !python -m pip install -U bitsandbytes !python -m pip install git
!python -m pip install pillow !python -m pip install sentencepiece !python -m
TwelveLabsおよびWeaviate SDKのセットアップ
from google.colab import userdata TL_API_KEY=userdata.get('TL_API_KEY') weaviate_url = userdata.get("WEAVIATE_URL") weaviate_api_key = userdata.get("WEAVIATE_API_KEY")
次に、TwelveLabsクライアントを初期化します。
from twelvelabs import TwelveLabs # Initialize the Twelve Labs client twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)
最後に、Weaviateクライアントをセットアップし、Video_Embeddingsコレクションを初期化します。
import weaviate from weaviate.classes.init import Auth # Connect to Weaviate Cloud weaviate_client = weaviate.connect_to_weaviate_cloud( cluster_url=weaviate_url, auth_credentials=Auth.api_key(weaviate_api_key), ) # Get or create collection try: collection = weaviate_client.collections.get("Video_Embeddings") except: collection = weaviate_client.collections.create(name="Video_Embeddings")
ビデオデータのセットアップ
次に、埋め込み用のビデオデータを取得する必要があります。このリンクを使用して、Googleドライブのフォルダ内でビデオデータを見つけることができます。基本のGoogleドライブフォルダの中に 「TwelveLabs-Weaviate」 という名前のフォルダを作成し、そこへコピーします。以下のセルを使用してドライブをマウントし、ノートブックからビデオファイルにアクセスできるようにします。
from google.colab import drive drive.mount('/content/drive') base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate" raw_video_dir = base_folder_path + "/sports_videos" upscaled_video_dir = base_folder_path + "/upscaled_videos/" video_segments_dir = base_folder_path + "/video_segments/"
ビデオのアップスケーリング
いくつかのビデオは、解像度が低すぎて埋め込みモデルで使用できません。これらを使用する前にアップスケーリングする必要があります。
ここでアップスケーリング関数を作成します。read_video_pyavは、LLaVa-NeXT-VideoのColabノートブックから直接引用したもので、推論に適した正しいnumpy表現にビデオをフォーマットします。
import av import numpy as np def upscale_video(input_file, output_file, target_width=1280, target_height=720): input_container = av.open(input_file) output_container = av.open(output_file, mode='w') input_stream = input_container.streams.video[0] output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate) output_stream.width = target_width output_stream.height = target_height output_stream.pix_fmt = 'yuv420p' for frame in input_container.decode(input_stream): frame = frame.reformat(width=target_width, height=target_height) packet = output_stream.encode(frame) output_container.mux(packet) # Flush the encoder packet = output_stream.encode(None) output_container.mux(packet) # Close the containers input_container.close() output_container.close() def read_video_pyav(container, indices): ''' Decode the video with PyAV decoder. Args: container (av.container.input.InputContainer): PyAV container. indices (List[int]): List of frame indices to decode. Returns: np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3). ''' frames = [] container.seek(0) start_index = indices[0] end_index = indices[-1] for i, frame in enumerate(container.decode(video=0)): if i > end_index: break if i >= start_index and i in indices: frames.append(frame) return np.stack([x.to_ndarray(format="rgb24") for x in frames])
raw_video_dirフォルダ内のビデオを取得し、アップスケーリングしてupscaled_video_dirフォルダに保存します。
# Create output directory if it doesn't exist if not os.path.exists(upscaled_video_dir): os.makedirs(upscaled_video_dir) # Iterate over all files in the raw video directory for filename in os.listdir(raw_video_dir): # Check if the file is a video file if filename.endswith(".mp4"): print(filename) # Get the file name without extension input_file_no_ext = os.path.splitext(filename)[0] # Define the output file name output_file = f"{input_file_no_ext}_480.mp4" if output_file in os.listdir(upscaled_video_dir): continue # Define the full path for the input and output files input_file_path = os.path.join(raw_video_dir, filename) output_file_path = os.path.join(upscaled_video_dir, output_file) # Upscale the video upscale_video(input_file_path, output_file_path)
4 - 単一ビデオによるPegasusとLLaVa-NeXT-Videoの比較
Pegasus と LLaVa-NeXT-Video は、どちらもビデオを取り込んでそれについて質問できるビデオ理解モデルです。
まず、ビデオコレクションから切り倒した単一のビデオでPegasusとLLaVa-NeXT-Videoを比較することから始めます。ビデオは、ニューヨーク・ジャイアンツがニューイングランド・ペイトリオッツと対戦する、第42回スーパーボウルのシーケンスを示しています。これは「ヘルメットキャッチ」と呼ばれる有名なレセプションで、ジャイアンツのクォーターバックであるイーライ・マニングがパスを投げ、これを受けたジャイアンツのワイドレシーバー、デビッド・タイリーが試合の残り2分で自身のヘルメットにボールを押し付けながらキャッチを見事に成功させたものです。
ビデオの背景を把握したところで、「このビデオで何が起きていますか?」と質問した際に、2つのモデルがビデオをどれだけ理解できるかを判定します。
Pegasusを使用してビデオと対話する
始める前に、ビデオをロードするためのPegasusインデックスをセットアップする必要があります。
models = [ { "name": "pegasus1.2", "options": ["visual"] } ] index_name = "sports_videos" indices_list = twelve_labs_client.index.list(name=index_name) if len(indices_list) == 0: index = twelve_labs_client.index.create( name=index_name, models=models ) print(f"A new index has been created: id={index.id} name={index.name} models={index.models}") else: index = indices_list[0] print(f"Index already exists: id={index.id} name={index.name} models={index.models}")
次に、インデックスにビデオをアップロードする関数を作成します。これにより、ビデオについて質問するために使用できるPegasusのビデオIDが返されます。
# Monitor the status of the video task def on_task_update(task): print(f" Status={task.status}") def upload_video_to_twelve_labs_pegasus(video_path): task = twelve_labs_client.task.create( index_id=index.id, file = video_path ) print(f"Task created: id={task.id} status={task.status}") task.wait_for_done(sleep_interval=5, callback=on_task_update) if task.status != "ready": raise RuntimeError(f"Indexing failed with status {task.status}") print(f"The unique identifer of your video is {task.video_id}.") return task.video_id
ビデオをアップロードし、PegasusビデオIDをsingle_video_idに保存します。
# Define the video file path single_video_file = upscaled_video_dir + "football_480.mp4" single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)
Pegasusがビデオを本当に理解しているかを確認するために、「このビデオで何が起きていますか?簡潔に答えてください。」と質問します。
single_video_query = "What is going on in this video? Please be concise." res = twelve_labs_client.generate.text( video_id=single_video_id, prompt=single_video_query ) print(f"{res.data}")
Pegasusから次の返答が届きました:
ビデオは、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツのアメリカンフットボールの試合における極めて重要な瞬間を紹介しています。ジャイアンツのクォーターバックであるイーライ・マニングが投げたパスを、デビッド・タイリーがアウト・オブ・バウンズに倒れ込みながら、ボールをヘルメットに固定して見事なキャッチを決めました。複数のアングルがそのキャッチをリプレイし、その難しさと正確さを強調しています。プレイ後、タイリーは短く喜びを祝い、ビデオは彼と他のチームメイトがフィールドから立ち去るシーンで終了します。
この応答から、Pegasusがビデオを深く理解していることが分かります。これがジャイアンツとペイトリオッツのフットボールの試合であることを把握しています。また、イーライ・マニングがボールを投げ、デビッド・タイリーがそれを受け止めた、試合の勝負を分ける瞬間であることも理解しています。
Pegasusからはこれがスーパーボウルであることは言及されていなかったので、確認のためにさらに質問してみます。
res = twelve_labs_client.generate.text( video_id=single_video_id, prompt="What game is this?" ) print(f"{res.data}")
Pegasusはこれは、第42回スーパーボウルの試合です。と答えました。これは正しい回答です。
次に、LLaVa-NeXT-Videoがどの程度ビデオを解釈できるかを見てみましょう。
LLaVa-NeXT-Videoを使用してビデオと対話する
LLaVa-NeXT-Videoでは、推論の前に特定の形式でビデオデータを準備する必要があります。モデルはビデオストリーム全体を一度に処理できないため、ビデオ全体から均一にフレームを抽出します。ここでは、各ビデオから40枚の均等に分散されたフレームを抽出するサンプリング関数を作成し、コンテンツ全体の重要な瞬間を逃さず捉えます。このサンプリング手法はLLaVA-NeXT-Videoの公式実装に準拠しています。サンプリング後、Hugging Face Hubからモデルをロードし、モデルの要件に適合するように入力を構成したうえで、質問に対する応答を生成するための推論を実行します。
def sample_video(video_path, num_samples=8): container = av.open(video_path) # sample uniformly num_samples frames from the video total_frames = container.streams.video[0].frames indices = np.arange(0, total_frames, total_frames / num_samples).astype(int) sampled_frames = read_video_pyav(container, indices) return sampled_frames sampled_video = sample_video(single_video_file, num_samples=40)
ビデオのサンプリングが適切に行われたら、モデルをセットアップします。
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor import torch quantization_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16 ) llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf") llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained( "llava-hf/LLaVA-NeXT-Video-7B-hf", quantization_config=quantization_config, device_map='auto' )
次に、モデルに対してクエリを実行する関数を作成します。
def query_llava_next(query,model,processor,sampled_video): # Each "content" is a list of dicts and you can add image/video/text modalities conversation = [ { "role": "user", "content": [ {"type": "text", "text": query}, {"type": "video"}, ], }, ] prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) # prompt_len = len(prompt) inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device) generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9} output = model.generate(**inputs, **generate_kwargs) generated_text = processor.batch_decode(output, skip_special_tokens=True) return generated_text[0]
最後に、同様の質問をしてPegasusとの出力を比較します。
llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video) print(llava_next_result)
次のような回答が返信されました:
USER: このビデオで何が起きていますか?簡潔に。 ASSISTANT: 進行中のフットボールの試合が映っており、フィールドには複数のプレイヤーがいます。一部の選手が着用しているジャージの番号や古いスタイルのヘルメットから判断すると、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツの間の第3回スーパーボウルであるようです。1人の選手が空中姿勢でボールをキャッチしようとしており、他の選手にタックルされかけています。また、レフェリーはファーストダウンのシグナルを送っています。コーチ陣やその他の試合関係者も...
このモデルはジャイアンツとペイトリオッツのフットボールの試合であることを認識していますが、試合を第3回スーパーボウルと誤認しています。また、ビデオの最も重要な場面、すなわちヘルメットキャッチの箇所を明確に識別できていません。
先ほどと同じ「これはどの試合ですか?」という質問をしてみます。わずかに正確な内容に近づいているものの、正解には至っていません。
llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video) print(llava_next_result)
USER: what game is this? ASSISTANT: 提供された画像は、現在進行中のフットボールの試合、特に第41回スーパーボウルの一部です。これはニューイングランド・ペイトリオッツ対ニューヨーク・ジャイアンツの試合です。画像に写っているプレイヤーはジャイアンツとペイトリオッツの選手です。
5 - 単一ビデオにおけるセグメントレベル・クエリのためのRAG
比較結果により、Pegasusはビデオ全体を分析する際により優れた処理を提供し、処理時間を短縮しつつ、より正確で一貫性のある回答を返すことが分かりました。
しかしながら、モデルの注目範囲を最も関連性の高いビデオセグメントのみに絞り込むことで、さらにパフォーマンスを向上させる余地があります。これこそが検索拡張生成(RAG)の有用性が発揮される理由です。ビデオ全体を処理する代わりに、特定のクエリに関連する情報が格納されたセグメントのみを特定して分析することが可能となります。
このアプローチを実装するために、ビデオセグメントのセマンティック(意味的内容)を取得する高品質な埋め込みの作成を専門とする、TwelveLabsのMarengoモデルを活用します。これらの埋め込みによって以下が可能となります:
一度に、ビデオの各セグメントを個別にインデックス化する。
ユーザーのクエリに最も一致する関連セグメントを特定する。
特定の処理セグメントのみをビデオ理解モデルによって処理する。
まず、ビデオを各セグメントに分割し、Marengoモデルを使用してそれぞれに対して埋め込みを生成します。これらの埋め込みにより、RAGシステムの基盤が確立されます。
Marengoを使用して、フルビデオおよびビデオクリップの埋め込みを作成する
セグメントの長さをMarengoでサポートされる最大の長さである10秒に設定します。
# Define the video segment length segment_length = 10
次に、Marengoを使用してビデオの埋め込み処理を行います。注意:Marengoがビデオ全体の埋め込みと、ビデオ内の各10秒のクリップの埋め込みの両方を確実に返すようにするため、video_embedding_scopes=["clip", "video"]およびvideo_clip_length=segment_lengthを設定します。
task = twelve_labs_client.embed.task.create( model_name="Marengo-retrieval-2.7", video_file=single_video_file, video_clip_length=segment_length, video_embedding_scopes=["clip", "video"] ) print( f"Created task: id={task.id} model_name={task.model_name} status={task.status}" ) # Monitor the status of the video embedding task status = task.wait_for_done( sleep_interval=2, callback=on_task_update ) print(f"Embedding done: {status}")
埋め込み処理が完了したら、必要に応じてそれらを抽出するためにMarengoのタスクIDを保存できます。後ほどWeaviateデータベースにデータを入力する際に使用できるよう、タスクIDをmarengo_task_idsに保持します。
single_video_task_id = task.id marengo_task_ids = {} single_video_file_name = single_video_file.split("/")[-1] marengo_task_ids[single_video_file_name] = single_video_task_id
RAG用ビデオセグメントの準備
効率的なRAGパイプラインを構築するために、データベース内でPegasusビデオIDをMarengoタスクIDと関連付けます。これにより、ベクトル検索で対応するセグメントが返された際に、そのビデオセグメントについてチャットできるようになります。これを実行するために、各ビデオセグメントをインデックス構築用のPegasusにアップロードする必要があります。
まず、Pegasusにアップロードするための、ビデオを10秒に分割するsplit_video関数を作成します。同時に、各セグメントがPegasusでの最低要件である4秒を超えていることを保証する必要があります。最後のクリップが5秒未満の場合は、最後から2番目のクリップと一部を重複させることで、これを実現します。
import os import subprocess import json def split_video(input_path, output_dir, segment_duration=10): """ Split a video into segments of the specified duration. Regular segments will be exactly segment_duration seconds. The last segment will be at least 5 seconds long, potentially overlapping with the previous segment if needed. Args: input_path: Path to the input video file output_dir: Directory to save the output segments segment_duration: Duration of each segment in seconds (default: 10) """ # Minimum length for the last segment min_last_segment_len = 5 # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Get base filename without extension base_name = os.path.splitext(os.path.basename(input_path))[0] # Get video duration using ffprobe probe_cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", input_path ] try: probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True) video_info = json.loads(probe_result.stdout) duration = float(video_info["format"]["duration"]) except Exception as e: print(f"Error getting video duration: {e}") return 0 # Calculate number of full segments num_full_segments = int(duration / segment_duration) # Calculate remaining duration remaining_duration = duration - (num_full_segments * segment_duration) # Determine total number of segments and if we need to adjust the last segment if remaining_duration > 0: if remaining_duration < min_last_segment_len: # Last segment would be too short, so we'll adjust its start time num_segments = num_full_segments + 1 needs_adjustment = True else: # Last segment is already long enough num_segments = num_full_segments + 1 needs_adjustment = False else: # No remaining duration, all segments are complete num_segments = num_full_segments needs_adjustment = False print(f"Video {base_name} is {duration:.2f} seconds long") print(f"Creating {num_segments} segments") # Create each segment for i in range(num_segments): # For regular segments, start at the segment boundary if i < num_full_segments: start_time = i * segment_duration actual_duration = segment_duration else: # This is the last segment if needs_adjustment: # Start earlier to ensure it's at least min_last_segment_len seconds start_time = duration - min_last_segment_len actual_duration = min_last_segment_len else: # Last segment is already long enough start_time = i * segment_duration actual_duration = remaining_duration output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4") # For all segments, use copy mode for speed cmd = [ "ffmpeg", "-y", "-ss", str(start_time), "-i", input_path, "-t", str(actual_duration), "-c:v", "copy", "-c:a", "copy", output_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f"Error creating segment {i+1}: {result.stderr[:100]}...") else: end_time = start_time + actual_duration if i == num_segments - 1 and needs_adjustment: print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)") else: print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s") print(f"Successfully split {base_name} into {num_segments} segments") return num_segments
新しく作成したvideo_segments_dirにビデオを保存します。
split_video(single_video_file, video_segments_dir,segment_length)
次に、pegasus_video_ids(ファイル名とPegasusビデオIDを関連付ける辞書)を作成し、元のビデオ全体のビデオIDを追加します。
pegasus_video_ids = {} fname = single_video_file.split("/")[-1] pegasus_video_ids[fname] = single_video_id
続いて、分割したビデオセグメントをPegasusにアップロードし、各ビデオIDをpegasus_video_idsに割り当てます。
segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))] # Process each video for segment_video_file in segment_video_files: if segment_video_file in pegasus_video_ids: print("skip file",segment_video_file) continue print("processing file",segment_video_file) try: video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir+segment_video_file) pegasus_video_ids[segment_video_file] = video_id except: print("error",segment_video_file) continue
最後に、LLaVa-NeXT-Videoモデルで効率的に処理できるよう、すべてのビデオをサンプリングします。
sampled_video_files = {} for video_file in os.listdir(video_segments_dir): print(video_file) sampled_video = sample_video(video_segments_dir + video_file,num_samples=40) sampled_video_files[video_file] = sampled_video for video_file in os.listdir(upscaled_video_dir): print(video_file) sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40) sampled_video_files[video_file] = sampled_video
Weaviateへの埋め込みデータのアップロード
Weaviateは、コレクションに登録する際にメタデータレコードと埋め込みベクトルが区別されていることを想定しています。MarengoタスクIDとPegasusビデオIDを取得し、アップロード用のrecords(レコード配列)とvectors(ベクトル配列)を準備するためのprepare_marengo_embeddings_for_weaviate関数を作成します。
def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids): # Prepare data for Weaviate upload records = [] vectors = [] for video_file_name in marengo_task_ids.keys(): marengo_task_id = marengo_task_ids[video_file_name] # Retrieve marengo full video and clip embeddings marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id) #track segment number to match with fiel segment_number = 0 for segment in marengo_embeddings_result.video_embedding.segments: # Determine if this is a video or clip segment is_video = segment.embedding_scope == "video" #Update the file name if segment updated_file_name = video_file_name if not is_video: updated_file_name = updated_file_name.replace(".mp4",f"_segment_{segment_number:03d}.mp4") segment_number += 1 video_name = video_file_name.replace(".mp4","") pegasus_video_id = None if updated_file_name in pegasus_video_ids: pegasus_video_id = pegasus_video_ids[updated_file_name] record = { 'video_name':video_name, 'segment_number': 0 if is_video else segment_number, 'video_file': updated_file_name, 'start_time': getattr(segment, 'start_offset_sec', 0), 'end_time': getattr(segment, 'end_offset_sec', 0), 'type': 'video' if is_video else 'clip', 'task_id': marengo_task_id, 'pegasus_video_id': pegasus_video_id } # Get the embedding vector embedding_vector = [float(x) for x in segment.embeddings_float] # Add to our lists records.append(record) vectors.append(embedding_vector) # Print summary print(f"Prepared {len(records)} segments for upload to Weaviate") print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}") print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}") return records, vectors
作成した関数を使用して、Weaviateにアップロードするレコードとベクトルを取得します。
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids) with collection.batch.dynamic() as batch: for i, record in enumerate(records): batch.add_object( properties=record, vector=vectors[i] ) print(f"Added {len(records)} embeddings to Weaviate")
ベクトル検索のテスト
すべてのデータがコレクションに揃ったので、Weaviateでのベクトル検索が正しいビデオを返すかテストできます。Weaviateのnear_vector検索を使用しているため、ビデオのベクトルで検索した場合は、同一オブジェクトとして距離が「0」として返出されます。
今回はコレクションから5番目のベクトル(インデックス5)を取り出して検索します。これにより、対応するビデオセグメントが距離「0」で返されます。
from weaviate.classes.query import MetadataQuery, Filter # Use a specific vector for the query query_vector = vectors[5] # Perform vector search response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), ) print(f"Found {len(response.objects)} results for vector search") for obj in response.objects: print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}") if 'segment_id' in obj.properties: print(f"Segment: {obj.properties['segment_id']}") if 'text' in obj.properties and obj.properties['text']: print(f"Text: {obj.properties['text']}") print(f"Distance: {obj.metadata.distance}") print("-" * 50)
この出力結果は、埋め込みが正常に保存され、正しく検索可能であることを示しています。
RAG用の関連性の高いビデオセグメントの取得
RAGパイプラインの核となるのは、ユーザーの質問を最も関連性の高いビデオセグメントと一致させる能力です。このプロセスは3つの重要な手順で機能します:
TwelveLabsのMarengoモデルを使用して、ユーザーのテキストクエリをベクトル埋め込みに変換します。
Weaviateをスキャンし、クエリ埋め込みに最も類似しているビデオセグメントの埋め込みを特定します。
最も関連性の高いビデオセグメントが特定されたら、それに関連付けられたPegasusのビデオIDを使用して、そのセグメントに特化した正確な回答を生成します。
この対象を絞った処理方法により、ビデオコンテンツの最も関連性の高い部分だけを処理させ、効率と応答品質を両立して大幅に向上させます。
まず、Marengoを使用してテキストクエリを埋め込みベクトルに変換します。
sample_question = "What technique did David Tyree use to catch the ball?" embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=sample_question, text_truncate="start", ) query_vector = embedding.text_embedding.segments[0].embeddings_float
次に、最も関連性の高いクリップを特定します。filters=(Filter.by_property("type").equal("clip"))を使用して、ビデオ全体の埋め込みを無視し、クリップの埋め込みのみが返されるようにします。
response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("clip")) ) video_file = response.objects[0].properties.get("video_file") print(video_file)
検索結果により、4番目のクリップ(インデックス3)であるfootball_480_segment_003.mp4が返されたことが確認できます。
抽出されたクリップを確認してみましょう:
import matplotlib.pyplot as plt from matplotlib import animation from IPython.display import HTML video_file = response.objects[0].properties.get("video_file") video = sampled_video_files[video_file] fig = plt.figure() im = plt.imshow(video[0,:,:,:]) plt.close() # this is required to not display the generated image def init(): im.set_data(video[0,:,:,:]) def animate(i): im.set_data(video[i,:,:,:]) return im anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0], interval=100) HTML(anim.to_html5_video())
この方法によって、ヘルメットキャッチが行われたまさにその瞬間が正確に特定されています。

適切なセグメントが抽出された状態となったため、PegasusとLLaVa-NeXT-Videoがこの短いクリップに対してどのような挙動を示すかを確認します。
ビデオセグメントとの対話:Pegasus 対 LLaVa-NeTX-Video
最初にPegasusがどのように応答するかを見てみましょう。
pegasus_video_id = response.objects[0].properties.get("pegasus_video_id") print(sample_question) res = twelve_labs_client.generate.text( video_id=pegasus_video_id, prompt=sample_question ) print(f"{res.data}")
What technique did David Tyree use to catch the ball?
デビッド・タイリーは、ボールをヘルメットに押し付けてキャッチを確実にする技術を使用しました。これは、ニューヨーク・ジャイアンツがポゼッションを維持し、攻撃を継続することを可能にした極めて重要なプレーでした。
Pegasusが見事な回答を提供しました。ヘルメットキャッチのプレーと、それがジャイアンツにとって重要な推進力となった点に言及できています。
次に、LLaVa-NeTX-Videoが同じセグメントをスキャンした際に、より洗練された回答を返せるか確認します。
video_file = response.objects[0].properties.get("video_file") sampled_video = sampled_video_files[video_file] generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video) print(generated_text)
USER: What technique did David Tyree use to catch the ball? ASSISTANT: キャッチに成功した選手は、両手で頭上に持ってくるキャッチング技術を使用しました。この技術は、ディフェンス陣に囲まれる中で確実にボールを手元に引き込むためのものです。意図した軌道からボールが偏向した場合にはリスクの高い技術ですが、混戦の中で空中のボールを鷲掴みするのに非常に効果的な方法です。デビッド・タイリーは...
頭上でのハンドキャッチという点に言及するものの、ヘルメットキャッチに成功したという詳細(最も特徴的な要素)を認識できていません。さらに、後半で同じ結論を繰り返し饒舌になっています。
6 - Marengo、Weaviate、およびPegasusを使用した複数ビデオRAG
単一のビデオから個々のクリップに対してMarengoの埋め込みがどのように動作するかを理解した上で、より実用的なRAGユースケースに向けて、複数のビデオにまたがる埋め込みを使用する方法を示します。
すべてのビデオについてMarengo埋め込みを取得する
まず、すべてのビデオ用のMarengoタスクIDを用いて、marengo_task_idsの辞書を更新します。
for video_file_name in os.listdir(upscaled_video_dir): if video_file_name in marengo_task_ids: print(f"skipping {video_file_name} because embeddings already exist") continue print(f"processing {video_file_name}") file_path = os.path.join(upscaled_video_dir, video_file_name) task = twelve_labs_client.embed.task.create( model_name="Marengo-retrieval-2.7", video_file=file_path, video_clip_length=segment_length, video_embedding_scopes=["clip", "video"] ) print( f"Created task: id={task.id} model_name={task.model_name} status={task.status}" ) # Monitor the status of the video embedding task status = task.wait_for_done( sleep_interval=2, callback=on_task_update ) print(f"Embedding done: {status}") marengo_task_ids[video_file_name] = task.id
残りのビデオをセグメントに分割する
残りのビデオを以前と同様にいくつかの短いセグメントに分割します。
# Create output folder if it doesn't exist os.makedirs(upscaled_video_dir, exist_ok=True) # Get all video files video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))] # Process each video for video_file in video_files: split_video(upscaled_video_dir + video_file,video_segments_dir,segment_length)
すべてのビデオおよびそのセグメントに対してPegasusビデオIDを取得する
次に、残りのセグメントと元動画すべてに対するPegasusビデオIDを抽出します。時間を節約するために、この処理を並行して実行します。
import concurrent.futures import os from tqdm import tqdm # Use standard tqdm instead of tqdm.notebook def process_video(video_path): video_file_name = video_path.split("/")[-1] try: video_id = upload_video_to_twelve_labs_pegasus(video_path) return video_file_name, video_id except Exception as e: print(f"Error processing {video_file_name}: {str(e)}") return video_file_name, None # Filter out videos that are already processed segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')] full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')] all_video_files = segment_video_files + full_video_files videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids] print(f"Processing {len(videos_to_process)} videos in parallel...") # Use ThreadPoolExecutor for I/O-bound operations like API calls with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: # Submit all tasks and create a dictionary mapping futures to their video files future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process} # Process results as they complete with a progress bar for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)): video_file_name, video_id = future.result() if video_id: pegasus_video_ids[video_file_name] = video_id print("All videos processed!") print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")
Weaviateへのデータアップロード
残りの埋め込みデータをWeaviateにアップロードします。
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids) with collection.batch.dynamic() as batch: for i, record in enumerate(records): if record["pegasus_video_id"] is None: continue batch.add_object( properties=record, vector=vectors[i] ) print(f"Added {len(records)} embeddings to Weaviate")
RAGパフォーマンス評価:クリップ 対 フルビデオ
Marengoの埋め込みとPegasusのビデオIDがWeaviateに正常にインデックス化されたので、このRAGシステムの有効性を評価します。この評価は以下の2つの重要な側面に焦点を当てます:
回答品質:クリップレベルの検索とビデオ全体の検索の違いによって、質問に対するシステムの応答の正確性はどのように変化するか?
処理効率:応答時間および計算リソースの消費量に関してどのような性能差異があるか?
この比較により、これら違いを定量的に測定可能にし、特により長尺なビデオや、ビデオ内の特定の瞬間を提示する複雑な質問に対して、最も適切な箇所だけに対象を動的に絞ることで、RAGが処理効率と品質を同時に向上させる仕組みを実証します。
まず、複数の異なるスポーツジャンルをカバーし、かつビデオに含まれる特定のイベントを解釈することを必要とする、様々なテスト用質問セットを定義します。
video_questions = [ "In the American Football Video, what are the teams playing?", "What technique does David Tyree use to catch the ball?", "In the tennis match video, who is playing?", "What foot does Messi shoot at the goal with?", "When does Keri Strug hurt her foot?" ]
PegasusによるマルチビデオRAG
最初に、フルビデオを取得して質問した際の回答動作を測定します:
from weaviate.classes.query import MetadataQuery, Filter import time # Perform query using the whole video dataset pegasus_full_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start", ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("video")) ) selected_video_name = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] res = twelve_labs_client.generate.text( video_id=selected_video_id, prompt=question ) pegasus_full_video_answers.append([question,selected_video_name,res.data]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 72 seconds
次に、これを10秒のセグメントクリップでの実行と比較します。
pegasus_clip_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start", ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("clip")) ) selected_video_name = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] res = twelve_labs_client.generate.text( video_id=selected_video_id, prompt=question ) pegasus_clip_video_answers.append([question,selected_video_name,res.data]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 20 seconds
抽出したセグメントクリップとフルビデオそれぞれの検索により生成された回答を比較します。
for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers): print("question",clip_answer[0]) print("clip: ",clip_answer[2]) print("full: ",full_answer[2]) print("\n")
双方から十分に正確で一貫性の高い回答が得られたことが分かります。しかし、クリップを用いたアプローチがわずか20秒で完了したのに対して、ビデオ全体を毎回処理するアプローチは完了に72秒を要しました。
LLaVa-NeXT-VideoによるマルチビデオRAG
次に、LLaVa-NeXT-Videoモデルを使用して同様のテストを行います。ただし、まずは事前にすべてのビデオファイルをサンプリングする必要があります。
for video_file in os.listdir(video_segments_dir): print(video_file) sampled_video = sample_video(video_segments_dir + video_file,num_samples=40) sampled_video_files[video_file] = sampled_video
フルビデオの読み込みによる質問処理から開始します。
llava_full_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start" ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("video")) ) selected_video_file = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] sampled_video = sampled_video_files[selected_video_file] generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video) llava_full_video_answers.append([question,selected_video_name,generated_text]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 24 seconds
次に、これをセグメントクリップと比較します。
from weaviate.classes.query import MetadataQuery import time llava_clip_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start" ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("clip")) ) selected_video_file = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] sampled_video = sampled_video_files[selected_video_file] generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video) llava_clip_video_answers.append([question,selected_video_name,generated_text]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 24 seconds
各実行で全く同等の時間を要したことが分かります。これは、ビデオの秒数に関係なく、各項目から常に均等に40枚のフレームだけを抽出して評価するためです。
次に、LLaVa-NeXT-Videoが抽出したクリップとフルビデオに対してそれぞれ生成した返答内容を調査します。
for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers): print("question",clip_answer[0]) print("clip: ",clip_answer[2]) print("full: ",full_answer[2]) print("\n")
今回のケースでは、セグメントクリップを処理した際に、LLaVa-NeXT-Videoは5問中2問で正しい解を導き出しました:
最初の質問において、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツが対戦中である点を認識できており、また
3番目の質問において、テニスの試合がロジャー・フェデラーとノバク・ジョコビッチの間で行われている点を正確に見分けています。
7 - まとめ:TwelveLabsとWeaviateによる効率的なビデオ理解のためのRAGの活用
ビデオ処理における検索拡張生成(RAG)の調査は、効率と正確性の両面で大きな利点を示しました。TwelveLabsの高度なビデオ理解機能とWeaviateの強力なベクトルデータベースを組み合わせることで、ビデオ全体ではなく、最も関連性の高いビデオセグメントのみを論理的に処理するシステムを構築しました。
主な知見
パフォーマンスの改善:Weaviateを統合したRAGシステムにTwelveLabsのPegasusを適応すると、元ビデオを毎回全て読み込ませる代わりに、抽出されたクリップだけをクエリ処理することで、レスポンス速度を劇的に改善できました。
正確性の強化:LLaVa-NeXT-Videoなどのオープンソースモデルにおいて、注目箇所をあらかじめ切り分けたクリップセグメントに限定することで、ビデオ全体の処理に比べて回答の正確さが大きく向上することが示されました。
スケーラビリティを備えたアーキテクチャ:このRAGモデルの検証は、TwelveLabsの埋め込みモデル(Marengo)とWeaviateの高性能ベクトルデータベースの連携が、いかに強力な処理性能を提供するかを示しています。高次元の埋め込みデータを低遅延で検索して保存できるWeaviateの技術は、ビデオ理解などの複雑なタスクを実用的なアプリケーションに落とし込むための極めて重要なコアコンポーネントです。
ユースケース
TwelveLabsのビデオ理解機能とWeaviateのベクトルデータベースを組み合わせることで、多数の産業で強力なアプリケーションの構築が可能になります:
メディア&エンターテインメント:クリエイターは大規模なアーカイブから目的のシーンを瞬時に検索可能になり、編集作業の迅速化やSNS向け動画クリップの自動生成、アーカイブの効率的な再活用などが可能になります。
スポーツアナリティクス:コーチやアナリストは、ビデオを手動で早送り・巻き戻しして探すことなく、特定のアクション(例:『相手チームの裏に投げる短いパスのシーン』)をただテキストで定義するだけで、即座に関連クリップを全試合データから呼び出せるようになります。
小売・Eコマース:小売事業者は自社のチュートリアル動画を、ただ受動的に見せる一方向のコンテンツから、双方向で質問に答えるインタラクティブなショッピング体験にシフトさせることができます。例えば、顧客が『ストラップの調整方法を教えて』『バックパックに入れたときのサイズ感を見せて』と質問した際、説明動画の該当部分のみを自動で瞬時にカットして再生表示するといったことが容易に実現します。
TwelveLabsとWeaviateの組み合わせにより、膨大なビデオデータを扱う環境でも、情報抽出の効率化と高度な対話システムを同時に満たす、強力なVideo RAG基盤を構築できます。
草稿を確認してくれたWeaviateチームの Tuana Celik と Erika Cardenas に深く感謝します!
ビデオ処理は、特に長尺コンテンツを分析する場合、計算コストが高く時間がかかります。検索拡張生成(RAG)は、システムがビデオ全体ではなく最も関連性の高いビデオセグメントのみを処理できるようにすることで、この課題を解決します。この的を絞ったアプローチにより、応答の品質を維持または向上させながら、処理時間を大幅に削減できます。
この記事では、Twelve Labsのビデオ理解機能とWeaviateのベクトルデータベースを組み合わせて、ビデオコンテンツ用の効率的なRAGシステムを構築する方法を探ります。ビデオをセグメント化し、埋め込みを使用して分析に最も関連する部分のみを検索することで、精度を維持または向上させながら処理時間を大幅に改善できます。
私たちのアプローチは、いくつかの主要技術を活用しています:
ビデオの理解と埋め込み生成のためのTwelveLabs PegasusおよびMarengoモデル
ビデオセグメントを効率的に保存・収集するためのWeaviateベクトルデータベース
ビデオ分析の比較対象としてのオープンソースの LLaVA-NeXT-Video モデル
このRAGベースのアプローチにより、最も関連性の高いセグメントのみに焦点を当てることで、ビデオ処理の計算負荷を軽減し、より長いビデオを効率的に分析できるようになることを実証します。コンテンツモデレーション、スポーツ分析、または教育コンテンツ向けのアプリケーションのいずれを構築している場合でも、このアプローチは高品質な結果を維持しながらビデオ処理能力を拡張するのに役立ちます。
1 - TwelveLabsとWeaviateのセットアップ
TwelveLabs
まだTwelve Labsに登録していない場合は、こちらから登録できます。アカウントをセットアップしたら、Playgroundに移動し、画面右上隅のユーザーアイコンをクリックしてAPI Keyにアクセスします。
お使いのノートブックの左側にある鍵のアイコンをクリックし、この値をTL_API_KEYとしてシークレットを作成します。
Weaviate
Weaviateのアカウントをお持ちでない場合は、こちらからサインアップできます。アカウント作成後、クラウドダッシュボードに移動し、新しいクラスターを作成します。クラスターのセットアップが完了したら、ノートブックのシークレットセクションに2つの値を設定する必要があります。
REST Endpointの下にあるURLをWEAVIATE_URL変数として追加します。API Keysの下にあるAdminキーをコピーし、WEAVIATE_API_KEYに保存します。
2 - GPUランタイムの選択
LLaVA-NeXT-Videoモデルを実行するにはGPUが必要です。ノートブックでランタイム > ランタイムのタイプを変更に移動し、T4 GPUを選択します。
3 - 環境のセットアップ
依存関係のインストール
まず、TwelveLabsとWeaviateのSDKをインストールする必要があります:
!python -m pip install -U -q twelvelabs !python -m pip install -U -q "weaviate-client>=4.0.0"
次に、残りの依存関係をインストールします。
!python -m pip install torch !python -m pip install -q av !python -m pip install --upgrade -q accelerate !python -m pip install -U bitsandbytes !python -m pip install git
!python -m pip install pillow !python -m pip install sentencepiece !python -m
TwelveLabsおよびWeaviate SDKのセットアップ
from google.colab import userdata TL_API_KEY=userdata.get('TL_API_KEY') weaviate_url = userdata.get("WEAVIATE_URL") weaviate_api_key = userdata.get("WEAVIATE_API_KEY")
次に、TwelveLabsクライアントを初期化します。
from twelvelabs import TwelveLabs # Initialize the Twelve Labs client twelve_labs_client = TwelveLabs(api_key=TL_API_KEY)
最後に、Weaviateクライアントをセットアップし、Video_Embeddingsコレクションを初期化します。
import weaviate from weaviate.classes.init import Auth # Connect to Weaviate Cloud weaviate_client = weaviate.connect_to_weaviate_cloud( cluster_url=weaviate_url, auth_credentials=Auth.api_key(weaviate_api_key), ) # Get or create collection try: collection = weaviate_client.collections.get("Video_Embeddings") except: collection = weaviate_client.collections.create(name="Video_Embeddings")
ビデオデータのセットアップ
次に、埋め込み用のビデオデータを取得する必要があります。このリンクを使用して、Googleドライブのフォルダ内でビデオデータを見つけることができます。基本のGoogleドライブフォルダの中に 「TwelveLabs-Weaviate」 という名前のフォルダを作成し、そこへコピーします。以下のセルを使用してドライブをマウントし、ノートブックからビデオファイルにアクセスできるようにします。
from google.colab import drive drive.mount('/content/drive') base_folder_path = "/content/drive/MyDrive/TwelveLabs-Weaviate" raw_video_dir = base_folder_path + "/sports_videos" upscaled_video_dir = base_folder_path + "/upscaled_videos/" video_segments_dir = base_folder_path + "/video_segments/"
ビデオのアップスケーリング
いくつかのビデオは、解像度が低すぎて埋め込みモデルで使用できません。これらを使用する前にアップスケーリングする必要があります。
ここでアップスケーリング関数を作成します。read_video_pyavは、LLaVa-NeXT-VideoのColabノートブックから直接引用したもので、推論に適した正しいnumpy表現にビデオをフォーマットします。
import av import numpy as np def upscale_video(input_file, output_file, target_width=1280, target_height=720): input_container = av.open(input_file) output_container = av.open(output_file, mode='w') input_stream = input_container.streams.video[0] output_stream = output_container.add_stream('libx264', rate=input_stream.average_rate) output_stream.width = target_width output_stream.height = target_height output_stream.pix_fmt = 'yuv420p' for frame in input_container.decode(input_stream): frame = frame.reformat(width=target_width, height=target_height) packet = output_stream.encode(frame) output_container.mux(packet) # Flush the encoder packet = output_stream.encode(None) output_container.mux(packet) # Close the containers input_container.close() output_container.close() def read_video_pyav(container, indices): ''' Decode the video with PyAV decoder. Args: container (av.container.input.InputContainer): PyAV container. indices (List[int]): List of frame indices to decode. Returns: np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3). ''' frames = [] container.seek(0) start_index = indices[0] end_index = indices[-1] for i, frame in enumerate(container.decode(video=0)): if i > end_index: break if i >= start_index and i in indices: frames.append(frame) return np.stack([x.to_ndarray(format="rgb24") for x in frames])
raw_video_dirフォルダ内のビデオを取得し、アップスケーリングしてupscaled_video_dirフォルダに保存します。
# Create output directory if it doesn't exist if not os.path.exists(upscaled_video_dir): os.makedirs(upscaled_video_dir) # Iterate over all files in the raw video directory for filename in os.listdir(raw_video_dir): # Check if the file is a video file if filename.endswith(".mp4"): print(filename) # Get the file name without extension input_file_no_ext = os.path.splitext(filename)[0] # Define the output file name output_file = f"{input_file_no_ext}_480.mp4" if output_file in os.listdir(upscaled_video_dir): continue # Define the full path for the input and output files input_file_path = os.path.join(raw_video_dir, filename) output_file_path = os.path.join(upscaled_video_dir, output_file) # Upscale the video upscale_video(input_file_path, output_file_path)
4 - 単一ビデオによるPegasusとLLaVa-NeXT-Videoの比較
Pegasus と LLaVa-NeXT-Video は、どちらもビデオを取り込んでそれについて質問できるビデオ理解モデルです。
まず、ビデオコレクションから切り倒した単一のビデオでPegasusとLLaVa-NeXT-Videoを比較することから始めます。ビデオは、ニューヨーク・ジャイアンツがニューイングランド・ペイトリオッツと対戦する、第42回スーパーボウルのシーケンスを示しています。これは「ヘルメットキャッチ」と呼ばれる有名なレセプションで、ジャイアンツのクォーターバックであるイーライ・マニングがパスを投げ、これを受けたジャイアンツのワイドレシーバー、デビッド・タイリーが試合の残り2分で自身のヘルメットにボールを押し付けながらキャッチを見事に成功させたものです。
ビデオの背景を把握したところで、「このビデオで何が起きていますか?」と質問した際に、2つのモデルがビデオをどれだけ理解できるかを判定します。
Pegasusを使用してビデオと対話する
始める前に、ビデオをロードするためのPegasusインデックスをセットアップする必要があります。
models = [ { "name": "pegasus1.2", "options": ["visual"] } ] index_name = "sports_videos" indices_list = twelve_labs_client.index.list(name=index_name) if len(indices_list) == 0: index = twelve_labs_client.index.create( name=index_name, models=models ) print(f"A new index has been created: id={index.id} name={index.name} models={index.models}") else: index = indices_list[0] print(f"Index already exists: id={index.id} name={index.name} models={index.models}")
次に、インデックスにビデオをアップロードする関数を作成します。これにより、ビデオについて質問するために使用できるPegasusのビデオIDが返されます。
# Monitor the status of the video task def on_task_update(task): print(f" Status={task.status}") def upload_video_to_twelve_labs_pegasus(video_path): task = twelve_labs_client.task.create( index_id=index.id, file = video_path ) print(f"Task created: id={task.id} status={task.status}") task.wait_for_done(sleep_interval=5, callback=on_task_update) if task.status != "ready": raise RuntimeError(f"Indexing failed with status {task.status}") print(f"The unique identifer of your video is {task.video_id}.") return task.video_id
ビデオをアップロードし、PegasusビデオIDをsingle_video_idに保存します。
# Define the video file path single_video_file = upscaled_video_dir + "football_480.mp4" single_video_id = upload_video_to_twelve_labs_pegasus(single_video_file)
Pegasusがビデオを本当に理解しているかを確認するために、「このビデオで何が起きていますか?簡潔に答えてください。」と質問します。
single_video_query = "What is going on in this video? Please be concise." res = twelve_labs_client.generate.text( video_id=single_video_id, prompt=single_video_query ) print(f"{res.data}")
Pegasusから次の返答が届きました:
ビデオは、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツのアメリカンフットボールの試合における極めて重要な瞬間を紹介しています。ジャイアンツのクォーターバックであるイーライ・マニングが投げたパスを、デビッド・タイリーがアウト・オブ・バウンズに倒れ込みながら、ボールをヘルメットに固定して見事なキャッチを決めました。複数のアングルがそのキャッチをリプレイし、その難しさと正確さを強調しています。プレイ後、タイリーは短く喜びを祝い、ビデオは彼と他のチームメイトがフィールドから立ち去るシーンで終了します。
この応答から、Pegasusがビデオを深く理解していることが分かります。これがジャイアンツとペイトリオッツのフットボールの試合であることを把握しています。また、イーライ・マニングがボールを投げ、デビッド・タイリーがそれを受け止めた、試合の勝負を分ける瞬間であることも理解しています。
Pegasusからはこれがスーパーボウルであることは言及されていなかったので、確認のためにさらに質問してみます。
res = twelve_labs_client.generate.text( video_id=single_video_id, prompt="What game is this?" ) print(f"{res.data}")
Pegasusはこれは、第42回スーパーボウルの試合です。と答えました。これは正しい回答です。
次に、LLaVa-NeXT-Videoがどの程度ビデオを解釈できるかを見てみましょう。
LLaVa-NeXT-Videoを使用してビデオと対話する
LLaVa-NeXT-Videoでは、推論の前に特定の形式でビデオデータを準備する必要があります。モデルはビデオストリーム全体を一度に処理できないため、ビデオ全体から均一にフレームを抽出します。ここでは、各ビデオから40枚の均等に分散されたフレームを抽出するサンプリング関数を作成し、コンテンツ全体の重要な瞬間を逃さず捉えます。このサンプリング手法はLLaVA-NeXT-Videoの公式実装に準拠しています。サンプリング後、Hugging Face Hubからモデルをロードし、モデルの要件に適合するように入力を構成したうえで、質問に対する応答を生成するための推論を実行します。
def sample_video(video_path, num_samples=8): container = av.open(video_path) # sample uniformly num_samples frames from the video total_frames = container.streams.video[0].frames indices = np.arange(0, total_frames, total_frames / num_samples).astype(int) sampled_frames = read_video_pyav(container, indices) return sampled_frames sampled_video = sample_video(single_video_file, num_samples=40)
ビデオのサンプリングが適切に行われたら、モデルをセットアップします。
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor import torch quantization_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16 ) llava_next_processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf") llava_next_model = LlavaNextVideoForConditionalGeneration.from_pretrained( "llava-hf/LLaVA-NeXT-Video-7B-hf", quantization_config=quantization_config, device_map='auto' )
次に、モデルに対してクエリを実行する関数を作成します。
def query_llava_next(query,model,processor,sampled_video): # Each "content" is a list of dicts and you can add image/video/text modalities conversation = [ { "role": "user", "content": [ {"type": "text", "text": query}, {"type": "video"}, ], }, ] prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) # prompt_len = len(prompt) inputs = processor([prompt], videos=[sampled_video], padding=True, return_tensors="pt").to(model.device) generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9} output = model.generate(**inputs, **generate_kwargs) generated_text = processor.batch_decode(output, skip_special_tokens=True) return generated_text[0]
最後に、同様の質問をしてPegasusとの出力を比較します。
llava_next_result = query_llava_next(single_video_query,llava_next_model,llava_next_processor,sampled_video) print(llava_next_result)
次のような回答が返信されました:
USER: このビデオで何が起きていますか?簡潔に。 ASSISTANT: 進行中のフットボールの試合が映っており、フィールドには複数のプレイヤーがいます。一部の選手が着用しているジャージの番号や古いスタイルのヘルメットから判断すると、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツの間の第3回スーパーボウルであるようです。1人の選手が空中姿勢でボールをキャッチしようとしており、他の選手にタックルされかけています。また、レフェリーはファーストダウンのシグナルを送っています。コーチ陣やその他の試合関係者も...
このモデルはジャイアンツとペイトリオッツのフットボールの試合であることを認識していますが、試合を第3回スーパーボウルと誤認しています。また、ビデオの最も重要な場面、すなわちヘルメットキャッチの箇所を明確に識別できていません。
先ほどと同じ「これはどの試合ですか?」という質問をしてみます。わずかに正確な内容に近づいているものの、正解には至っていません。
llava_next_result = query_llava_next("what game is this?",llava_next_model,llava_next_processor,sampled_video) print(llava_next_result)
USER: what game is this? ASSISTANT: 提供された画像は、現在進行中のフットボールの試合、特に第41回スーパーボウルの一部です。これはニューイングランド・ペイトリオッツ対ニューヨーク・ジャイアンツの試合です。画像に写っているプレイヤーはジャイアンツとペイトリオッツの選手です。
5 - 単一ビデオにおけるセグメントレベル・クエリのためのRAG
比較結果により、Pegasusはビデオ全体を分析する際により優れた処理を提供し、処理時間を短縮しつつ、より正確で一貫性のある回答を返すことが分かりました。
しかしながら、モデルの注目範囲を最も関連性の高いビデオセグメントのみに絞り込むことで、さらにパフォーマンスを向上させる余地があります。これこそが検索拡張生成(RAG)の有用性が発揮される理由です。ビデオ全体を処理する代わりに、特定のクエリに関連する情報が格納されたセグメントのみを特定して分析することが可能となります。
このアプローチを実装するために、ビデオセグメントのセマンティック(意味的内容)を取得する高品質な埋め込みの作成を専門とする、TwelveLabsのMarengoモデルを活用します。これらの埋め込みによって以下が可能となります:
一度に、ビデオの各セグメントを個別にインデックス化する。
ユーザーのクエリに最も一致する関連セグメントを特定する。
特定の処理セグメントのみをビデオ理解モデルによって処理する。
まず、ビデオを各セグメントに分割し、Marengoモデルを使用してそれぞれに対して埋め込みを生成します。これらの埋め込みにより、RAGシステムの基盤が確立されます。
Marengoを使用して、フルビデオおよびビデオクリップの埋め込みを作成する
セグメントの長さをMarengoでサポートされる最大の長さである10秒に設定します。
# Define the video segment length segment_length = 10
次に、Marengoを使用してビデオの埋め込み処理を行います。注意:Marengoがビデオ全体の埋め込みと、ビデオ内の各10秒のクリップの埋め込みの両方を確実に返すようにするため、video_embedding_scopes=["clip", "video"]およびvideo_clip_length=segment_lengthを設定します。
task = twelve_labs_client.embed.task.create( model_name="Marengo-retrieval-2.7", video_file=single_video_file, video_clip_length=segment_length, video_embedding_scopes=["clip", "video"] ) print( f"Created task: id={task.id} model_name={task.model_name} status={task.status}" ) # Monitor the status of the video embedding task status = task.wait_for_done( sleep_interval=2, callback=on_task_update ) print(f"Embedding done: {status}")
埋め込み処理が完了したら、必要に応じてそれらを抽出するためにMarengoのタスクIDを保存できます。後ほどWeaviateデータベースにデータを入力する際に使用できるよう、タスクIDをmarengo_task_idsに保持します。
single_video_task_id = task.id marengo_task_ids = {} single_video_file_name = single_video_file.split("/")[-1] marengo_task_ids[single_video_file_name] = single_video_task_id
RAG用ビデオセグメントの準備
効率的なRAGパイプラインを構築するために、データベース内でPegasusビデオIDをMarengoタスクIDと関連付けます。これにより、ベクトル検索で対応するセグメントが返された際に、そのビデオセグメントについてチャットできるようになります。これを実行するために、各ビデオセグメントをインデックス構築用のPegasusにアップロードする必要があります。
まず、Pegasusにアップロードするための、ビデオを10秒に分割するsplit_video関数を作成します。同時に、各セグメントがPegasusでの最低要件である4秒を超えていることを保証する必要があります。最後のクリップが5秒未満の場合は、最後から2番目のクリップと一部を重複させることで、これを実現します。
import os import subprocess import json def split_video(input_path, output_dir, segment_duration=10): """ Split a video into segments of the specified duration. Regular segments will be exactly segment_duration seconds. The last segment will be at least 5 seconds long, potentially overlapping with the previous segment if needed. Args: input_path: Path to the input video file output_dir: Directory to save the output segments segment_duration: Duration of each segment in seconds (default: 10) """ # Minimum length for the last segment min_last_segment_len = 5 # Create output directory if it doesn't exist os.makedirs(output_dir, exist_ok=True) # Get base filename without extension base_name = os.path.splitext(os.path.basename(input_path))[0] # Get video duration using ffprobe probe_cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", input_path ] try: probe_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True) video_info = json.loads(probe_result.stdout) duration = float(video_info["format"]["duration"]) except Exception as e: print(f"Error getting video duration: {e}") return 0 # Calculate number of full segments num_full_segments = int(duration / segment_duration) # Calculate remaining duration remaining_duration = duration - (num_full_segments * segment_duration) # Determine total number of segments and if we need to adjust the last segment if remaining_duration > 0: if remaining_duration < min_last_segment_len: # Last segment would be too short, so we'll adjust its start time num_segments = num_full_segments + 1 needs_adjustment = True else: # Last segment is already long enough num_segments = num_full_segments + 1 needs_adjustment = False else: # No remaining duration, all segments are complete num_segments = num_full_segments needs_adjustment = False print(f"Video {base_name} is {duration:.2f} seconds long") print(f"Creating {num_segments} segments") # Create each segment for i in range(num_segments): # For regular segments, start at the segment boundary if i < num_full_segments: start_time = i * segment_duration actual_duration = segment_duration else: # This is the last segment if needs_adjustment: # Start earlier to ensure it's at least min_last_segment_len seconds start_time = duration - min_last_segment_len actual_duration = min_last_segment_len else: # Last segment is already long enough start_time = i * segment_duration actual_duration = remaining_duration output_path = os.path.join(output_dir, f"{base_name}_segment_{i:03d}.mp4") # For all segments, use copy mode for speed cmd = [ "ffmpeg", "-y", "-ss", str(start_time), "-i", input_path, "-t", str(actual_duration), "-c:v", "copy", "-c:a", "copy", output_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f"Error creating segment {i+1}: {result.stderr[:100]}...") else: end_time = start_time + actual_duration if i == num_segments - 1 and needs_adjustment: print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s (adjusted to ensure at least {min_last_segment_len}s)") else: print(f"Created segment {i+1}/{num_segments}: {start_time:.1f}s to {end_time:.1f}s") print(f"Successfully split {base_name} into {num_segments} segments") return num_segments
新しく作成したvideo_segments_dirにビデオを保存します。
split_video(single_video_file, video_segments_dir,segment_length)
次に、pegasus_video_ids(ファイル名とPegasusビデオIDを関連付ける辞書)を作成し、元のビデオ全体のビデオIDを追加します。
pegasus_video_ids = {} fname = single_video_file.split("/")[-1] pegasus_video_ids[fname] = single_video_id
続いて、分割したビデオセグメントをPegasusにアップロードし、各ビデオIDをpegasus_video_idsに割り当てます。
segment_video_files = [f for f in os.listdir(video_segments_dir) if f.endswith(('.mp4'))] # Process each video for segment_video_file in segment_video_files: if segment_video_file in pegasus_video_ids: print("skip file",segment_video_file) continue print("processing file",segment_video_file) try: video_id = upload_video_to_twelve_labs_pegasus(video_segments_dir+segment_video_file) pegasus_video_ids[segment_video_file] = video_id except: print("error",segment_video_file) continue
最後に、LLaVa-NeXT-Videoモデルで効率的に処理できるよう、すべてのビデオをサンプリングします。
sampled_video_files = {} for video_file in os.listdir(video_segments_dir): print(video_file) sampled_video = sample_video(video_segments_dir + video_file,num_samples=40) sampled_video_files[video_file] = sampled_video for video_file in os.listdir(upscaled_video_dir): print(video_file) sampled_video = sample_video(upscaled_video_dir + video_file,num_samples=40) sampled_video_files[video_file] = sampled_video
Weaviateへの埋め込みデータのアップロード
Weaviateは、コレクションに登録する際にメタデータレコードと埋め込みベクトルが区別されていることを想定しています。MarengoタスクIDとPegasusビデオIDを取得し、アップロード用のrecords(レコード配列)とvectors(ベクトル配列)を準備するためのprepare_marengo_embeddings_for_weaviate関数を作成します。
def prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids): # Prepare data for Weaviate upload records = [] vectors = [] for video_file_name in marengo_task_ids.keys(): marengo_task_id = marengo_task_ids[video_file_name] # Retrieve marengo full video and clip embeddings marengo_embeddings_result = twelve_labs_client.embed.task.retrieve(marengo_task_id) #track segment number to match with fiel segment_number = 0 for segment in marengo_embeddings_result.video_embedding.segments: # Determine if this is a video or clip segment is_video = segment.embedding_scope == "video" #Update the file name if segment updated_file_name = video_file_name if not is_video: updated_file_name = updated_file_name.replace(".mp4",f"_segment_{segment_number:03d}.mp4") segment_number += 1 video_name = video_file_name.replace(".mp4","") pegasus_video_id = None if updated_file_name in pegasus_video_ids: pegasus_video_id = pegasus_video_ids[updated_file_name] record = { 'video_name':video_name, 'segment_number': 0 if is_video else segment_number, 'video_file': updated_file_name, 'start_time': getattr(segment, 'start_offset_sec', 0), 'end_time': getattr(segment, 'end_offset_sec', 0), 'type': 'video' if is_video else 'clip', 'task_id': marengo_task_id, 'pegasus_video_id': pegasus_video_id } # Get the embedding vector embedding_vector = [float(x) for x in segment.embeddings_float] # Add to our lists records.append(record) vectors.append(embedding_vector) # Print summary print(f"Prepared {len(records)} segments for upload to Weaviate") print(f"- Video embeddings: {sum(1 for r in records if r['type'] == 'video')}") print(f"- Clip embeddings: {sum(1 for r in records if r['type'] == 'clip')}") return records, vectors
作成した関数を使用して、Weaviateにアップロードするレコードとベクトルを取得します。
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids) with collection.batch.dynamic() as batch: for i, record in enumerate(records): batch.add_object( properties=record, vector=vectors[i] ) print(f"Added {len(records)} embeddings to Weaviate")
ベクトル検索のテスト
すべてのデータがコレクションに揃ったので、Weaviateでのベクトル検索が正しいビデオを返すかテストできます。Weaviateのnear_vector検索を使用しているため、ビデオのベクトルで検索した場合は、同一オブジェクトとして距離が「0」として返出されます。
今回はコレクションから5番目のベクトル(インデックス5)を取り出して検索します。これにより、対応するビデオセグメントが距離「0」で返されます。
from weaviate.classes.query import MetadataQuery, Filter # Use a specific vector for the query query_vector = vectors[5] # Perform vector search response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), ) print(f"Found {len(response.objects)} results for vector search") for obj in response.objects: print(f"Video: {obj.properties['video_file']}, Type: {obj.properties['type']}") if 'segment_id' in obj.properties: print(f"Segment: {obj.properties['segment_id']}") if 'text' in obj.properties and obj.properties['text']: print(f"Text: {obj.properties['text']}") print(f"Distance: {obj.metadata.distance}") print("-" * 50)
この出力結果は、埋め込みが正常に保存され、正しく検索可能であることを示しています。
RAG用の関連性の高いビデオセグメントの取得
RAGパイプラインの核となるのは、ユーザーの質問を最も関連性の高いビデオセグメントと一致させる能力です。このプロセスは3つの重要な手順で機能します:
TwelveLabsのMarengoモデルを使用して、ユーザーのテキストクエリをベクトル埋め込みに変換します。
Weaviateをスキャンし、クエリ埋め込みに最も類似しているビデオセグメントの埋め込みを特定します。
最も関連性の高いビデオセグメントが特定されたら、それに関連付けられたPegasusのビデオIDを使用して、そのセグメントに特化した正確な回答を生成します。
この対象を絞った処理方法により、ビデオコンテンツの最も関連性の高い部分だけを処理させ、効率と応答品質を両立して大幅に向上させます。
まず、Marengoを使用してテキストクエリを埋め込みベクトルに変換します。
sample_question = "What technique did David Tyree use to catch the ball?" embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=sample_question, text_truncate="start", ) query_vector = embedding.text_embedding.segments[0].embeddings_float
次に、最も関連性の高いクリップを特定します。filters=(Filter.by_property("type").equal("clip"))を使用して、ビデオ全体の埋め込みを無視し、クリップの埋め込みのみが返されるようにします。
response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("clip")) ) video_file = response.objects[0].properties.get("video_file") print(video_file)
検索結果により、4番目のクリップ(インデックス3)であるfootball_480_segment_003.mp4が返されたことが確認できます。
抽出されたクリップを確認してみましょう:
import matplotlib.pyplot as plt from matplotlib import animation from IPython.display import HTML video_file = response.objects[0].properties.get("video_file") video = sampled_video_files[video_file] fig = plt.figure() im = plt.imshow(video[0,:,:,:]) plt.close() # this is required to not display the generated image def init(): im.set_data(video[0,:,:,:]) def animate(i): im.set_data(video[i,:,:,:]) return im anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0], interval=100) HTML(anim.to_html5_video())
この方法によって、ヘルメットキャッチが行われたまさにその瞬間が正確に特定されています。

適切なセグメントが抽出された状態となったため、PegasusとLLaVa-NeXT-Videoがこの短いクリップに対してどのような挙動を示すかを確認します。
ビデオセグメントとの対話:Pegasus 対 LLaVa-NeTX-Video
最初にPegasusがどのように応答するかを見てみましょう。
pegasus_video_id = response.objects[0].properties.get("pegasus_video_id") print(sample_question) res = twelve_labs_client.generate.text( video_id=pegasus_video_id, prompt=sample_question ) print(f"{res.data}")
What technique did David Tyree use to catch the ball?
デビッド・タイリーは、ボールをヘルメットに押し付けてキャッチを確実にする技術を使用しました。これは、ニューヨーク・ジャイアンツがポゼッションを維持し、攻撃を継続することを可能にした極めて重要なプレーでした。
Pegasusが見事な回答を提供しました。ヘルメットキャッチのプレーと、それがジャイアンツにとって重要な推進力となった点に言及できています。
次に、LLaVa-NeTX-Videoが同じセグメントをスキャンした際に、より洗練された回答を返せるか確認します。
video_file = response.objects[0].properties.get("video_file") sampled_video = sampled_video_files[video_file] generated_text = query_llava_next(sample_question,llava_next_model,llava_next_processor,sampled_video) print(generated_text)
USER: What technique did David Tyree use to catch the ball? ASSISTANT: キャッチに成功した選手は、両手で頭上に持ってくるキャッチング技術を使用しました。この技術は、ディフェンス陣に囲まれる中で確実にボールを手元に引き込むためのものです。意図した軌道からボールが偏向した場合にはリスクの高い技術ですが、混戦の中で空中のボールを鷲掴みするのに非常に効果的な方法です。デビッド・タイリーは...
頭上でのハンドキャッチという点に言及するものの、ヘルメットキャッチに成功したという詳細(最も特徴的な要素)を認識できていません。さらに、後半で同じ結論を繰り返し饒舌になっています。
6 - Marengo、Weaviate、およびPegasusを使用した複数ビデオRAG
単一のビデオから個々のクリップに対してMarengoの埋め込みがどのように動作するかを理解した上で、より実用的なRAGユースケースに向けて、複数のビデオにまたがる埋め込みを使用する方法を示します。
すべてのビデオについてMarengo埋め込みを取得する
まず、すべてのビデオ用のMarengoタスクIDを用いて、marengo_task_idsの辞書を更新します。
for video_file_name in os.listdir(upscaled_video_dir): if video_file_name in marengo_task_ids: print(f"skipping {video_file_name} because embeddings already exist") continue print(f"processing {video_file_name}") file_path = os.path.join(upscaled_video_dir, video_file_name) task = twelve_labs_client.embed.task.create( model_name="Marengo-retrieval-2.7", video_file=file_path, video_clip_length=segment_length, video_embedding_scopes=["clip", "video"] ) print( f"Created task: id={task.id} model_name={task.model_name} status={task.status}" ) # Monitor the status of the video embedding task status = task.wait_for_done( sleep_interval=2, callback=on_task_update ) print(f"Embedding done: {status}") marengo_task_ids[video_file_name] = task.id
残りのビデオをセグメントに分割する
残りのビデオを以前と同様にいくつかの短いセグメントに分割します。
# Create output folder if it doesn't exist os.makedirs(upscaled_video_dir, exist_ok=True) # Get all video files video_files = [f for f in os.listdir(upscaled_video_dir) if f.endswith(('.mp4', '.avi', '.mov'))] # Process each video for video_file in video_files: split_video(upscaled_video_dir + video_file,video_segments_dir,segment_length)
すべてのビデオおよびそのセグメントに対してPegasusビデオIDを取得する
次に、残りのセグメントと元動画すべてに対するPegasusビデオIDを抽出します。時間を節約するために、この処理を並行して実行します。
import concurrent.futures import os from tqdm import tqdm # Use standard tqdm instead of tqdm.notebook def process_video(video_path): video_file_name = video_path.split("/")[-1] try: video_id = upload_video_to_twelve_labs_pegasus(video_path) return video_file_name, video_id except Exception as e: print(f"Error processing {video_file_name}: {str(e)}") return video_file_name, None # Filter out videos that are already processed segment_video_files = [ video_segments_dir + f for f in os.listdir(video_segments_dir) if f.endswith('.mp4')] full_video_files = [ upscaled_video_dir + f for f in os.listdir(upscaled_video_dir) if f.endswith('.mp4')] all_video_files = segment_video_files + full_video_files videos_to_process = [f for f in all_video_files if f.split("/")[-1] not in pegasus_video_ids] print(f"Processing {len(videos_to_process)} videos in parallel...") # Use ThreadPoolExecutor for I/O-bound operations like API calls with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: # Submit all tasks and create a dictionary mapping futures to their video files future_to_video = {executor.submit(process_video, video_path): video_path for video_path in videos_to_process} # Process results as they complete with a progress bar for future in tqdm(concurrent.futures.as_completed(future_to_video), total=len(videos_to_process)): video_file_name, video_id = future.result() if video_id: pegasus_video_ids[video_file_name] = video_id print("All videos processed!") print(f"Successfully processed {len([v for v in pegasus_video_ids.values() if v is not None])} videos")
Weaviateへのデータアップロード
残りの埋め込みデータをWeaviateにアップロードします。
records, vectors = prepare_marengo_embeddings_for_weaviate(marengo_task_ids,pegasus_video_ids) with collection.batch.dynamic() as batch: for i, record in enumerate(records): if record["pegasus_video_id"] is None: continue batch.add_object( properties=record, vector=vectors[i] ) print(f"Added {len(records)} embeddings to Weaviate")
RAGパフォーマンス評価:クリップ 対 フルビデオ
Marengoの埋め込みとPegasusのビデオIDがWeaviateに正常にインデックス化されたので、このRAGシステムの有効性を評価します。この評価は以下の2つの重要な側面に焦点を当てます:
回答品質:クリップレベルの検索とビデオ全体の検索の違いによって、質問に対するシステムの応答の正確性はどのように変化するか?
処理効率:応答時間および計算リソースの消費量に関してどのような性能差異があるか?
この比較により、これら違いを定量的に測定可能にし、特により長尺なビデオや、ビデオ内の特定の瞬間を提示する複雑な質問に対して、最も適切な箇所だけに対象を動的に絞ることで、RAGが処理効率と品質を同時に向上させる仕組みを実証します。
まず、複数の異なるスポーツジャンルをカバーし、かつビデオに含まれる特定のイベントを解釈することを必要とする、様々なテスト用質問セットを定義します。
video_questions = [ "In the American Football Video, what are the teams playing?", "What technique does David Tyree use to catch the ball?", "In the tennis match video, who is playing?", "What foot does Messi shoot at the goal with?", "When does Keri Strug hurt her foot?" ]
PegasusによるマルチビデオRAG
最初に、フルビデオを取得して質問した際の回答動作を測定します:
from weaviate.classes.query import MetadataQuery, Filter import time # Perform query using the whole video dataset pegasus_full_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start", ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("video")) ) selected_video_name = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] res = twelve_labs_client.generate.text( video_id=selected_video_id, prompt=question ) pegasus_full_video_answers.append([question,selected_video_name,res.data]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 72 seconds
次に、これを10秒のセグメントクリップでの実行と比較します。
pegasus_clip_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start", ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("clip")) ) selected_video_name = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] res = twelve_labs_client.generate.text( video_id=selected_video_id, prompt=question ) pegasus_clip_video_answers.append([question,selected_video_name,res.data]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 20 seconds
抽出したセグメントクリップとフルビデオそれぞれの検索により生成された回答を比較します。
for clip_answer, full_answer in zip(pegasus_clip_video_answers, pegasus_full_video_answers): print("question",clip_answer[0]) print("clip: ",clip_answer[2]) print("full: ",full_answer[2]) print("\n")
双方から十分に正確で一貫性の高い回答が得られたことが分かります。しかし、クリップを用いたアプローチがわずか20秒で完了したのに対して、ビデオ全体を毎回処理するアプローチは完了に72秒を要しました。
LLaVa-NeXT-VideoによるマルチビデオRAG
次に、LLaVa-NeXT-Videoモデルを使用して同様のテストを行います。ただし、まずは事前にすべてのビデオファイルをサンプリングする必要があります。
for video_file in os.listdir(video_segments_dir): print(video_file) sampled_video = sample_video(video_segments_dir + video_file,num_samples=40) sampled_video_files[video_file] = sampled_video
フルビデオの読み込みによる質問処理から開始します。
llava_full_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start" ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("video")) ) selected_video_file = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] sampled_video = sampled_video_files[selected_video_file] generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video) llava_full_video_answers.append([question,selected_video_name,generated_text]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 24 seconds
次に、これをセグメントクリップと比較します。
from weaviate.classes.query import MetadataQuery import time llava_clip_video_answers = [] start_time = time.time() for question in video_questions: embedding = twelve_labs_client.embed.create( model_name="Marengo-retrieval-2.7", text=question, text_truncate="start" ) query_vector = embedding.text_embedding.segments[0].embeddings_float response = collection.query.near_vector( near_vector=query_vector, limit=1, return_metadata=MetadataQuery(distance=True), filters=(Filter.by_property("type").equal("clip")) ) selected_video_file = response.objects[0].properties["video_file"] selected_video_id = response.objects[0].properties["pegasus_video_id"] sampled_video = sampled_video_files[selected_video_file] generated_text = query_llava_next(question,llava_next_model,llava_next_processor,sampled_video) llava_clip_video_answers.append([question,selected_video_name,generated_text]) end_time = time.time() execution_time = end_time - start_time print(f"Execution time: {int(execution_time)} seconds")
Execution time: 24 seconds
各実行で全く同等の時間を要したことが分かります。これは、ビデオの秒数に関係なく、各項目から常に均等に40枚のフレームだけを抽出して評価するためです。
次に、LLaVa-NeXT-Videoが抽出したクリップとフルビデオに対してそれぞれ生成した返答内容を調査します。
for clip_answer, full_answer in zip(llava_clip_video_answers, llava_full_video_answers): print("question",clip_answer[0]) print("clip: ",clip_answer[2]) print("full: ",full_answer[2]) print("\n")
今回のケースでは、セグメントクリップを処理した際に、LLaVa-NeXT-Videoは5問中2問で正しい解を導き出しました:
最初の質問において、ニューヨーク・ジャイアンツとニューイングランド・ペイトリオッツが対戦中である点を認識できており、また
3番目の質問において、テニスの試合がロジャー・フェデラーとノバク・ジョコビッチの間で行われている点を正確に見分けています。
7 - まとめ:TwelveLabsとWeaviateによる効率的なビデオ理解のためのRAGの活用
ビデオ処理における検索拡張生成(RAG)の調査は、効率と正確性の両面で大きな利点を示しました。TwelveLabsの高度なビデオ理解機能とWeaviateの強力なベクトルデータベースを組み合わせることで、ビデオ全体ではなく、最も関連性の高いビデオセグメントのみを論理的に処理するシステムを構築しました。
主な知見
パフォーマンスの改善:Weaviateを統合したRAGシステムにTwelveLabsのPegasusを適応すると、元ビデオを毎回全て読み込ませる代わりに、抽出されたクリップだけをクエリ処理することで、レスポンス速度を劇的に改善できました。
正確性の強化:LLaVa-NeXT-Videoなどのオープンソースモデルにおいて、注目箇所をあらかじめ切り分けたクリップセグメントに限定することで、ビデオ全体の処理に比べて回答の正確さが大きく向上することが示されました。
スケーラビリティを備えたアーキテクチャ:このRAGモデルの検証は、TwelveLabsの埋め込みモデル(Marengo)とWeaviateの高性能ベクトルデータベースの連携が、いかに強力な処理性能を提供するかを示しています。高次元の埋め込みデータを低遅延で検索して保存できるWeaviateの技術は、ビデオ理解などの複雑なタスクを実用的なアプリケーションに落とし込むための極めて重要なコアコンポーネントです。
ユースケース
TwelveLabsのビデオ理解機能とWeaviateのベクトルデータベースを組み合わせることで、多数の産業で強力なアプリケーションの構築が可能になります:
メディア&エンターテインメント:クリエイターは大規模なアーカイブから目的のシーンを瞬時に検索可能になり、編集作業の迅速化やSNS向け動画クリップの自動生成、アーカイブの効率的な再活用などが可能になります。
スポーツアナリティクス:コーチやアナリストは、ビデオを手動で早送り・巻き戻しして探すことなく、特定のアクション(例:『相手チームの裏に投げる短いパスのシーン』)をただテキストで定義するだけで、即座に関連クリップを全試合データから呼び出せるようになります。
小売・Eコマース:小売事業者は自社のチュートリアル動画を、ただ受動的に見せる一方向のコンテンツから、双方向で質問に答えるインタラクティブなショッピング体験にシフトさせることができます。例えば、顧客が『ストラップの調整方法を教えて』『バックパックに入れたときのサイズ感を見せて』と質問した際、説明動画の該当部分のみを自動で瞬時にカットして再生表示するといったことが容易に実現します。
TwelveLabsとWeaviateの組み合わせにより、膨大なビデオデータを扱う環境でも、情報抽出の効率化と高度な対話システムを同時に満たす、強力なVideo RAG基盤を構築できます。




