パートナーシップ
マルチモーダルRAG:Twelve LabsとPineconeを使用したビデオとのチャット
ジェームズ・リー、マニッシュ・マヘシュワリ、アレックス・オーウェン
開発者は、Twelve Labs Embed APIとPineconeを使用してマルチモーダルRAGシステムを構築し、ビデオ埋め込みの生成、セマンティック検索による関連クリップの取得、さらにはPegasusを使用したビデオコンテンツとのチャットを行うことができます。また、オープンソースのLLaVA-NeXT-Videoモデルとの直接的な比較も含まれています。
開発者は、Twelve Labs Embed APIとPineconeを使用してマルチモーダルRAGシステムを構築し、ビデオ埋め込みの生成、セマンティック検索による関連クリップの取得、さらにはPegasusを使用したビデオコンテンツとのチャットを行うことができます。また、オープンソースのLLaVA-NeXT-Videoモデルとの直接的な比較も含まれています。

この記事の内容
ニュースレターに登録する
ニュースレターに登録する
ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします
ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします
AIを活用してビデオを検索、分析、探索します。
2024/10/03
12分
記事へのリンクをコピー
このチュートリアルの共同制作にご協力いただいたPineconeチーム(Adam Heerwagen氏およびCory Waddingham氏)に深く感謝いたします。
はじめに
このチュートリアルでは、動画に対するRAGベースのQ&Aを実現するために、Twelve LabsのEmbed APIとホスト型ベクトルデータベースのPineconeを統合する方法について説明します。このガイドでは、生成モデルを使用して、非構造化動画データベースからテキストの回答を抽出する方法を学びます。
Twelve Labsの豊かでコンテキストを考慮した埋め込み(embeddings)とPineconeのベクトルデータベースを組み合わせることで、これら動画の埋め込みを保存、インデックス化、クエリし、チャットアプリケーションを構築します。このノートブックでは、わずか数行のコードでこれらのテクノロジーがどのように実現できるかを紹介します。
比較として、テキスト応答を生成するためにTwelve LabsのGenerate APIを使用する場合と、代表的なオープンソースモデルであるLLaVA-NeXT-Videoを使用する場合の開発者エクスペリエンス(DX)の違いも実演します。
セットアップとインストール
コア機能に入る前に、環境をセットアップし、必要なライブラリをインストールしましょう。
必要なライブラリのインストール
まず、Twelve LabsとPineconeのライブラリをインストールします。ノートブックのセルで次のコマンドを実行してください。
# Install required libraries !pip install twelvelabs pinecone-client
次に、動画フォーマット処理用のPyAV、およびHugging Faceでオープンソースモデルを使用するためのbitsandbytesとtransformersをインストールします。
!pip install -q av !pip install --upgrade -q accelerate bitsandbytes !pip install transformers
認証
Twelve Labs APIとPineconeのAPIキーを設定する必要があります。これらのキーを保存するために、Google Colabに組み込まれているuserdataライブラリを使用します。サインアップ後、Pineconeの情報はコンソールで確認できます。Pineconeは、このデモには十分すぎる無料のスターターティア(Starter Tier)を提供しています。
Twelve Labsのキーは、https://playground.twelvelabs.ioでサインアップした後、アカウント内で確認できます。
from google.colab import userdata TL_API_KEY=userdata.get('TL_API_KEY') PINECONE_API_KEY=userdata.get('PINECONE_API_KEY')
動画データの設定
次に、埋め込み対象となる動画データを取得する必要があります。動画データは、このリンクからGoogleドライブのフォルダ内で確認できます。これをGoogleドライブのマイドライブ直下にある「TwelveLabs-Pinecone」という名前のフォルダにコピーしてください。次のセルを使用してドライブをマウントし、ノートブックから動画ファイルにアクセスできるようにします。
from google.colab import drive drive.mount('/content/drive') base_folder_path = "/content/drive/MyDrive/TwelveLabs-Pinecone" single_video = base_folder_path + "/ad_vids/Rare Beauty By Selena Gomez - Makeup Made To Feel Good In.mp4" split_video_dir = base_folder_path + "/split_ad_videos"
クライアントのセットアップ
PineconeとTwelve Labsの設定を行う段階です。それぞれのAPIキーを使用して両方のサービスをインポートし、初期化します。
# Configure Pinecone from pinecone import Pinecone, ServerlessSpec pc = Pinecone(api_key=PINECONE_API_KEY) from twelvelabs import TwelveLabs from twelvelabs.models.embed import EmbeddingsTask # Initialize the Twelve Labs client twelvelabs_client = TwelveLabs(api_key=TL_API_KEY)
埋め込み(Embeddings)の作成とPineconeへの取り込み
以下のコードブロックは、Twelve Labs APIとPineconeベクトルデータベースを使用して動画の埋め込みを生成・保存するプロセスを示しています。主に次の2つの関数を定義しています。
generate_embedding関数は、埋め込みタスクの作成と管理を担当します。指定された動画ファイルとエンジンを用いて、Twelve Labs APIで埋め込みタスクを作成します。
タスクの進行状況を監視するためのコールバック関数を定義します。
タスクの完了を待ち、結果を取得します。
最後に、タスク結果から埋め込みデータとメタデータ(タイムレンジおよびスコープ)を抽出します。
ingest_data関数は、データ取り込みを担当するメインの関数です。generate_embeddingを呼び出して、指定された動画ファイルの埋め込みを取得します。Pineconeのインデックス(この例では
twelve-labsという名前)に接続します。埋め込みデータとメタデータを整形し、アップサート(upsert)用のベクトルを準備します。
ベクトルをPineconeインデックスにアップサート(登録・更新)します。
このコードを実行すると、埋め込みタスクの処理に伴って進行状況が更新され、最終的にPineconeに取り込まれた埋め込みの数が確認メッセージとして表示されます。これにより、後でこれらの埋め込み検索を使用して動画コンテンツを検索・分析するための基盤が整います。
# Define a callback function to monitor task progress def on_task_update(task: EmbeddingsTask): print(f" Status={task.status}") def generate_embedding(video_file): # Create an embedding task task = twelvelabs_client.embed.task.create( engine_name="Marengo-retrieval-2.6", video_file=video_file ) print(f"Created task: id={task.id} engine_name={task.engine_name} status={task.status}") # Wait for the task to complete status = task.wait_for_done( sleep_interval=2, callback=on_task_update ) print(f"Embedding done: {status}") # Retrieve the task result task_result = twelvelabs_client.embed.task.retrieve(task.id) # Extract and return the embeddings embeddings = [] for v in task_result.video_embeddings: embeddings.append({ 'embedding': v.embedding.float, 'start_offset_sec': v.start_offset_sec, 'end_offset_sec': v.end_offset_sec, 'embedding_scope': v.embedding_scope }) return embeddings, task_result def ingest_data(video_file_path, index_name = "twelve-labs"): """ Generate embeddings for video and store in Pinecone """ #Strip the extension and the folders from the video_file_path video_name = os.path.splitext(os.path.basename(video_file_path))[0] print(video_name) # Connect to Pinecone index if index_name not in pc.list_indexes().names(): pc.create_index( name=index_name, dimension=1024, # The dimensions of Twelve Lab's Embedding Model metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) ) index = pc.Index(index_name) # Generate embeddings using Twelve Labs Embed API embeddings, task_result = generate_embedding(video_file_path) # Prepare vectors for upsert vectors_to_upsert = [] for i, emb in enumerate(embeddings): vector_id = f"{video_name}_{i}" vectors_to_upsert.append((vector_id, emb['embedding'], { 'video_file': video_name, 'video_segment': i, 'start_time': emb['start_offset_sec'], 'end_time': emb['end_offset_sec'], 'scope': emb['embedding_scope'] })) # Upsert embeddings to Pinecone index.upsert(vectors=vectors_to_upsert) return f"Ingested {len(embeddings)} embeddings for {video_file_path}"
そして、これら2つの関数を使用して、動画の埋め込みをPineconeにロードします。
# Example usage result = ingest_data(single_video) print(result)
このコードにより、Twelve LabsのEmbed APIを使用して動画のマルチモーダル埋め込みを生成し、後で検索できるようにPineconeに保存できます。この埋め込みは、視覚情報、音声情報、テキスト情報を含む動画コンテンツのさまざまな側面を捉えているため、幅広いAIアプリケーションに適しています。
テキストクエリによる検索
次に、Twelve LabsのMarengoモデルを使用してテキストを埋め込み、Pineconeデータベースから類似したコンテンツを検索するための関数を設定します。
get_text_embedding関数は、Twelve Labs Embed APIを使ってテキストクエリを埋め込みベクトルに変換します。twelvelabs_client.embed.createメソッドを使用して、指定されたテキストの埋め込みを生成します。engine_nameパラメータには、使用する埋め込みモデル(“Marengo-retrieval-2.6”)を指定します。text_truncateパラメータは “start” に設定されており、テキストが長すぎる場合は、先頭から切り捨てられます。
retrieve_similar_content関数は、コンテンツ検索を実行するメインの関数です。テキストクエリと、返される結果の数(
top_k)をパラメータとして受け取ります。get_text_embeddingを呼び出して、テキストクエリを埋め込みに変換します。twelve-labsという名前のPineconeインデックスに接続します。クエリ埋め込みに類似したベクトルをPineconeインデックスに対して照会し、返す結果の数を指定して、メタデータを含めて取得します。
この検索プロセスは、テキストクエリの埋め込みと、Pineconeに保存されている動画セグメントの事前に計算された埋め込みとを比較することによって動作します。これにより、大規模な動画データセット全体に対して、高速かつ効率的な類似性検索が可能になります。
def get_text_embedding(text_query): # Twelve Labs Embed API supports text-to-embedding text_embedding = twelvelabs_client.embed.create( engine_name="Marengo-retrieval-2.6", text=text_query, text_truncate="start" ) return text_embedding.text_embedding.float def retrieve_similar_content(query, index_name="twelve-labs", top_k=5): """ Retrieve similar content based on query embedding """ # Generate query embedding query_embedding = get_text_embedding(query) # Connect to Pinecone index index = pc.Index(index_name) # Query Pinecone for similar vectors results = index.query(vector=query_embedding, top_k=top_k, include_metadata=True) return results
これで、サンプルのテキストクエリを使用してretrieve_similar_content関数を実行し、クエリと見つかった上位の類似コンテンツの詳細を出力することができます。
# Example usage text_query = "Lipstick" similar_content = retrieve_similar_content(text_query) print(f"Query: '{text_query}'") print(f"Top {len(similar_content['matches'])} similar content:") for i, match in enumerate(similar_content['matches']): print(f"{i+1}. Score: {match['score']:.4f}") print(f" Video File: {match['metadata']['video_file']}") print(f" Video ID: {match['metadata']['video_segment']}") print(f" Time range: {match['metadata']['start_time']} - {match['metadata']['end_time']} seconds") print(f" Scope: {match['metadata']['scope']}") print()
このコードを使用すると、テキストクエリを使って動画コンテンツに対してセマンティック検索を実行できます。Twelve Labsのマルチモーダル埋め込みの力を活用して、正確な単語が動画に含まれていなくても、テキストクエリと意味的に類似している動画セグメントを見つけ出します。
このコードを実行すると、類似度スコア、動画ファイル、動画ID、タイムレンジ、スコープとともに、最も一致する動画セグメントが上位に表示されます。これにより、動画検索やコンテンツのレコメンデーションなど、さまざまなアプリケーションの実装が可能になります。
動画の切り出しフォーマット設定
データベース内の動画埋め込みとそれらを照会する機能によって、最初の実験では、これらの埋め込みをフル動画ではなく特定の短い動画クリップにリンクさせることを目指します。埋め込みモデルがタイムスタンプを処理する方法と同様に、動画を分割していきます。
以下のsplit_video関数は、avライブラリを使用して、指定された長さの短いセグメントに動画ファイルを分割します。簡単な説明は以下の通りです。
この関数は、入力動画パス、出力ディレクトリ、およびセグメント期間(デフォルトは6秒)をパラメータとして受け取ります。
入力動画を開き、動画のフレームレートに基づいてセグメントあたりのフレーム数を計算し、動画フレームをループ処理します。
各セグメントに対して新しい出力コンテナを作成し、そこにフレームを書き込み、フレームのタイムスタンプを調整します。
分割されたセグメントは、連番の名前が付けられた個別のMP4ファイルとして出力ディレクトリに保存されます。
import av def split_video(input_path, output_dir, segment_duration=6): # Ensure output directory exists os.makedirs(output_dir, exist_ok=True) input_file_name = os.path.splitext(os.path.basename(input_path))[0] print(input_file_name) with av.open(input_path) as input_container: # Get video stream input_stream = input_container.streams.video[0] fps = input_stream.average_rate # Calculate how many frames are in each segment frames_per_segment = int(segment_duration * fps) segment_count = 0 frame_count = 0 output_container = None output_stream = None first_frame_timestamp = None for frame in input_container.decode(video=0): if frame_count % frames_per_segment == 0: # Close previous output container if it exists if output_container: output_container.close() # Create a new output container output_path = os.path.join(output_dir, f'{input_file_name}_segment_{segment_count:03d}.mp4') segment_count += 1 output_container = av.open(output_path, mode='w') output_stream = output_container.add_stream('h264', rate=fps) output_stream.width = frame.width output_stream.height = frame.height output_stream.pix_fmt = 'yuv420p' # Reset the first frame timestamp for the new segment first_frame_timestamp = frame.pts # Adjust the frame timestamp frame.pts -= first_frame_timestamp # Encode frame packet = output_stream.encode(frame) output_container.mux(packet) frame_count += 1 # Flush the encoder packet = output_stream.encode(None) output_container.mux(packet) # Close the last output container if output_container: output_container.close() split_video(input_path=single_video, output_dir=split_video_dir)
クエリの設定
生成モデルとの対話を開始する準備がすべて整いました。クエリを定義し、関連するコンテンツを取得しましょう。
query = "What is this advertisement selling?" similar_content = retrieve_similar_content(query)
Pegasusを使用した動画クリップとのチャット
Pegasus-1モデルを使用するためには、以下の3つのステップが必要です。
Twelve Labsに動画をホストするインデックスをセットアップする(これは一度だけ実行すれば十分です)
動画をTwelve Labsにアップロードする(これは動画ごとに一度だけ実行すれば十分です)
プロンプトと動画を指定してPegasusにクエリを投げる
まず、インデックスをセットアップします。
engines = [ { "name": "pegasus1.1", "options": ["visual", "conversation"] } ] index_name = "ads_index" indices_list = twelvelabs_client.index.list(name=index_name) if len(indices_list) == 0: index = twelvelabs_client.index.create( name=index_name, engines=engines, ) print(f"A new index has been created: id={index.id} name={index.name} engines={index.engines}") else: index = indices_list[0] print(f"Index already exists: id={index.id} name={index.name} engines={index.engines}")
次に、使いやすくするためにアップロードロジックを設定します。
def upload_video_to_twelve_labs(video_path): task = twelvelabs_client.task.create( index_id=index.id, file = video_path ) print(f"Task created: id={task.id} status={task.status}") task.wait_for_done(sleep_interval=5, callback=on_task_update) if task.status != "ready": raise RuntimeError(f"Indexing failed with status {task.status}") print(f"The unique identifier of your video is {task.video_id}.") return task.video_id
これで、分割動画の全ディレクトリをループ処理し、Twelve Labsのインデックスにアップロードできます。
video_ids = {} for split_video_filename in os.listdir(split_video_dir): split_video_path = os.path.join(split_video_dir, split_video_filename) print(split_video_path) split_video_name = split_video_filename.split('.')[0] print(split_video_name) video_id = upload_video_to_twelve_labs(split_video_path) video_ids[split_video_name] = video_id print(video_ids)
Pegasusの呼び出し
あとは、検索結果を実際の動画クリップに関連付けた後、シンプルなクエリを送信するだけです。
# retrieve the correct video_id for the relevant video video_segment = (int) (similar_content['matches'][0]['metadata']['video_segment']) print(f"Retrieved video segment: {video_segment}") base_filename = os.path.splitext(os.path.basename(single_video))[0] video_key = f"{base_filename}_segment_{video_segment:03d}" video_id = video_ids[video_key] res = twelvelabs_client.generate.text( video_id=video_id, prompt=query ) print(f"{res.data}")
LLaVA-NeXT-Videoの使用
オープンソースモデルを使用する場合は、以下の作業を行う必要があります。
動画をnumpy形式に変換する
動画をモデルが処理可能なフレームのサブセットにサンプリングする
モデルをダウンロードし、独自のGPUでホストする
モデルのすべてのフォーマット処理とクエリの実行を行う
動画をnumpy形式に変換する
read_video_pyav()関数は、PyAVを使用して動画から特定のフレームをデコードします。
import av import numpy as np def read_video_pyav(container, indices): ''' Decode the video with PyAV decoder. Args: container (av.container.input.InputContainer): PyAV container. indices (List[int]): List of frame indices to decode. Returns: np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3). ''' frames = [] container.seek(0) start_index = indices[0] end_index = indices[-1] for i, frame in enumerate(container.decode(video=0)): if i > end_index: break if i >= start_index and i in indices: frames.append(frame)
動画のサンプリング
以下のコードブロックの説明は次の通りです。
get_total_frames(): 動画の総フレーム数を手動でカウントします(均一サンプリングで0フレームと誤判定される場合のフォールバックとして機能します)。sample_video(): 動画から指定された数のフレームを均一にサンプリングします。process_videos_in_folder(): 指定されたフォルダ内のすべての動画を処理し、それぞれからフレームをサンプリングします。
def get_total_frames(video_path): """ Manually count the total number of frames in a video. Used in case uniformly sampling comes up as 0 frames. """ container = av.open(video_path) video_stream = container.streams.video[0] total_frames = 0 for frame in container.decode(video_stream): total_frames += 1 return total_frames def sample_video(video_path, num_samples=8): container = av.open(video_path) video_stream = container.streams.video[0] # sample uniformly num_samples frames from the video total_frames = container.streams.video[0].frames if total_frames == 0: total_frames = get_total_frames(video_path) indices = np.arange(0, total_frames, total_frames / num_samples).astype(int) sampled_frames = read_video_pyav(container, indices) return sampled_frames def process_videos_in_folder(folder_path): sample_info = {} # Supported video file extensions video_extensions = ('.mp4', '.avi', '.mov', '.mkv') for filename in os.listdir(folder_path): simple_video_name = os.path.splitext(os.path.basename(filename))[0] if filename.lower().endswith(video_extensions): video_path = os.path.join(folder_path, filename) try: print("Sampling " + video_path) sampled_clip = sample_video(video_path) sample_info[simple_video_name] = {"sampled_video": sampled_clip, "video_path" : video_path} except Exception as e: print(f"Error processing {filename}: {str(e)}") return sample_info sampled_video_info = process_videos_in_folder(split_video_dir)
モデルのロード
ここでは、Hugging Face Transformersライブラリを使用して、LLaVA-NeXT-Videoモデルとそのプロセッサを初期化します。特に、メモリの効率的な使用のために4ビット量子化(4-bit quantization)を適用しています。
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor import torch quantization_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16 ) processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf") model = LlavaNextVideoForConditionalGeneration.from_pretrained( "llava-hf/LLaVA-NeXT-Video-7B-hf", quantization_config=quantization_config, device_map='auto' )
モデルへの質問
この関数を使用して、Pineconeクエリから返された実際のファイル名からセグメント番号を抽出します。video_segment_name_from_offset()関数は、動画パスと開始時間に基づいてセグメント名を生成し、'similar_content'辞書のメタデータに基づいて特定の動画セグメントを取得します。
def video_segment_name_from_offset(video_path, start_time, segment_length = 6): segment_number = int (start_time // segment_length) simple_video_name = os.path.splitext(os.path.basename(video_path))[0] return f"{simple_video_name}_segment_{segment_number:03d}"
これで、サンプリングされた動画を取得して、モデルへのリクエスト形式を整えることができます。まず、テキストと動画コンテンツの両方を含む会話入力(conversation input)を準備します。次に、LLaVA-NeXT-Videoプロセッサを使用して入力を処理します。最後に、指定されたパラメータを適用してモデルで応答を生成します。
video_segment = similar_content['matches'][0]['metadata']['video_file'] print(video_segment) video_offset = similar_content['matches'][0]['metadata']['start_time'] video_segment_name = video_segment_name_from_offset(video_segment, video_offset) video_segment = sampled_video_info[video_segment_name]['sampled_video'] # Each "content" is a list of dicts and you can add image/video/text modalities conversation = [ { "role": "user", "content": [ {"type": "text", "text": query}, {"type": "video"}, ], }, ] prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) prompt_len = len(prompt) inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device) generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9} output = model.generate(**inputs, **generate_kwargs) generated_text = processor.batch_decode(output, skip_special_tokens=True) print(generated_text[0])
比較
これらのモデルの出力を比較すると、LLaVA-NeXT-Videoと比較して、Pegasusからはより詳細で文脈を意識した回答が得られます。
しかし、両モデルとも動画のクリップしか与えられていないため、動画全体で何が起こっているかを理解することに苦労していることは明らかです。それでは、動画全体をモデルに提供するとどうなるかを見てみましょう。
複数動画の処理
ここからは、動画がランダムに保存された非構造化フォルダを扱い、それら全体について質問を投げます。先ほどまでの検索プロセスでは、特定のクエリに対して最も関連性の高い「クリップ」を見つけ出していました。ここでも同様のことを行いますが、今回はそのクリップに対応する「動画全体」をモデルに提供します。これは、Pineconeに保存されているメタデータを使用することで簡単に実現できます。
まず、動画をPineconeに取り込むことから始めます。
ads_dir = os.path.join(base_folder_path,"ad_vids") video_list = [] #Make sure that we' don't waste time re-embedding the original single video: single_video_filename = os.path.splitext(os.path.basename(single_video))[0] for filename in os.listdir(ads_dir): if filename.endswith(".mp4") and single_video_filename not in filename: video_list.append(ads_dir + "/" + filename) print(video_list) for video in video_list: ingest_data(video)
次に、データベースに投げかけるいくつかの質問を設定し、最も関連性の高い動画を検索します。
full_database_questions = ["Who is the actor in the Miss Dior video?", "What ad is Selena Gomez in?", "What is the ad for Rare Beauty about?", "Why should people buy the Rare Beauty product according to their ad?"] question = full_database_questions[0] similar_content_from_question = retrieve_similar_content(question) video_name = similar_content_from_question['matches'][0]['metadata']['video_file']
Pegasusの使用
次に、Python SDKを介してPegasusを使用するために必要な追加ステップについて説明します。今回はインデックスがすでにセットアップされているため、クエリを実行する前に動画をアップロードするだけで完了します。
Twelve Labsへの動画のアップロード
ディレクトリ内の動画をループ処理し、それぞれをTwelve Labsにアップロードして、生成された動画IDを保存します。
for vid in os.listdir(ads_dir): vid_path = os.path.join(ads_dir, vid) vid_name = os.path.splitext(os.path.basename(vid_path))[0] print(vid_path) video_id = upload_video_to_twelve_labs(vid_path) video_ids[vid_name] = video_id
動画データベースを指定したPegasusへのクエリ
次に、Twelve Labsクライアントを使用して、動画IDと質問プロンプトに基づきテキスト応答を生成します。
video_id = video_ids[video_name] res = twelvelabs_client.generate.text( video_id=video_id, prompt=question ) print(f"{res.data}")
動画データベースにおけるLLaVA-NeXT-Videoの使用
動画のサンプリング
まず、すべての動画をサンプリングして保存し、検索された動画の正しいサンプリングデータにアクセスする必要があります。
sampled_database_video_info = process_videos_in_folder(ads_dir) video_segment = sampled_database_video_info[video_name]['sampled_video']
モデルの実行
これで、サンプリングされた動画に対してモデルを実行できます。
まず、ユーザーのロール、テキスト(質問)、および動画コンテンツを含む会話構造を作成します。
次に、会話にチャットテンプレートを適用し、モデル向けのインプット(プロンプトと動画セグメントを含む)を準備し、応答作成時の生成パラメータ(
max_new_tokens、do_sample、top_p)を設定します。最後に、LLaVA-NeXT-Videoモデルを使用して入力に基づいてテキストを設定・生成し、出力をデコードして生成されたテキストをプリントします。
conversation = [ { "role": "user", "content": [ {"type": "text", "text": question}, {"type": "video"}, ], }, ] prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) prompt_len = len(prompt) inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device) generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9} output = model.generate(**inputs, **generate_kwargs) generated_text = processor.batch_decode(output, skip_special_tokens=True) print(generated_text[0])
比較
このクエリを2つのモデルで実行すると、Pegasusはナタリー・ポートマンが誰であるかを明確に理解し、動画内の彼女の存在を正確に認識していることがわかります。対照的に、LLaVA-NeXT-Videoモデルは、ナタリー・ポートマンを認識できないか、与えられたサンプリングフレームからは彼女を十分に捉えることができません。さらに、このオープンソースモデルは話が脱線する傾向があり、その結果、応答が長くなり、レイテンシが増大します。これは本番環境でのユースケースにおいては潜在的な懸念事項となり得ます。
まとめ
このガイドでは、動画を個別にあるいは完全なセットとして操作する方法について説明しました。検索管理には、Twelve LabsのEmbed APIとPineconeのベクトルデータベースを活用しました。
また、Twelve LabsのPegasusモデルと、オープンソースモデルであるLLaVA-NeXT-Videoを比較し、必要なインフラストラクチャ、開発者体験、およびクエリ結果の観点から評価しました。Pegasusは、オープンソースモデルと比較して、運用上のオーバーヘッドが少なく、指示に従う精度が高いため、非常に有望であることを示しました。
ベストプラクティス
専用ホストを使用する場合は、PineconeのPodベースのプランの利用をご検討ください。
オープンソースの動画モデルが処理するフレーム数と、その精度、および生成におけるレイテンシのトレードオフを慎重に考慮してください。
可能であれば、推論を高速化するためにオープンソースモデルを量子化してください。特定のニーズを満たすために、RAM使用量、速度、および品質の間で推論時間のトレードオフを実験してみるのが良いでしょう。
次のステップ
はるかに大規模な動画セットから検索する場合、検索メカニズムの精度が低下する可能性があります。いくつかの解決策として、以下の手段が挙げられます。
データにさらに適合させるために、埋め込みベクトルの上に線形アダプターをトレーニングさせる。
異なる動画からのクリップが返された場合に、Pegasusを使用して動画を再ランク付け(Re-rank)する。
Pineconeのエントリに各動画のサマリー(要約)データを追加し、ハイブリッド検索システムを構築することで、Pineconeのメタデータ機能を利用して精度を向上させる。
付録
参照およびさらなる探索のためのリソース一覧です。
完成版のColab Notebook
Pineconeのクライアント、およびクラウド用のドキュメント(英語)
このチュートリアルの共同制作にご協力いただいたPineconeチーム(Adam Heerwagen氏およびCory Waddingham氏)に深く感謝いたします。
はじめに
このチュートリアルでは、動画に対するRAGベースのQ&Aを実現するために、Twelve LabsのEmbed APIとホスト型ベクトルデータベースのPineconeを統合する方法について説明します。このガイドでは、生成モデルを使用して、非構造化動画データベースからテキストの回答を抽出する方法を学びます。
Twelve Labsの豊かでコンテキストを考慮した埋め込み(embeddings)とPineconeのベクトルデータベースを組み合わせることで、これら動画の埋め込みを保存、インデックス化、クエリし、チャットアプリケーションを構築します。このノートブックでは、わずか数行のコードでこれらのテクノロジーがどのように実現できるかを紹介します。
比較として、テキスト応答を生成するためにTwelve LabsのGenerate APIを使用する場合と、代表的なオープンソースモデルであるLLaVA-NeXT-Videoを使用する場合の開発者エクスペリエンス(DX)の違いも実演します。
セットアップとインストール
コア機能に入る前に、環境をセットアップし、必要なライブラリをインストールしましょう。
必要なライブラリのインストール
まず、Twelve LabsとPineconeのライブラリをインストールします。ノートブックのセルで次のコマンドを実行してください。
# Install required libraries !pip install twelvelabs pinecone-client
次に、動画フォーマット処理用のPyAV、およびHugging Faceでオープンソースモデルを使用するためのbitsandbytesとtransformersをインストールします。
!pip install -q av !pip install --upgrade -q accelerate bitsandbytes !pip install transformers
認証
Twelve Labs APIとPineconeのAPIキーを設定する必要があります。これらのキーを保存するために、Google Colabに組み込まれているuserdataライブラリを使用します。サインアップ後、Pineconeの情報はコンソールで確認できます。Pineconeは、このデモには十分すぎる無料のスターターティア(Starter Tier)を提供しています。
Twelve Labsのキーは、https://playground.twelvelabs.ioでサインアップした後、アカウント内で確認できます。
from google.colab import userdata TL_API_KEY=userdata.get('TL_API_KEY') PINECONE_API_KEY=userdata.get('PINECONE_API_KEY')
動画データの設定
次に、埋め込み対象となる動画データを取得する必要があります。動画データは、このリンクからGoogleドライブのフォルダ内で確認できます。これをGoogleドライブのマイドライブ直下にある「TwelveLabs-Pinecone」という名前のフォルダにコピーしてください。次のセルを使用してドライブをマウントし、ノートブックから動画ファイルにアクセスできるようにします。
from google.colab import drive drive.mount('/content/drive') base_folder_path = "/content/drive/MyDrive/TwelveLabs-Pinecone" single_video = base_folder_path + "/ad_vids/Rare Beauty By Selena Gomez - Makeup Made To Feel Good In.mp4" split_video_dir = base_folder_path + "/split_ad_videos"
クライアントのセットアップ
PineconeとTwelve Labsの設定を行う段階です。それぞれのAPIキーを使用して両方のサービスをインポートし、初期化します。
# Configure Pinecone from pinecone import Pinecone, ServerlessSpec pc = Pinecone(api_key=PINECONE_API_KEY) from twelvelabs import TwelveLabs from twelvelabs.models.embed import EmbeddingsTask # Initialize the Twelve Labs client twelvelabs_client = TwelveLabs(api_key=TL_API_KEY)
埋め込み(Embeddings)の作成とPineconeへの取り込み
以下のコードブロックは、Twelve Labs APIとPineconeベクトルデータベースを使用して動画の埋め込みを生成・保存するプロセスを示しています。主に次の2つの関数を定義しています。
generate_embedding関数は、埋め込みタスクの作成と管理を担当します。指定された動画ファイルとエンジンを用いて、Twelve Labs APIで埋め込みタスクを作成します。
タスクの進行状況を監視するためのコールバック関数を定義します。
タスクの完了を待ち、結果を取得します。
最後に、タスク結果から埋め込みデータとメタデータ(タイムレンジおよびスコープ)を抽出します。
ingest_data関数は、データ取り込みを担当するメインの関数です。generate_embeddingを呼び出して、指定された動画ファイルの埋め込みを取得します。Pineconeのインデックス(この例では
twelve-labsという名前)に接続します。埋め込みデータとメタデータを整形し、アップサート(upsert)用のベクトルを準備します。
ベクトルをPineconeインデックスにアップサート(登録・更新)します。
このコードを実行すると、埋め込みタスクの処理に伴って進行状況が更新され、最終的にPineconeに取り込まれた埋め込みの数が確認メッセージとして表示されます。これにより、後でこれらの埋め込み検索を使用して動画コンテンツを検索・分析するための基盤が整います。
# Define a callback function to monitor task progress def on_task_update(task: EmbeddingsTask): print(f" Status={task.status}") def generate_embedding(video_file): # Create an embedding task task = twelvelabs_client.embed.task.create( engine_name="Marengo-retrieval-2.6", video_file=video_file ) print(f"Created task: id={task.id} engine_name={task.engine_name} status={task.status}") # Wait for the task to complete status = task.wait_for_done( sleep_interval=2, callback=on_task_update ) print(f"Embedding done: {status}") # Retrieve the task result task_result = twelvelabs_client.embed.task.retrieve(task.id) # Extract and return the embeddings embeddings = [] for v in task_result.video_embeddings: embeddings.append({ 'embedding': v.embedding.float, 'start_offset_sec': v.start_offset_sec, 'end_offset_sec': v.end_offset_sec, 'embedding_scope': v.embedding_scope }) return embeddings, task_result def ingest_data(video_file_path, index_name = "twelve-labs"): """ Generate embeddings for video and store in Pinecone """ #Strip the extension and the folders from the video_file_path video_name = os.path.splitext(os.path.basename(video_file_path))[0] print(video_name) # Connect to Pinecone index if index_name not in pc.list_indexes().names(): pc.create_index( name=index_name, dimension=1024, # The dimensions of Twelve Lab's Embedding Model metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) ) index = pc.Index(index_name) # Generate embeddings using Twelve Labs Embed API embeddings, task_result = generate_embedding(video_file_path) # Prepare vectors for upsert vectors_to_upsert = [] for i, emb in enumerate(embeddings): vector_id = f"{video_name}_{i}" vectors_to_upsert.append((vector_id, emb['embedding'], { 'video_file': video_name, 'video_segment': i, 'start_time': emb['start_offset_sec'], 'end_time': emb['end_offset_sec'], 'scope': emb['embedding_scope'] })) # Upsert embeddings to Pinecone index.upsert(vectors=vectors_to_upsert) return f"Ingested {len(embeddings)} embeddings for {video_file_path}"
そして、これら2つの関数を使用して、動画の埋め込みをPineconeにロードします。
# Example usage result = ingest_data(single_video) print(result)
このコードにより、Twelve LabsのEmbed APIを使用して動画のマルチモーダル埋め込みを生成し、後で検索できるようにPineconeに保存できます。この埋め込みは、視覚情報、音声情報、テキスト情報を含む動画コンテンツのさまざまな側面を捉えているため、幅広いAIアプリケーションに適しています。
テキストクエリによる検索
次に、Twelve LabsのMarengoモデルを使用してテキストを埋め込み、Pineconeデータベースから類似したコンテンツを検索するための関数を設定します。
get_text_embedding関数は、Twelve Labs Embed APIを使ってテキストクエリを埋め込みベクトルに変換します。twelvelabs_client.embed.createメソッドを使用して、指定されたテキストの埋め込みを生成します。engine_nameパラメータには、使用する埋め込みモデル(“Marengo-retrieval-2.6”)を指定します。text_truncateパラメータは “start” に設定されており、テキストが長すぎる場合は、先頭から切り捨てられます。
retrieve_similar_content関数は、コンテンツ検索を実行するメインの関数です。テキストクエリと、返される結果の数(
top_k)をパラメータとして受け取ります。get_text_embeddingを呼び出して、テキストクエリを埋め込みに変換します。twelve-labsという名前のPineconeインデックスに接続します。クエリ埋め込みに類似したベクトルをPineconeインデックスに対して照会し、返す結果の数を指定して、メタデータを含めて取得します。
この検索プロセスは、テキストクエリの埋め込みと、Pineconeに保存されている動画セグメントの事前に計算された埋め込みとを比較することによって動作します。これにより、大規模な動画データセット全体に対して、高速かつ効率的な類似性検索が可能になります。
def get_text_embedding(text_query): # Twelve Labs Embed API supports text-to-embedding text_embedding = twelvelabs_client.embed.create( engine_name="Marengo-retrieval-2.6", text=text_query, text_truncate="start" ) return text_embedding.text_embedding.float def retrieve_similar_content(query, index_name="twelve-labs", top_k=5): """ Retrieve similar content based on query embedding """ # Generate query embedding query_embedding = get_text_embedding(query) # Connect to Pinecone index index = pc.Index(index_name) # Query Pinecone for similar vectors results = index.query(vector=query_embedding, top_k=top_k, include_metadata=True) return results
これで、サンプルのテキストクエリを使用してretrieve_similar_content関数を実行し、クエリと見つかった上位の類似コンテンツの詳細を出力することができます。
# Example usage text_query = "Lipstick" similar_content = retrieve_similar_content(text_query) print(f"Query: '{text_query}'") print(f"Top {len(similar_content['matches'])} similar content:") for i, match in enumerate(similar_content['matches']): print(f"{i+1}. Score: {match['score']:.4f}") print(f" Video File: {match['metadata']['video_file']}") print(f" Video ID: {match['metadata']['video_segment']}") print(f" Time range: {match['metadata']['start_time']} - {match['metadata']['end_time']} seconds") print(f" Scope: {match['metadata']['scope']}") print()
このコードを使用すると、テキストクエリを使って動画コンテンツに対してセマンティック検索を実行できます。Twelve Labsのマルチモーダル埋め込みの力を活用して、正確な単語が動画に含まれていなくても、テキストクエリと意味的に類似している動画セグメントを見つけ出します。
このコードを実行すると、類似度スコア、動画ファイル、動画ID、タイムレンジ、スコープとともに、最も一致する動画セグメントが上位に表示されます。これにより、動画検索やコンテンツのレコメンデーションなど、さまざまなアプリケーションの実装が可能になります。
動画の切り出しフォーマット設定
データベース内の動画埋め込みとそれらを照会する機能によって、最初の実験では、これらの埋め込みをフル動画ではなく特定の短い動画クリップにリンクさせることを目指します。埋め込みモデルがタイムスタンプを処理する方法と同様に、動画を分割していきます。
以下のsplit_video関数は、avライブラリを使用して、指定された長さの短いセグメントに動画ファイルを分割します。簡単な説明は以下の通りです。
この関数は、入力動画パス、出力ディレクトリ、およびセグメント期間(デフォルトは6秒)をパラメータとして受け取ります。
入力動画を開き、動画のフレームレートに基づいてセグメントあたりのフレーム数を計算し、動画フレームをループ処理します。
各セグメントに対して新しい出力コンテナを作成し、そこにフレームを書き込み、フレームのタイムスタンプを調整します。
分割されたセグメントは、連番の名前が付けられた個別のMP4ファイルとして出力ディレクトリに保存されます。
import av def split_video(input_path, output_dir, segment_duration=6): # Ensure output directory exists os.makedirs(output_dir, exist_ok=True) input_file_name = os.path.splitext(os.path.basename(input_path))[0] print(input_file_name) with av.open(input_path) as input_container: # Get video stream input_stream = input_container.streams.video[0] fps = input_stream.average_rate # Calculate how many frames are in each segment frames_per_segment = int(segment_duration * fps) segment_count = 0 frame_count = 0 output_container = None output_stream = None first_frame_timestamp = None for frame in input_container.decode(video=0): if frame_count % frames_per_segment == 0: # Close previous output container if it exists if output_container: output_container.close() # Create a new output container output_path = os.path.join(output_dir, f'{input_file_name}_segment_{segment_count:03d}.mp4') segment_count += 1 output_container = av.open(output_path, mode='w') output_stream = output_container.add_stream('h264', rate=fps) output_stream.width = frame.width output_stream.height = frame.height output_stream.pix_fmt = 'yuv420p' # Reset the first frame timestamp for the new segment first_frame_timestamp = frame.pts # Adjust the frame timestamp frame.pts -= first_frame_timestamp # Encode frame packet = output_stream.encode(frame) output_container.mux(packet) frame_count += 1 # Flush the encoder packet = output_stream.encode(None) output_container.mux(packet) # Close the last output container if output_container: output_container.close() split_video(input_path=single_video, output_dir=split_video_dir)
クエリの設定
生成モデルとの対話を開始する準備がすべて整いました。クエリを定義し、関連するコンテンツを取得しましょう。
query = "What is this advertisement selling?" similar_content = retrieve_similar_content(query)
Pegasusを使用した動画クリップとのチャット
Pegasus-1モデルを使用するためには、以下の3つのステップが必要です。
Twelve Labsに動画をホストするインデックスをセットアップする(これは一度だけ実行すれば十分です)
動画をTwelve Labsにアップロードする(これは動画ごとに一度だけ実行すれば十分です)
プロンプトと動画を指定してPegasusにクエリを投げる
まず、インデックスをセットアップします。
engines = [ { "name": "pegasus1.1", "options": ["visual", "conversation"] } ] index_name = "ads_index" indices_list = twelvelabs_client.index.list(name=index_name) if len(indices_list) == 0: index = twelvelabs_client.index.create( name=index_name, engines=engines, ) print(f"A new index has been created: id={index.id} name={index.name} engines={index.engines}") else: index = indices_list[0] print(f"Index already exists: id={index.id} name={index.name} engines={index.engines}")
次に、使いやすくするためにアップロードロジックを設定します。
def upload_video_to_twelve_labs(video_path): task = twelvelabs_client.task.create( index_id=index.id, file = video_path ) print(f"Task created: id={task.id} status={task.status}") task.wait_for_done(sleep_interval=5, callback=on_task_update) if task.status != "ready": raise RuntimeError(f"Indexing failed with status {task.status}") print(f"The unique identifier of your video is {task.video_id}.") return task.video_id
これで、分割動画の全ディレクトリをループ処理し、Twelve Labsのインデックスにアップロードできます。
video_ids = {} for split_video_filename in os.listdir(split_video_dir): split_video_path = os.path.join(split_video_dir, split_video_filename) print(split_video_path) split_video_name = split_video_filename.split('.')[0] print(split_video_name) video_id = upload_video_to_twelve_labs(split_video_path) video_ids[split_video_name] = video_id print(video_ids)
Pegasusの呼び出し
あとは、検索結果を実際の動画クリップに関連付けた後、シンプルなクエリを送信するだけです。
# retrieve the correct video_id for the relevant video video_segment = (int) (similar_content['matches'][0]['metadata']['video_segment']) print(f"Retrieved video segment: {video_segment}") base_filename = os.path.splitext(os.path.basename(single_video))[0] video_key = f"{base_filename}_segment_{video_segment:03d}" video_id = video_ids[video_key] res = twelvelabs_client.generate.text( video_id=video_id, prompt=query ) print(f"{res.data}")
LLaVA-NeXT-Videoの使用
オープンソースモデルを使用する場合は、以下の作業を行う必要があります。
動画をnumpy形式に変換する
動画をモデルが処理可能なフレームのサブセットにサンプリングする
モデルをダウンロードし、独自のGPUでホストする
モデルのすべてのフォーマット処理とクエリの実行を行う
動画をnumpy形式に変換する
read_video_pyav()関数は、PyAVを使用して動画から特定のフレームをデコードします。
import av import numpy as np def read_video_pyav(container, indices): ''' Decode the video with PyAV decoder. Args: container (av.container.input.InputContainer): PyAV container. indices (List[int]): List of frame indices to decode. Returns: np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3). ''' frames = [] container.seek(0) start_index = indices[0] end_index = indices[-1] for i, frame in enumerate(container.decode(video=0)): if i > end_index: break if i >= start_index and i in indices: frames.append(frame)
動画のサンプリング
以下のコードブロックの説明は次の通りです。
get_total_frames(): 動画の総フレーム数を手動でカウントします(均一サンプリングで0フレームと誤判定される場合のフォールバックとして機能します)。sample_video(): 動画から指定された数のフレームを均一にサンプリングします。process_videos_in_folder(): 指定されたフォルダ内のすべての動画を処理し、それぞれからフレームをサンプリングします。
def get_total_frames(video_path): """ Manually count the total number of frames in a video. Used in case uniformly sampling comes up as 0 frames. """ container = av.open(video_path) video_stream = container.streams.video[0] total_frames = 0 for frame in container.decode(video_stream): total_frames += 1 return total_frames def sample_video(video_path, num_samples=8): container = av.open(video_path) video_stream = container.streams.video[0] # sample uniformly num_samples frames from the video total_frames = container.streams.video[0].frames if total_frames == 0: total_frames = get_total_frames(video_path) indices = np.arange(0, total_frames, total_frames / num_samples).astype(int) sampled_frames = read_video_pyav(container, indices) return sampled_frames def process_videos_in_folder(folder_path): sample_info = {} # Supported video file extensions video_extensions = ('.mp4', '.avi', '.mov', '.mkv') for filename in os.listdir(folder_path): simple_video_name = os.path.splitext(os.path.basename(filename))[0] if filename.lower().endswith(video_extensions): video_path = os.path.join(folder_path, filename) try: print("Sampling " + video_path) sampled_clip = sample_video(video_path) sample_info[simple_video_name] = {"sampled_video": sampled_clip, "video_path" : video_path} except Exception as e: print(f"Error processing {filename}: {str(e)}") return sample_info sampled_video_info = process_videos_in_folder(split_video_dir)
モデルのロード
ここでは、Hugging Face Transformersライブラリを使用して、LLaVA-NeXT-Videoモデルとそのプロセッサを初期化します。特に、メモリの効率的な使用のために4ビット量子化(4-bit quantization)を適用しています。
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor import torch quantization_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16 ) processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf") model = LlavaNextVideoForConditionalGeneration.from_pretrained( "llava-hf/LLaVA-NeXT-Video-7B-hf", quantization_config=quantization_config, device_map='auto' )
モデルへの質問
この関数を使用して、Pineconeクエリから返された実際のファイル名からセグメント番号を抽出します。video_segment_name_from_offset()関数は、動画パスと開始時間に基づいてセグメント名を生成し、'similar_content'辞書のメタデータに基づいて特定の動画セグメントを取得します。
def video_segment_name_from_offset(video_path, start_time, segment_length = 6): segment_number = int (start_time // segment_length) simple_video_name = os.path.splitext(os.path.basename(video_path))[0] return f"{simple_video_name}_segment_{segment_number:03d}"
これで、サンプリングされた動画を取得して、モデルへのリクエスト形式を整えることができます。まず、テキストと動画コンテンツの両方を含む会話入力(conversation input)を準備します。次に、LLaVA-NeXT-Videoプロセッサを使用して入力を処理します。最後に、指定されたパラメータを適用してモデルで応答を生成します。
video_segment = similar_content['matches'][0]['metadata']['video_file'] print(video_segment) video_offset = similar_content['matches'][0]['metadata']['start_time'] video_segment_name = video_segment_name_from_offset(video_segment, video_offset) video_segment = sampled_video_info[video_segment_name]['sampled_video'] # Each "content" is a list of dicts and you can add image/video/text modalities conversation = [ { "role": "user", "content": [ {"type": "text", "text": query}, {"type": "video"}, ], }, ] prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) prompt_len = len(prompt) inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device) generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9} output = model.generate(**inputs, **generate_kwargs) generated_text = processor.batch_decode(output, skip_special_tokens=True) print(generated_text[0])
比較
これらのモデルの出力を比較すると、LLaVA-NeXT-Videoと比較して、Pegasusからはより詳細で文脈を意識した回答が得られます。
しかし、両モデルとも動画のクリップしか与えられていないため、動画全体で何が起こっているかを理解することに苦労していることは明らかです。それでは、動画全体をモデルに提供するとどうなるかを見てみましょう。
複数動画の処理
ここからは、動画がランダムに保存された非構造化フォルダを扱い、それら全体について質問を投げます。先ほどまでの検索プロセスでは、特定のクエリに対して最も関連性の高い「クリップ」を見つけ出していました。ここでも同様のことを行いますが、今回はそのクリップに対応する「動画全体」をモデルに提供します。これは、Pineconeに保存されているメタデータを使用することで簡単に実現できます。
まず、動画をPineconeに取り込むことから始めます。
ads_dir = os.path.join(base_folder_path,"ad_vids") video_list = [] #Make sure that we' don't waste time re-embedding the original single video: single_video_filename = os.path.splitext(os.path.basename(single_video))[0] for filename in os.listdir(ads_dir): if filename.endswith(".mp4") and single_video_filename not in filename: video_list.append(ads_dir + "/" + filename) print(video_list) for video in video_list: ingest_data(video)
次に、データベースに投げかけるいくつかの質問を設定し、最も関連性の高い動画を検索します。
full_database_questions = ["Who is the actor in the Miss Dior video?", "What ad is Selena Gomez in?", "What is the ad for Rare Beauty about?", "Why should people buy the Rare Beauty product according to their ad?"] question = full_database_questions[0] similar_content_from_question = retrieve_similar_content(question) video_name = similar_content_from_question['matches'][0]['metadata']['video_file']
Pegasusの使用
次に、Python SDKを介してPegasusを使用するために必要な追加ステップについて説明します。今回はインデックスがすでにセットアップされているため、クエリを実行する前に動画をアップロードするだけで完了します。
Twelve Labsへの動画のアップロード
ディレクトリ内の動画をループ処理し、それぞれをTwelve Labsにアップロードして、生成された動画IDを保存します。
for vid in os.listdir(ads_dir): vid_path = os.path.join(ads_dir, vid) vid_name = os.path.splitext(os.path.basename(vid_path))[0] print(vid_path) video_id = upload_video_to_twelve_labs(vid_path) video_ids[vid_name] = video_id
動画データベースを指定したPegasusへのクエリ
次に、Twelve Labsクライアントを使用して、動画IDと質問プロンプトに基づきテキスト応答を生成します。
video_id = video_ids[video_name] res = twelvelabs_client.generate.text( video_id=video_id, prompt=question ) print(f"{res.data}")
動画データベースにおけるLLaVA-NeXT-Videoの使用
動画のサンプリング
まず、すべての動画をサンプリングして保存し、検索された動画の正しいサンプリングデータにアクセスする必要があります。
sampled_database_video_info = process_videos_in_folder(ads_dir) video_segment = sampled_database_video_info[video_name]['sampled_video']
モデルの実行
これで、サンプリングされた動画に対してモデルを実行できます。
まず、ユーザーのロール、テキスト(質問)、および動画コンテンツを含む会話構造を作成します。
次に、会話にチャットテンプレートを適用し、モデル向けのインプット(プロンプトと動画セグメントを含む)を準備し、応答作成時の生成パラメータ(
max_new_tokens、do_sample、top_p)を設定します。最後に、LLaVA-NeXT-Videoモデルを使用して入力に基づいてテキストを設定・生成し、出力をデコードして生成されたテキストをプリントします。
conversation = [ { "role": "user", "content": [ {"type": "text", "text": question}, {"type": "video"}, ], }, ] prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) prompt_len = len(prompt) inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device) generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9} output = model.generate(**inputs, **generate_kwargs) generated_text = processor.batch_decode(output, skip_special_tokens=True) print(generated_text[0])
比較
このクエリを2つのモデルで実行すると、Pegasusはナタリー・ポートマンが誰であるかを明確に理解し、動画内の彼女の存在を正確に認識していることがわかります。対照的に、LLaVA-NeXT-Videoモデルは、ナタリー・ポートマンを認識できないか、与えられたサンプリングフレームからは彼女を十分に捉えることができません。さらに、このオープンソースモデルは話が脱線する傾向があり、その結果、応答が長くなり、レイテンシが増大します。これは本番環境でのユースケースにおいては潜在的な懸念事項となり得ます。
まとめ
このガイドでは、動画を個別にあるいは完全なセットとして操作する方法について説明しました。検索管理には、Twelve LabsのEmbed APIとPineconeのベクトルデータベースを活用しました。
また、Twelve LabsのPegasusモデルと、オープンソースモデルであるLLaVA-NeXT-Videoを比較し、必要なインフラストラクチャ、開発者体験、およびクエリ結果の観点から評価しました。Pegasusは、オープンソースモデルと比較して、運用上のオーバーヘッドが少なく、指示に従う精度が高いため、非常に有望であることを示しました。
ベストプラクティス
専用ホストを使用する場合は、PineconeのPodベースのプランの利用をご検討ください。
オープンソースの動画モデルが処理するフレーム数と、その精度、および生成におけるレイテンシのトレードオフを慎重に考慮してください。
可能であれば、推論を高速化するためにオープンソースモデルを量子化してください。特定のニーズを満たすために、RAM使用量、速度、および品質の間で推論時間のトレードオフを実験してみるのが良いでしょう。
次のステップ
はるかに大規模な動画セットから検索する場合、検索メカニズムの精度が低下する可能性があります。いくつかの解決策として、以下の手段が挙げられます。
データにさらに適合させるために、埋め込みベクトルの上に線形アダプターをトレーニングさせる。
異なる動画からのクリップが返された場合に、Pegasusを使用して動画を再ランク付け(Re-rank)する。
Pineconeのエントリに各動画のサマリー(要約)データを追加し、ハイブリッド検索システムを構築することで、Pineconeのメタデータ機能を利用して精度を向上させる。
付録
参照およびさらなる探索のためのリソース一覧です。
完成版のColab Notebook
Pineconeのクライアント、およびクラウド用のドキュメント(英語)
このチュートリアルの共同制作にご協力いただいたPineconeチーム(Adam Heerwagen氏およびCory Waddingham氏)に深く感謝いたします。
はじめに
このチュートリアルでは、動画に対するRAGベースのQ&Aを実現するために、Twelve LabsのEmbed APIとホスト型ベクトルデータベースのPineconeを統合する方法について説明します。このガイドでは、生成モデルを使用して、非構造化動画データベースからテキストの回答を抽出する方法を学びます。
Twelve Labsの豊かでコンテキストを考慮した埋め込み(embeddings)とPineconeのベクトルデータベースを組み合わせることで、これら動画の埋め込みを保存、インデックス化、クエリし、チャットアプリケーションを構築します。このノートブックでは、わずか数行のコードでこれらのテクノロジーがどのように実現できるかを紹介します。
比較として、テキスト応答を生成するためにTwelve LabsのGenerate APIを使用する場合と、代表的なオープンソースモデルであるLLaVA-NeXT-Videoを使用する場合の開発者エクスペリエンス(DX)の違いも実演します。
セットアップとインストール
コア機能に入る前に、環境をセットアップし、必要なライブラリをインストールしましょう。
必要なライブラリのインストール
まず、Twelve LabsとPineconeのライブラリをインストールします。ノートブックのセルで次のコマンドを実行してください。
# Install required libraries !pip install twelvelabs pinecone-client
次に、動画フォーマット処理用のPyAV、およびHugging Faceでオープンソースモデルを使用するためのbitsandbytesとtransformersをインストールします。
!pip install -q av !pip install --upgrade -q accelerate bitsandbytes !pip install transformers
認証
Twelve Labs APIとPineconeのAPIキーを設定する必要があります。これらのキーを保存するために、Google Colabに組み込まれているuserdataライブラリを使用します。サインアップ後、Pineconeの情報はコンソールで確認できます。Pineconeは、このデモには十分すぎる無料のスターターティア(Starter Tier)を提供しています。
Twelve Labsのキーは、https://playground.twelvelabs.ioでサインアップした後、アカウント内で確認できます。
from google.colab import userdata TL_API_KEY=userdata.get('TL_API_KEY') PINECONE_API_KEY=userdata.get('PINECONE_API_KEY')
動画データの設定
次に、埋め込み対象となる動画データを取得する必要があります。動画データは、このリンクからGoogleドライブのフォルダ内で確認できます。これをGoogleドライブのマイドライブ直下にある「TwelveLabs-Pinecone」という名前のフォルダにコピーしてください。次のセルを使用してドライブをマウントし、ノートブックから動画ファイルにアクセスできるようにします。
from google.colab import drive drive.mount('/content/drive') base_folder_path = "/content/drive/MyDrive/TwelveLabs-Pinecone" single_video = base_folder_path + "/ad_vids/Rare Beauty By Selena Gomez - Makeup Made To Feel Good In.mp4" split_video_dir = base_folder_path + "/split_ad_videos"
クライアントのセットアップ
PineconeとTwelve Labsの設定を行う段階です。それぞれのAPIキーを使用して両方のサービスをインポートし、初期化します。
# Configure Pinecone from pinecone import Pinecone, ServerlessSpec pc = Pinecone(api_key=PINECONE_API_KEY) from twelvelabs import TwelveLabs from twelvelabs.models.embed import EmbeddingsTask # Initialize the Twelve Labs client twelvelabs_client = TwelveLabs(api_key=TL_API_KEY)
埋め込み(Embeddings)の作成とPineconeへの取り込み
以下のコードブロックは、Twelve Labs APIとPineconeベクトルデータベースを使用して動画の埋め込みを生成・保存するプロセスを示しています。主に次の2つの関数を定義しています。
generate_embedding関数は、埋め込みタスクの作成と管理を担当します。指定された動画ファイルとエンジンを用いて、Twelve Labs APIで埋め込みタスクを作成します。
タスクの進行状況を監視するためのコールバック関数を定義します。
タスクの完了を待ち、結果を取得します。
最後に、タスク結果から埋め込みデータとメタデータ(タイムレンジおよびスコープ)を抽出します。
ingest_data関数は、データ取り込みを担当するメインの関数です。generate_embeddingを呼び出して、指定された動画ファイルの埋め込みを取得します。Pineconeのインデックス(この例では
twelve-labsという名前)に接続します。埋め込みデータとメタデータを整形し、アップサート(upsert)用のベクトルを準備します。
ベクトルをPineconeインデックスにアップサート(登録・更新)します。
このコードを実行すると、埋め込みタスクの処理に伴って進行状況が更新され、最終的にPineconeに取り込まれた埋め込みの数が確認メッセージとして表示されます。これにより、後でこれらの埋め込み検索を使用して動画コンテンツを検索・分析するための基盤が整います。
# Define a callback function to monitor task progress def on_task_update(task: EmbeddingsTask): print(f" Status={task.status}") def generate_embedding(video_file): # Create an embedding task task = twelvelabs_client.embed.task.create( engine_name="Marengo-retrieval-2.6", video_file=video_file ) print(f"Created task: id={task.id} engine_name={task.engine_name} status={task.status}") # Wait for the task to complete status = task.wait_for_done( sleep_interval=2, callback=on_task_update ) print(f"Embedding done: {status}") # Retrieve the task result task_result = twelvelabs_client.embed.task.retrieve(task.id) # Extract and return the embeddings embeddings = [] for v in task_result.video_embeddings: embeddings.append({ 'embedding': v.embedding.float, 'start_offset_sec': v.start_offset_sec, 'end_offset_sec': v.end_offset_sec, 'embedding_scope': v.embedding_scope }) return embeddings, task_result def ingest_data(video_file_path, index_name = "twelve-labs"): """ Generate embeddings for video and store in Pinecone """ #Strip the extension and the folders from the video_file_path video_name = os.path.splitext(os.path.basename(video_file_path))[0] print(video_name) # Connect to Pinecone index if index_name not in pc.list_indexes().names(): pc.create_index( name=index_name, dimension=1024, # The dimensions of Twelve Lab's Embedding Model metric="cosine", spec=ServerlessSpec( cloud="aws", region="us-east-1" ) ) index = pc.Index(index_name) # Generate embeddings using Twelve Labs Embed API embeddings, task_result = generate_embedding(video_file_path) # Prepare vectors for upsert vectors_to_upsert = [] for i, emb in enumerate(embeddings): vector_id = f"{video_name}_{i}" vectors_to_upsert.append((vector_id, emb['embedding'], { 'video_file': video_name, 'video_segment': i, 'start_time': emb['start_offset_sec'], 'end_time': emb['end_offset_sec'], 'scope': emb['embedding_scope'] })) # Upsert embeddings to Pinecone index.upsert(vectors=vectors_to_upsert) return f"Ingested {len(embeddings)} embeddings for {video_file_path}"
そして、これら2つの関数を使用して、動画の埋め込みをPineconeにロードします。
# Example usage result = ingest_data(single_video) print(result)
このコードにより、Twelve LabsのEmbed APIを使用して動画のマルチモーダル埋め込みを生成し、後で検索できるようにPineconeに保存できます。この埋め込みは、視覚情報、音声情報、テキスト情報を含む動画コンテンツのさまざまな側面を捉えているため、幅広いAIアプリケーションに適しています。
テキストクエリによる検索
次に、Twelve LabsのMarengoモデルを使用してテキストを埋め込み、Pineconeデータベースから類似したコンテンツを検索するための関数を設定します。
get_text_embedding関数は、Twelve Labs Embed APIを使ってテキストクエリを埋め込みベクトルに変換します。twelvelabs_client.embed.createメソッドを使用して、指定されたテキストの埋め込みを生成します。engine_nameパラメータには、使用する埋め込みモデル(“Marengo-retrieval-2.6”)を指定します。text_truncateパラメータは “start” に設定されており、テキストが長すぎる場合は、先頭から切り捨てられます。
retrieve_similar_content関数は、コンテンツ検索を実行するメインの関数です。テキストクエリと、返される結果の数(
top_k)をパラメータとして受け取ります。get_text_embeddingを呼び出して、テキストクエリを埋め込みに変換します。twelve-labsという名前のPineconeインデックスに接続します。クエリ埋め込みに類似したベクトルをPineconeインデックスに対して照会し、返す結果の数を指定して、メタデータを含めて取得します。
この検索プロセスは、テキストクエリの埋め込みと、Pineconeに保存されている動画セグメントの事前に計算された埋め込みとを比較することによって動作します。これにより、大規模な動画データセット全体に対して、高速かつ効率的な類似性検索が可能になります。
def get_text_embedding(text_query): # Twelve Labs Embed API supports text-to-embedding text_embedding = twelvelabs_client.embed.create( engine_name="Marengo-retrieval-2.6", text=text_query, text_truncate="start" ) return text_embedding.text_embedding.float def retrieve_similar_content(query, index_name="twelve-labs", top_k=5): """ Retrieve similar content based on query embedding """ # Generate query embedding query_embedding = get_text_embedding(query) # Connect to Pinecone index index = pc.Index(index_name) # Query Pinecone for similar vectors results = index.query(vector=query_embedding, top_k=top_k, include_metadata=True) return results
これで、サンプルのテキストクエリを使用してretrieve_similar_content関数を実行し、クエリと見つかった上位の類似コンテンツの詳細を出力することができます。
# Example usage text_query = "Lipstick" similar_content = retrieve_similar_content(text_query) print(f"Query: '{text_query}'") print(f"Top {len(similar_content['matches'])} similar content:") for i, match in enumerate(similar_content['matches']): print(f"{i+1}. Score: {match['score']:.4f}") print(f" Video File: {match['metadata']['video_file']}") print(f" Video ID: {match['metadata']['video_segment']}") print(f" Time range: {match['metadata']['start_time']} - {match['metadata']['end_time']} seconds") print(f" Scope: {match['metadata']['scope']}") print()
このコードを使用すると、テキストクエリを使って動画コンテンツに対してセマンティック検索を実行できます。Twelve Labsのマルチモーダル埋め込みの力を活用して、正確な単語が動画に含まれていなくても、テキストクエリと意味的に類似している動画セグメントを見つけ出します。
このコードを実行すると、類似度スコア、動画ファイル、動画ID、タイムレンジ、スコープとともに、最も一致する動画セグメントが上位に表示されます。これにより、動画検索やコンテンツのレコメンデーションなど、さまざまなアプリケーションの実装が可能になります。
動画の切り出しフォーマット設定
データベース内の動画埋め込みとそれらを照会する機能によって、最初の実験では、これらの埋め込みをフル動画ではなく特定の短い動画クリップにリンクさせることを目指します。埋め込みモデルがタイムスタンプを処理する方法と同様に、動画を分割していきます。
以下のsplit_video関数は、avライブラリを使用して、指定された長さの短いセグメントに動画ファイルを分割します。簡単な説明は以下の通りです。
この関数は、入力動画パス、出力ディレクトリ、およびセグメント期間(デフォルトは6秒)をパラメータとして受け取ります。
入力動画を開き、動画のフレームレートに基づいてセグメントあたりのフレーム数を計算し、動画フレームをループ処理します。
各セグメントに対して新しい出力コンテナを作成し、そこにフレームを書き込み、フレームのタイムスタンプを調整します。
分割されたセグメントは、連番の名前が付けられた個別のMP4ファイルとして出力ディレクトリに保存されます。
import av def split_video(input_path, output_dir, segment_duration=6): # Ensure output directory exists os.makedirs(output_dir, exist_ok=True) input_file_name = os.path.splitext(os.path.basename(input_path))[0] print(input_file_name) with av.open(input_path) as input_container: # Get video stream input_stream = input_container.streams.video[0] fps = input_stream.average_rate # Calculate how many frames are in each segment frames_per_segment = int(segment_duration * fps) segment_count = 0 frame_count = 0 output_container = None output_stream = None first_frame_timestamp = None for frame in input_container.decode(video=0): if frame_count % frames_per_segment == 0: # Close previous output container if it exists if output_container: output_container.close() # Create a new output container output_path = os.path.join(output_dir, f'{input_file_name}_segment_{segment_count:03d}.mp4') segment_count += 1 output_container = av.open(output_path, mode='w') output_stream = output_container.add_stream('h264', rate=fps) output_stream.width = frame.width output_stream.height = frame.height output_stream.pix_fmt = 'yuv420p' # Reset the first frame timestamp for the new segment first_frame_timestamp = frame.pts # Adjust the frame timestamp frame.pts -= first_frame_timestamp # Encode frame packet = output_stream.encode(frame) output_container.mux(packet) frame_count += 1 # Flush the encoder packet = output_stream.encode(None) output_container.mux(packet) # Close the last output container if output_container: output_container.close() split_video(input_path=single_video, output_dir=split_video_dir)
クエリの設定
生成モデルとの対話を開始する準備がすべて整いました。クエリを定義し、関連するコンテンツを取得しましょう。
query = "What is this advertisement selling?" similar_content = retrieve_similar_content(query)
Pegasusを使用した動画クリップとのチャット
Pegasus-1モデルを使用するためには、以下の3つのステップが必要です。
Twelve Labsに動画をホストするインデックスをセットアップする(これは一度だけ実行すれば十分です)
動画をTwelve Labsにアップロードする(これは動画ごとに一度だけ実行すれば十分です)
プロンプトと動画を指定してPegasusにクエリを投げる
まず、インデックスをセットアップします。
engines = [ { "name": "pegasus1.1", "options": ["visual", "conversation"] } ] index_name = "ads_index" indices_list = twelvelabs_client.index.list(name=index_name) if len(indices_list) == 0: index = twelvelabs_client.index.create( name=index_name, engines=engines, ) print(f"A new index has been created: id={index.id} name={index.name} engines={index.engines}") else: index = indices_list[0] print(f"Index already exists: id={index.id} name={index.name} engines={index.engines}")
次に、使いやすくするためにアップロードロジックを設定します。
def upload_video_to_twelve_labs(video_path): task = twelvelabs_client.task.create( index_id=index.id, file = video_path ) print(f"Task created: id={task.id} status={task.status}") task.wait_for_done(sleep_interval=5, callback=on_task_update) if task.status != "ready": raise RuntimeError(f"Indexing failed with status {task.status}") print(f"The unique identifier of your video is {task.video_id}.") return task.video_id
これで、分割動画の全ディレクトリをループ処理し、Twelve Labsのインデックスにアップロードできます。
video_ids = {} for split_video_filename in os.listdir(split_video_dir): split_video_path = os.path.join(split_video_dir, split_video_filename) print(split_video_path) split_video_name = split_video_filename.split('.')[0] print(split_video_name) video_id = upload_video_to_twelve_labs(split_video_path) video_ids[split_video_name] = video_id print(video_ids)
Pegasusの呼び出し
あとは、検索結果を実際の動画クリップに関連付けた後、シンプルなクエリを送信するだけです。
# retrieve the correct video_id for the relevant video video_segment = (int) (similar_content['matches'][0]['metadata']['video_segment']) print(f"Retrieved video segment: {video_segment}") base_filename = os.path.splitext(os.path.basename(single_video))[0] video_key = f"{base_filename}_segment_{video_segment:03d}" video_id = video_ids[video_key] res = twelvelabs_client.generate.text( video_id=video_id, prompt=query ) print(f"{res.data}")
LLaVA-NeXT-Videoの使用
オープンソースモデルを使用する場合は、以下の作業を行う必要があります。
動画をnumpy形式に変換する
動画をモデルが処理可能なフレームのサブセットにサンプリングする
モデルをダウンロードし、独自のGPUでホストする
モデルのすべてのフォーマット処理とクエリの実行を行う
動画をnumpy形式に変換する
read_video_pyav()関数は、PyAVを使用して動画から特定のフレームをデコードします。
import av import numpy as np def read_video_pyav(container, indices): ''' Decode the video with PyAV decoder. Args: container (av.container.input.InputContainer): PyAV container. indices (List[int]): List of frame indices to decode. Returns: np.ndarray: np array of decoded frames of shape (num_frames, height, width, 3). ''' frames = [] container.seek(0) start_index = indices[0] end_index = indices[-1] for i, frame in enumerate(container.decode(video=0)): if i > end_index: break if i >= start_index and i in indices: frames.append(frame)
動画のサンプリング
以下のコードブロックの説明は次の通りです。
get_total_frames(): 動画の総フレーム数を手動でカウントします(均一サンプリングで0フレームと誤判定される場合のフォールバックとして機能します)。sample_video(): 動画から指定された数のフレームを均一にサンプリングします。process_videos_in_folder(): 指定されたフォルダ内のすべての動画を処理し、それぞれからフレームをサンプリングします。
def get_total_frames(video_path): """ Manually count the total number of frames in a video. Used in case uniformly sampling comes up as 0 frames. """ container = av.open(video_path) video_stream = container.streams.video[0] total_frames = 0 for frame in container.decode(video_stream): total_frames += 1 return total_frames def sample_video(video_path, num_samples=8): container = av.open(video_path) video_stream = container.streams.video[0] # sample uniformly num_samples frames from the video total_frames = container.streams.video[0].frames if total_frames == 0: total_frames = get_total_frames(video_path) indices = np.arange(0, total_frames, total_frames / num_samples).astype(int) sampled_frames = read_video_pyav(container, indices) return sampled_frames def process_videos_in_folder(folder_path): sample_info = {} # Supported video file extensions video_extensions = ('.mp4', '.avi', '.mov', '.mkv') for filename in os.listdir(folder_path): simple_video_name = os.path.splitext(os.path.basename(filename))[0] if filename.lower().endswith(video_extensions): video_path = os.path.join(folder_path, filename) try: print("Sampling " + video_path) sampled_clip = sample_video(video_path) sample_info[simple_video_name] = {"sampled_video": sampled_clip, "video_path" : video_path} except Exception as e: print(f"Error processing {filename}: {str(e)}") return sample_info sampled_video_info = process_videos_in_folder(split_video_dir)
モデルのロード
ここでは、Hugging Face Transformersライブラリを使用して、LLaVA-NeXT-Videoモデルとそのプロセッサを初期化します。特に、メモリの効率的な使用のために4ビット量子化(4-bit quantization)を適用しています。
from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor import torch quantization_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_compute_dtype=torch.float16 ) processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf") model = LlavaNextVideoForConditionalGeneration.from_pretrained( "llava-hf/LLaVA-NeXT-Video-7B-hf", quantization_config=quantization_config, device_map='auto' )
モデルへの質問
この関数を使用して、Pineconeクエリから返された実際のファイル名からセグメント番号を抽出します。video_segment_name_from_offset()関数は、動画パスと開始時間に基づいてセグメント名を生成し、'similar_content'辞書のメタデータに基づいて特定の動画セグメントを取得します。
def video_segment_name_from_offset(video_path, start_time, segment_length = 6): segment_number = int (start_time // segment_length) simple_video_name = os.path.splitext(os.path.basename(video_path))[0] return f"{simple_video_name}_segment_{segment_number:03d}"
これで、サンプリングされた動画を取得して、モデルへのリクエスト形式を整えることができます。まず、テキストと動画コンテンツの両方を含む会話入力(conversation input)を準備します。次に、LLaVA-NeXT-Videoプロセッサを使用して入力を処理します。最後に、指定されたパラメータを適用してモデルで応答を生成します。
video_segment = similar_content['matches'][0]['metadata']['video_file'] print(video_segment) video_offset = similar_content['matches'][0]['metadata']['start_time'] video_segment_name = video_segment_name_from_offset(video_segment, video_offset) video_segment = sampled_video_info[video_segment_name]['sampled_video'] # Each "content" is a list of dicts and you can add image/video/text modalities conversation = [ { "role": "user", "content": [ {"type": "text", "text": query}, {"type": "video"}, ], }, ] prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) prompt_len = len(prompt) inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device) generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9} output = model.generate(**inputs, **generate_kwargs) generated_text = processor.batch_decode(output, skip_special_tokens=True) print(generated_text[0])
比較
これらのモデルの出力を比較すると、LLaVA-NeXT-Videoと比較して、Pegasusからはより詳細で文脈を意識した回答が得られます。
しかし、両モデルとも動画のクリップしか与えられていないため、動画全体で何が起こっているかを理解することに苦労していることは明らかです。それでは、動画全体をモデルに提供するとどうなるかを見てみましょう。
複数動画の処理
ここからは、動画がランダムに保存された非構造化フォルダを扱い、それら全体について質問を投げます。先ほどまでの検索プロセスでは、特定のクエリに対して最も関連性の高い「クリップ」を見つけ出していました。ここでも同様のことを行いますが、今回はそのクリップに対応する「動画全体」をモデルに提供します。これは、Pineconeに保存されているメタデータを使用することで簡単に実現できます。
まず、動画をPineconeに取り込むことから始めます。
ads_dir = os.path.join(base_folder_path,"ad_vids") video_list = [] #Make sure that we' don't waste time re-embedding the original single video: single_video_filename = os.path.splitext(os.path.basename(single_video))[0] for filename in os.listdir(ads_dir): if filename.endswith(".mp4") and single_video_filename not in filename: video_list.append(ads_dir + "/" + filename) print(video_list) for video in video_list: ingest_data(video)
次に、データベースに投げかけるいくつかの質問を設定し、最も関連性の高い動画を検索します。
full_database_questions = ["Who is the actor in the Miss Dior video?", "What ad is Selena Gomez in?", "What is the ad for Rare Beauty about?", "Why should people buy the Rare Beauty product according to their ad?"] question = full_database_questions[0] similar_content_from_question = retrieve_similar_content(question) video_name = similar_content_from_question['matches'][0]['metadata']['video_file']
Pegasusの使用
次に、Python SDKを介してPegasusを使用するために必要な追加ステップについて説明します。今回はインデックスがすでにセットアップされているため、クエリを実行する前に動画をアップロードするだけで完了します。
Twelve Labsへの動画のアップロード
ディレクトリ内の動画をループ処理し、それぞれをTwelve Labsにアップロードして、生成された動画IDを保存します。
for vid in os.listdir(ads_dir): vid_path = os.path.join(ads_dir, vid) vid_name = os.path.splitext(os.path.basename(vid_path))[0] print(vid_path) video_id = upload_video_to_twelve_labs(vid_path) video_ids[vid_name] = video_id
動画データベースを指定したPegasusへのクエリ
次に、Twelve Labsクライアントを使用して、動画IDと質問プロンプトに基づきテキスト応答を生成します。
video_id = video_ids[video_name] res = twelvelabs_client.generate.text( video_id=video_id, prompt=question ) print(f"{res.data}")
動画データベースにおけるLLaVA-NeXT-Videoの使用
動画のサンプリング
まず、すべての動画をサンプリングして保存し、検索された動画の正しいサンプリングデータにアクセスする必要があります。
sampled_database_video_info = process_videos_in_folder(ads_dir) video_segment = sampled_database_video_info[video_name]['sampled_video']
モデルの実行
これで、サンプリングされた動画に対してモデルを実行できます。
まず、ユーザーのロール、テキスト(質問)、および動画コンテンツを含む会話構造を作成します。
次に、会話にチャットテンプレートを適用し、モデル向けのインプット(プロンプトと動画セグメントを含む)を準備し、応答作成時の生成パラメータ(
max_new_tokens、do_sample、top_p)を設定します。最後に、LLaVA-NeXT-Videoモデルを使用して入力に基づいてテキストを設定・生成し、出力をデコードして生成されたテキストをプリントします。
conversation = [ { "role": "user", "content": [ {"type": "text", "text": question}, {"type": "video"}, ], }, ] prompt = processor.apply_chat_template(conversation, add_generation_prompt=True) prompt_len = len(prompt) inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device) generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9} output = model.generate(**inputs, **generate_kwargs) generated_text = processor.batch_decode(output, skip_special_tokens=True) print(generated_text[0])
比較
このクエリを2つのモデルで実行すると、Pegasusはナタリー・ポートマンが誰であるかを明確に理解し、動画内の彼女の存在を正確に認識していることがわかります。対照的に、LLaVA-NeXT-Videoモデルは、ナタリー・ポートマンを認識できないか、与えられたサンプリングフレームからは彼女を十分に捉えることができません。さらに、このオープンソースモデルは話が脱線する傾向があり、その結果、応答が長くなり、レイテンシが増大します。これは本番環境でのユースケースにおいては潜在的な懸念事項となり得ます。
まとめ
このガイドでは、動画を個別にあるいは完全なセットとして操作する方法について説明しました。検索管理には、Twelve LabsのEmbed APIとPineconeのベクトルデータベースを活用しました。
また、Twelve LabsのPegasusモデルと、オープンソースモデルであるLLaVA-NeXT-Videoを比較し、必要なインフラストラクチャ、開発者体験、およびクエリ結果の観点から評価しました。Pegasusは、オープンソースモデルと比較して、運用上のオーバーヘッドが少なく、指示に従う精度が高いため、非常に有望であることを示しました。
ベストプラクティス
専用ホストを使用する場合は、PineconeのPodベースのプランの利用をご検討ください。
オープンソースの動画モデルが処理するフレーム数と、その精度、および生成におけるレイテンシのトレードオフを慎重に考慮してください。
可能であれば、推論を高速化するためにオープンソースモデルを量子化してください。特定のニーズを満たすために、RAM使用量、速度、および品質の間で推論時間のトレードオフを実験してみるのが良いでしょう。
次のステップ
はるかに大規模な動画セットから検索する場合、検索メカニズムの精度が低下する可能性があります。いくつかの解決策として、以下の手段が挙げられます。
データにさらに適合させるために、埋め込みベクトルの上に線形アダプターをトレーニングさせる。
異なる動画からのクリップが返された場合に、Pegasusを使用して動画を再ランク付け(Re-rank)する。
Pineconeのエントリに各動画のサマリー(要約)データを追加し、ハイブリッド検索システムを構築することで、Pineconeのメタデータ機能を利用して精度を向上させる。
付録
参照およびさらなる探索のためのリソース一覧です。
完成版のColab Notebook
Pineconeのクライアント、およびクラウド用のドキュメント(英語)




