パートナーシップ

マルチモーダルRAG:TwelveLabsとChromaを使用した動画とのチャット

ジェームズ・リー

開発者は、Twelve LabsのEmbed(埋め込み)APIとChromaを使用してマルチモーダルRAGシステムを構築できます。これにより、ビデオの埋め込みの生成、セマンティック検索による関連クリップの取得、そしてPegasusを使用したビデオコンテンツとのチャットが可能になります。また、オープンソースのLLaVA-NeXT-Videoモデルとの直接的な比較も含まれています。

開発者は、Twelve LabsのEmbed(埋め込み)APIとChromaを使用してマルチモーダルRAGシステムを構築できます。これにより、ビデオの埋め込みの生成、セマンティック検索による関連クリップの取得、そしてPegasusを使用したビデオコンテンツとのチャットが可能になります。また、オープンソースのLLaVA-NeXT-Videoモデルとの直接的な比較も含まれています。

この記事の内容

No headings found on page

ニュースレターに登録する

ニュースレターに登録する

ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします

ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします

AIを活用してビデオを検索、分析、探索します。

2025/03/21

20分

記事へのリンクをコピー

ドラフトをレビューしてくださったChromaチームのJeff Huber氏とItai Smith氏に深く感謝いたします!



動画を対象としたRAGベースのQ&Aを実現するために、 TwelveLabsのEmbed API と Chromaのベクトルデータベース を統合するチュートリアルへようこそ。このガイドでは、生成モデルを使用して、構造化されていない動画のデータベースからテキストの回答を抽出する方法を学びます。

私たちは、TwelveLabsの豊かで文脈に即した埋め込み(embeddings)Chromaのベクトルデータベースを組み合わせることで、これらの動画埋め込みを保存、インデックス登録、クエリし、チャットアプリケーションを作成します。このノートブックは、わずか数行のコードでこれらのテクノロジーがもたらす現在の可能性を実証します。

比較のために、テキスト回答を生成する TwelveLabsのGenerate API を使用する場合と、代表的なオープンソースモデルである LLaVA-NeXT-Videoを使用する場合における、開発者体験(Developer Experience)の違いも紹介します。



1 - 概要

このチュートリアルでは、TwelveLabs Marengoを使用して動画の埋め込みを作成し、Chromaを使用してそれらの埋め込みを保存およびクエリして関連する動画を見つけ、TwelveLabs PegasusとLLaVA-NeXT-Videoを使用して取得した動画とチャットする、いくつかの例を見ていきます。

行う内容は以下の通りです:

  1. TwelveLabs Marengoエンジンを使用した動画埋め込みの作成

  2. Chromaデータベースへの動画埋め込みの保存

  3. Chromaデータベース内の埋め込みをクエリして、関連する動画セグメントを検索

  4. TwelveLabs Pegasusを使用した、取得した動画セグメントとのチャット

  5. オープンソースモデルを使用した、取得した動画セグメントとのチャット

  6. Pegasusとオープンソースモデルの比較

  7. ChromaとTwelveLabsの埋め込みを使用した、複数動画の検索

  8. Pegasusを使用した、動画全体とのチャット

  9. オープンソースモデルを使用した、動画全体とのチャット



2 - セットアップとインストール



必要なライブラリのインストール

まず、TwelveLabsとChromaのSDKをインストールします。

# Twelve LabsとChromaのライブラリをインストール
!pip install --upgrade twelvelabs
!pip install --upgrade chromadb

次に、オープンソースモデルを実行するために使用するライブラリをインストールします。

# オープンソースモデルで使用するライブラリをインストール
!pip install protobuf==3.20.3
!pip install --upgrade -q accelerate bitsandbytes
!pip install git+https://github.com/huggingface/transformers.git
!pip install av

Colab以外の環境でこのチュートリアルを実行している場合は、以下のセルのコメントを解除して、使用する動画データを取り扱うためのライブラリをインストールしてください。

# Colab環境以外の場合に追加でインストールするもの
# !python -m pip install pillow
# !python -m pip install sentencepiece
# !python -m pip install matplotlib



3 - 動画データの準備

次に、動画データを準備します。



動画データの使用

このデモでは、TwelveLabsのGoogleドライブフォルダにある動画データを使用します。これを使用するには、フォルダをご自身のGoogleドライブにリンクし、そのGoogleドライブをこのColabにマウントする必要があります。



Googleドライブへのフォルダーのリンク

このリンクから誰でもフォルダーにアクセスできます:https://drive.google.com/drive/folders/1k6FmkVglFsdtJG4MTIK-2dk1Dk9gTPtu?usp=share_link

これをご自身のGoogleドライブの正しい場所にリンクするには:

  1. Googleドライブの「共有アイテム」に移動します。

  2. アクセスしたい共有フォルダを探します。

  3. 「整理」->「ショートカットを追加」を選択します。

  4. 保存先として「マイドライブ」を選択し、「追加」をクリックします。

これで、このフォルダーは /content/drive/MyDrive/TwelveLabs-Chroma でアクセス可能になります。



ドライブのマウント

ここで、ご自身のドライブをこのColabにマウントします。

from google.colab import drive
drive.mount('/content/drive')



動画パスの設定

次に、取り扱う動画のパスを設定します。これは、動画フォルダをどこにリンクしたかによって変更される場合があります。

video_folder_path = "/content/drive/MyDrive/TwelveLabs-Chroma/videos/"



動画解像度のアップスケール

動画の一部は埋め込みエンジンで使用するには解像度が低すぎるため、 upscale_video を使用して解像度を倍にします。

import numpy as np
import subprocess
import os

def upscale_video(input_file, output_path, target_width=854, target_height=480):

    output_file = os.path.join(output_path, os.path.basename(input_file))

    if os.path.exists(output_file):
        print(f"Skipping {input_file} as {output_file} already exists.")
        return
 

    """
    FFmpegを使用して、動画をターゲットの幅と高さにアップスケールします。

    Args:
        input_file (str): 入力動画ファイルへのパス。
        output_file (str): アップスケールされた動画を保存するパス。
        target_width (int): 希望する出力幅。デフォルトは 854。
        target_height (int): 希望する出力高さ。デフォルトは 480。
    """
    # 動画をアップスケールするためのFFmpegコマンド
    ffmpeg_command = [
        'ffmpeg',
        '-i', input_file,                              # 入力ファイル
        '-vf', f'scale={target_width}:{target_height}', # ターゲットの寸法を持つスケールフィルター
        '-c:a', 'copy',                                # 再エンコードなしでオーディオストリームをコピー
        output_file,                                    # 出力ファイル
        "-y"
    ]

    # FFmpegコマンドを実行
    subprocess.run(ffmpeg_command)

    print(f"Upscaled video saved to {output_file}")

まず、アップスケールされた動画を保存するパスを作成します。

upscaled_video_dir = video_folder_path + "upscaled_videos/"

次に、動画をアップスケールします。すでにアップスケールされてアップスケール動画フォルダに配置されている動画はスキップされます。

# すべての.mp4動画をアップスケール
# 出力ディレクトリが存在しない場合は作成
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# 生動画ディレクトリ内の全ファイルを反復処理
for filename in os.listdir(video_folder_path):
    # ファイルが動画ファイルであるかチェック
    input_filepath = os.path.join(video_folder_path, filename)
    if filename.endswith(".mp4"):
        upscale_video(input_filepath, upscaled_video_dir)
# 出力
...
Upscaled video saved to ../videos/upscaled_videos/How To Make Birria Tacos [4nIFJFgH99w].mp4
...
Upscaled video saved to ../videos/upscaled_videos/How To Make a McDonald's Cheeseburger [SvOx7tA_Cv8].mp4
...
Upscaled video saved to ../videos/upscaled_videos/How To Make Potato Wedges [eZXbMWPJkKQ]



4 - TwelveLabs Marengoエンジンを使用した動画埋め込みの作成

ここでは、TwelveLabs Marengoエンジンを使用して、動画の埋め込みを作成します。

まず、TwelveLabs APIキーを読み込むことから始めます。これは、Colabの左側パネルにある「シークレット」(鍵アイコン)をクリックし、「+新しいシークレットを追加」をクリックすることで、Colabのシークレットストアに保存できます。

from google.colab import userdata
TL_API_KEY=userdata.get('TL_API_KEY')

次に、TwelveLabsおよびChromaのクライアントを初期化します。このチュートリアルのChromaデータベースはローカルに保存されます。

from twelvelabs import TwelveLabs
from twelvelabs.models.embed import EmbeddingsTask

# Twelve Labsクライアントを初期化
twelvelabs_client = TwelveLabs(api_key=TL_API_KEY)

ここでは、実験に最適なEphemeral Client(一時的クライアント)を使用します。これはメモリ内で実行されますが、アプリケーションを閉じるとデータは保持されません。

import chromadb

# 一時的なChromaクライアントを初期化
chroma_client = chromadb.Client()

別のクライアント

データを永続化する必要がある場合は、2つのオプションがあります:

1 - 永続クライアント(Persistent Client): これにより、データベースがローカルディスクに保存およびロードされます。

# 永続クライアントのオプション
chroma_client = chromadb.PersistentClient(path="/path/to/save/to")

2 - HTTPクライアント(HTTP Client): これにより、別のプロセス(独自にデプロイされた環境、またはChroma Cloud上)で実行されているChromaサーバーに接続します。

# HTTPクライアント
chroma_client = chromadb.HttpClient(host='localhost', port=8000)



動画埋め込みの作成とChroma向けフォーマット

ここで、Marengoを使用して動画の埋め込みを作成し、Chroma向けにフォーマットします。Chromaにデータをアップロードするには、アップロードしたいすべてのデータ(embeddings、meta-datas、ids)に対して3つの明確に分かれたリストが必要になります。

def on_task_update(task: EmbeddingsTask):
    print(f"  Status={task.status}")

# 動画の埋め込みを作成し、Chroma向けにフォーマットします
def create_video_embeddings(client,video_file,segment_length,task_id=None):

    # Twelve Labsに動画が存在しない場合はアップロードします
    video_name = os.path.basename(video_file)

    if task_id == None or task_id == "":
        task = client.embed.task.create(
            engine_name="Marengo-retrieval-2.7",
            video_file=video_file,
            video_clip_length=segment_length
        )
        print(
            f"Created task: id={task.id} engine_name={task.engine_name} status={task.status}"
        )

        status = task.wait_for_done(
            sleep_interval=2,
            callback=on_task_update
        )

        print(f"Embedding done: {status}")

        task_id = task.id

    # 埋め込みを取得します
    task = client.embed.task.retrieve(task_id)
    print("task",task)

    # Chroma向けにフォーマットします
    embeddings = []
    metadatas = []
    ids = []

    idx = 0

    print("embeddings",task.video_embeddings)

    if task.video_embeddings is not None:
        for v in task.video_embeddings:

            metadata = {
                "embedding_scope":v.embedding_scope,
                "start_offset_sec":v.start_offset_sec,
                "end_offset_sec":v.end_offset_sec,
                "video_file":video_file,
                "video_name":video_name,
                "task_id":task.id,
                "video_segment_number":idx
            }


            embedding = v.values
            id = task.id + "_" + str(idx)

            metadatas.append(metadata)
            embeddings.append(embedding)
            ids.append(id)

            idx += 1

    return (ids,metadatas,embeddings,task_id)

次に、取り扱う動画を選択し、埋め込みセグメントの長さを6秒に設定します。各埋め込みは動画内の6秒間のセグメントを参照することになります。

# セグメントの長さと、処理対象の動画を設定します
segment_duration = 6
current_video_path = upscaled_video_dir + "How To Make Birria Tacos [4nIFJFgH99w].mp4"

続いて、Chromaにアップロードする埋め込みデータを取得します。また、TwelveLabsから埋め込みの task_id も取得します。この task_id を使用すれば、将来この動画の埋め込みを再取得できるため、同じ動画に対して埋め込みモデルを何度も実行する必要がなくなります。

# Chromaにアップロードする埋め込みを取得します

# すでにtask_idをお持ちの場合は設定し、そうでない場合は空文字を設定してください
task_id = ""
ids, metadatas, embeddings, task_id = create_video_embeddings(twelvelabs_client,current_video_path,segment_duration,task_id)

出力に task_id が表示されます。




5 - Chromaデータベースへの動画埋め込みの保存

レコードとベクトルを分かりやすいフォーマットにできたので、あとはこれらをChromaの新しいコレクション(または、すでに存在していれば既存のコレクション)に追加するだけです。

# Chromaコレクションを取得、または作成します
chroma_collection_name = "video_embeddings"
collection = chroma_client.get_or_create_collection(chroma_collection_name)

# 埋め込みとメタデータをコレクションに追加します
collection.add(
    metadatas = metadatas,
    embeddings = embeddings,
    ids=ids
)



6 - Chromaデータベース内の埋め込みをクエリして、関連する動画セグメントを検索



ベクトル検索のテスト

コレクションにすべてが収まったので、埋め込みクエリが機能することを確認しテストできます。最初に返された埋め込みで検索を行います。これ自身との間の距離(ディスタンス)は0になるはずです。

# 最初の埋め込みをテスト検索として使用します
test_segment_embeddings = embeddings[0]

results = collection.query(
    query_embeddings=[test_segment_embeddings],
    n_results=4
)

print("search embeddings for:",ids[0])
print("found:", results["ids"][0][0])
print("distance:",results["distances"][0][0])

# 最初の動画のテキスト埋め込みがそれ自体から距離 0 であることをアサート(確認)します
assert results["ids"][0][0] == ids[0]
assert results["distances"][0][0] == 0



ベクトルデータベースへのクエリ

次に、TwelveLabsの埋め込みとChromaのベクトルデータベースを組み合わせ、膨大な数の動画を効率的にクエリする威力を紹介します。

先ほど選択した動画は、ビリアタコス(birria tacos)のクッキングチュートリアルです。TwelveLabsとChromaを使用して、ビリアタコスを作るために必要な材料は何かを把握します。

ここでそのクエリを設定します。

query = "What are the ingredients for birria tacos?"(ビリアタコスの材料は何ですか?)

次に、TwelveLabs Marengoを使用してテキストクエリを埋め込み、その埋め込みを使用してChromaコレクションをクエリする関数を作成します。

import os

def query_chroma(collection,query,n_results=1):
    # クエリ用の埋め込みを作成します
    embedding = twelvelabs_client.embed.create(
        engine_name="Marengo-retrieval-2.7",
        text=query,
        text_truncate="start",
    )

    query_embeddings = embedding.text_embedding.float

    # クエリ埋め込みを使用してChromaデータベースを検索します

    response = collection.query(
        query_embeddings=query_embeddings,
        n_results=n_results,
    )


    return response

続いて、Chromaクエリを実行して、このクエリに最も良く答えているチュートリアルの6秒間のセグメントを見つけます。このチュートリアルの後半で、このセグメントをTwelveLabs Pegasusとオープンソースモデルに渡し、必要な材料などについて質問を投げかけます。

メタデータにはセグメントと元の動画に関する情報が含まれています。

response = query_chroma(collection,query)

# 最も類似しているオブジェクトのプロパティと距離を出力します
print(response["ids"][0][0])
print(response["distances"][0][0])
print(response["metadatas"][0][0])

# 次のステップで使用するために、見つかった動画セグメントのパスを取得します
found_video_metadata = response["metadatas"][0][0]
# 出力

672101d56025850d8c890d1c_6
1.3052971363067627
{'embedding_scope': 'clip', 
'end_offset_sec': 42.0, 
'start_offset_sec': 36.0, 
'task_id': '672101d56025850d8c890d1c', 
'video_file': '../videos/upscaled_videos/How To Make Birria Tacos [4nIFJFgH99w].mp4', 
'video_name': 'How To Make Birria Tacos [4nIFJFgH99w]



7 - 動画をセグメントに分割する

チュートリアル動画全体をPegasusにアップロードすることも可能ですが、ここでは前のステップで見つかった関連する6秒のセグメントのみをアップロードします。これにより、計算リソースを効率的に使用できます。

ここでは、処理対象の動画を埋め込み側のセグメントと一致する単位に分割し、分割後の動画フォルダ(split videos folder)に配置します。

split_video_dir = video_folder_path + "split_videos/"

def split_video(input_path, output_dir, segment_duration=6):

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    filename = os.path.splitext(os.path.basename(input_path))[0]
    filetype = os.path.splitext(os.path.basename(input_path))[1]

    # 動画をセグメントに分割します
    ffmpeg_command = [
        'ffmpeg',
        '-i', input_path,             # 入力動画ファイル
        '-c', 'copy',                  # 映像と音声の両コーデックをコピー
        '-f', 'segment',               # セグメントモード
        '-segment_time', str(segment_duration),  # セグメントの長さ
        '-reset_timestamps', '1',      # 各セグメントのタイムスタンプをリセット
        output_dir + filename + '_%03d' + filetype  # 出力ファイル名のパターン (例: output_001.mp4)
    ]

    # コマンドを実行します
    subprocess.run(ffmpeg_command)

    print("Video split into 6-second segments successfully.")

# 動画をセグメントに分割します
split_video(input_path=current_video_path, output_dir=split_video_dir, segment_duration=segment_duration)



8 - TwelveLabs Pegasusを使用した、取得した動画セグメントとのチャット

次のいくつかのセルでは、Pegasusを使って動画とチャットするのがどれほど簡単であるかをお見せします。あらかじめ必要なものはすべて揃っています。



Pegasusへの動画セグメントのアップロード

まず、アップロードする動画用のインデックスとPegasus Engineを作成し、それからアップロードします。

# pegasusのインデックスを作成、または取得します
engines = [
        {
            "name": "pegasus1.2",
            "options": ["visual", "conversation"]
        }
    ]

index_name = "cooking_video_index"
indices_list = twelvelabs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelvelabs_client.index.create(
        name=index_name,
        engines=engines,

    )
    print(f"A new index has been created: id={index.id} name={index.name} engines={index.engines}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} engines={index.engines}")



動画セグメントファイル名の取得

次に、Chromaクエリのメタデータを使用して、クエリに一致したセグメントを見つけます。

# 動画セグメントのファイル名を取得します
found_video_segment_number = int(found_video_metadata["video_segment_number"])
found_video_file = found_video_metadata["video_file"]
found_video_filename = os.path.splitext(os.path.basename(found_video_file))[0]
found_video_filetype = os.path.splitext(os.path.basename(found_video_file))[1]
found_video_segment_filename = found_video_filename + f"_{found_video_segment_number:03d}"

found_video_segment_path = split_video_dir + found_video_segment_filename + found_video_filetype
print(found_video_segment_path)



動画をPegasusにアップロードし、ビデオID(Video ID)を取得する

続いて、動画セグメントをTwelveLabs Pegasusにアップロードする関数を作成します。

この関数は、動画とのチャットで使用される video_id を返します。複数ターンの会話(チャット)に備えて、この video_id を保存しておくことができます。

def upload_video_to_twelve_labs(index,video_path):

    # 動画をtwelve labsのインデックスにアップロードします
    task = twelvelabs_client.task.create(
        index_id=index.id,
        file = video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)

    if task.status != "ready":
      raise RuntimeError(f"Indexing failed with status {task.status}")
    print(f"The unique identifer of your video is {task.video_id}.")

    # video id を返します
    return task.video_id

もしこの動画セグメントのビデオIDをすでに持っている場合は、ここに設定できます。

# すでにvideo_idをお持ちの場合は設定し、そうでない場合は空文字を設定してください
video_id = ""

# Pegasusでチャットするためのvideo idを取得するために、動画をアップロードします
if video_id == "":
    video_id = upload_video_to_twelve_labs(index,found_video_segment_path)

出力に video_id が表示されます。




Pegasusの呼び出し

ここで、動画セグメントの検索に使用した同じクエリでクエリを発行します。TwelveLabsが舞台裏のボイラープレート(定型処理)をすべて処理するため、シンプルな関数でモデルを呼び出すことができます



9 - オープンソースモデルを使用した、取得した動画セグメントとのチャット

ここからは、オープンソースモデルを使用して同動画セグメントとチャットし、Pegasusと比較します。

まず、モデルが処理できるように自身で動画をサンプリングする必要があります。LLaVa-NeXT-Videoのサンプリングコードを修正し、各動画から均一に8フレームをサンプリングします。

これをフォルダー内にあるすべての動画セグメントに対して実行できます。

read_video_pyav は LLaVa-NeXT-Video Colab ノートブックから直接取得したものであり、推論のために動画を正しいnumpy表現にフォーマットします。

import av
def read_video_pyav(container, indices):
    '''
    PyAVデコーダーで動画をデコードします。

    Args:
        container (av.container.input.InputContainer): PyAVコンテナ。
        indices (List[int]): デコードするフレームインデックスのリスト。

    Returns:
        np.ndarray: デコードされたフレームのnumpy配列。形状は (num_frames, height, width, 3)。
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # 動画から均一に num_samples 枚のフレームをサンプリングします
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)

    sampled_frames = read_video_pyav(container, indices)

    return sampled_frames

def process_videos_in_folder(folder_path):
    sample_info = {}

    # サポートされている動画ファイルの拡張子
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')

    for filename in os.listdir(folder_path):
        simple_video_name = os.path.splitext(os.path.basename(filename))[0]
        if filename.lower().endswith(video_extensions):
            video_path = os.path.join(folder_path, filename)
            try:
                sampled_clip = sample_video(video_path)
                sample_info[simple_video_name] = {"sampled_video": sampled_clip, "video_path" : video_path}
            except Exception as e:
                print(f"Error processing {filename}: {str(e)}")

    return sample_info

sampled_video_info = process_videos_in_folder(split_video_dir)

# Chromaクエリで見つかった動画セグメントを取得します
video_segment = sampled_video_info[found_video_segment_filename]['sampled_video']



モデルの準備

推論速度を上げるために、モデルを4ビット量子化(quantization)で設定します。

from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

# 後ほどノートブック内で直接動画を再生するために使用します

from matplotlib import pyplot as plt
from matplotlib import animation
from IPython.display import HTML

# 形状が (frames, height, width, channels) のnumpy配列
# 確認用にランダムに1つ選択します
video = sampled_video_info[list(sampled_video_info.keys())[0]]['sampled_video']

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # 生成された画像を表示させないために必要です

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())



モデルの実行

これでクエリと関連動画の用意ができたので、これらをモデルにおくり、出力を得ることができます。

# 各「content」は辞書のリストになっており、画像/ビデオ/テキストのモダリティを追加できます
conversation = [
      {
          "role": "user",
          "content": [
              {"type": "text", "text": query},
              {"type": "video"},
              ],
      },
]

prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
prompt_len = len(prompt)

inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device)
generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

output = model.generate(**inputs, **generate_kwargs)
open_source_segment_generated_text = processor.batch_decode(output, skip_special_tokens=True)

print(open_source_segment_generated_text[0])



10 - Pegasusとオープンソースモデルの比較

ご覧の通り、クエリ(分かりやすくするために再度出力しています)に対してPegasusの方が適切な回答を生成しています。

print(f"query {query}")
print("pegasus 回答")
print(segment_answer)
print("オープンソース 回答")
print(open_source_segment_generated_text[0])



11 - ChromaとTwelveLabsの埋め込みを使った、複数動画の検索

これまでに、TwelveLabs MarengoとChromaを使用して動画内の関連セグメントを検出する方法を解説しました。

今度は、RAGユースケースにおけるMarengoとChromaの真の強みをお見せします。大量の候補動画全体にクエリを発行し、クエリへの回答に必要な特定の動画を検出します。



動画データベースの埋め込み:

まず、すべての動画を埋め込み化し、それらをChromaに保存します:

# すべての動画の埋め込みを行い、タスクIDを保存します
chroma_collection_name = "video_embeddings"
collection = chroma_client.get_or_create_collection(chroma_collection_name)

続いて、将来TwelveLabsからこれらの埋め込みを再取得する場合に備えて、動画ごとにtask_idを保存しておくための辞書を作成します。

# 各動画のtwelve labsのタスクIDを保存します
task_ids = {}

次に、先ほど作成した create_video_embeddings 関数を使用し、各動画の埋め込みを取得してChromaコレクションにアップロードします。

# 各動画の埋め込みとメタデータを取得します
# 同じ動画を複数回アップロードしないように、タスクIDを保存します
for filename in os.listdir(upscaled_video_dir):

    if filename.endswith(".mp4"):

        if (filename in task_ids.keys()):
            task_id = task_ids[filename]
        else:
            task_id = None

        file_path = os.path.join(upscaled_video_dir, filename)

        ids, metadatas, embeddings, task_id = create_video_embeddings(twelvelabs_client,file_path,segment_duration,task_id)

        task_ids[filename] = task_id

        collection.add(
            metadatas = metadatas,
            embeddings = embeddings,
            ids=ids
        )
print(task_ids)



データベースのクエリ

ここで、以前と同様のクエリを実行し、クエリへの回答となる動画全体を検索します。

response = query_chroma(collection,query)
found_full_video_name = response["metadatas"][0][0]["video_name"]
print(found_full_video_name)



12 - Pegasusを使用した、動画全体とのチャット

すでにインデックスが作成されているので、インデックスに動画をアップロードしてPegasusを呼び出すだけです。

# 同じ動画を複数回アップロードしないように、pegasusのビデオIDを保存する辞書を用意します
pegasus_video_ids = {}

各動画をここにアップロードし、チャットで使用することになるビデオID(video ids)を保存します。

for upscaled_video in os.listdir(upscaled_video_dir):
    upscaled_video_path = os.path.join(upscaled_video_dir, upscaled_video)
    print(upscaled_video_path)
    if upscaled_video not in pegasus_video_ids:
        video_id = upload_video_to_twelve_labs(index,upscaled_video_path)
        pegasus_video_ids[upscaled_video] = video_id
print(pegasus_video_ids)



Pegasusを呼び出して動画全体とチャットする

まず、Chromaクエリにマッチした動画に紐付けられる video_id を特定します。

video_id = pegasus_video_ids[found_full_video_name]
print(video_id)

次に、Pegasusに対して、ビリアタコスの食材(材料)が何か尋ねます。以前使用したクエリと全く同様ですが、今回は6秒のセグメント単位ではなく、動画全体を相手にチャットを展開します。

res = twelvelabs_client.generate.text(
  video_id=video_id,
  prompt=query
)
full_video_answer = res.data
print(f"query {query}")
print(f"{full_video_answer}")



動画全体の回答とセグメントの回答を比較する

次に、これを6秒のセグメントとチャットしたときに返された回答と比較します。

print(f"セグメントでの回答: \n{segment_answer}")



13 - オープンソースモデルを使用した、動画全体とのチャット

ここからは、動画全体とのチャットにおける、Pegasusとオープンソースモデルの実力を比較します。

すべての動画に対して再度サンプリングを行った後に動画全体に対しモデルを実行すると、もう少し面白い回答が出力されます。

# すべての動画をサンプリングします:
sampled_database_video_info = process_videos_in_folder(upscaled_video_dir)

# 各「content」は辞書のリストになっており、画像/ビデオ/テキストのモダリティを追加できます
conversation = [
      {
          "role": "user",
          "content": [
              {"type": "text", "text": query},
              {"type": "video"},
              ],
      },
]

prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
prompt_len = len(prompt)

inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device)
generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

output = model.generate(**inputs, **generate_kwargs)
generated_text = processor.batch_decode(output, skip_special_tokens=True)

print(generated_text[0])



結果をPegasusの回答と比較する

ご覧の通り、オープンソースモデルは動画全体とチャットした際にも回答を生成することができず、Pegasusのパワーを示しています:

print(f"Pegasusの回答: \n{full_video_answer}")



14 - 比較

2つのモデルを比較すると、Pegasusが動画を明確に理解しており、クエリに対して正確な応答を返せることがよく分かります。この動画がビリアタコスのチュートリアルであることを認識しており、材料の一覧を特定できています。動画全体でも、動画の一部を切り出したセグメント単位でも同様の回答が可能となっています。

一方、LLaVA-NeXT-Videoのオープンソースモデルはクエリの内容自体は解釈できていますが、動画から必要な情報を抽出できていません。LLaVA-NeXT-Videoも、これが料理動画であることはある程度把握していますが、クエリで求めるレベルの深い理解には至っていません。



15 - 結論

このガイドでは、個別またはセット全体の動画コンテンツを取り扱う方法をご紹介しました。TwelveLabs Embed APIとChromaのベクトルデータベースを活用して、ローカルマシン上でのデータの検索と管理を行いました。

また、TwelveLabsのPegasusモデルとオープンソースモデルのLLaVA-NeXT-Videoを比較し、必要なインフラ環境、開発コスト、クエリの結果の違いを評価しました。オープンソースモデルと比較すると、Pegasusは運用オーバーヘッドが少なく、指示追従性(Instruction Following)が高く、より長いコンテキスト長が持てることで長尺動画のクエリ評価がより容易になるといった将来性を秘めていることが確認されました。



付録

ご参考ならびに今後の学習のために以下をご用意しています:

  1. 一連のコードを格納した Colab ノートブック

  2. TwelveLabs ドキュメント

  3. Chromaのクライアント および ドキュメント

ドラフトをレビューしてくださったChromaチームのJeff Huber氏とItai Smith氏に深く感謝いたします!



動画を対象としたRAGベースのQ&Aを実現するために、 TwelveLabsのEmbed API と Chromaのベクトルデータベース を統合するチュートリアルへようこそ。このガイドでは、生成モデルを使用して、構造化されていない動画のデータベースからテキストの回答を抽出する方法を学びます。

私たちは、TwelveLabsの豊かで文脈に即した埋め込み(embeddings)Chromaのベクトルデータベースを組み合わせることで、これらの動画埋め込みを保存、インデックス登録、クエリし、チャットアプリケーションを作成します。このノートブックは、わずか数行のコードでこれらのテクノロジーがもたらす現在の可能性を実証します。

比較のために、テキスト回答を生成する TwelveLabsのGenerate API を使用する場合と、代表的なオープンソースモデルである LLaVA-NeXT-Videoを使用する場合における、開発者体験(Developer Experience)の違いも紹介します。



1 - 概要

このチュートリアルでは、TwelveLabs Marengoを使用して動画の埋め込みを作成し、Chromaを使用してそれらの埋め込みを保存およびクエリして関連する動画を見つけ、TwelveLabs PegasusとLLaVA-NeXT-Videoを使用して取得した動画とチャットする、いくつかの例を見ていきます。

行う内容は以下の通りです:

  1. TwelveLabs Marengoエンジンを使用した動画埋め込みの作成

  2. Chromaデータベースへの動画埋め込みの保存

  3. Chromaデータベース内の埋め込みをクエリして、関連する動画セグメントを検索

  4. TwelveLabs Pegasusを使用した、取得した動画セグメントとのチャット

  5. オープンソースモデルを使用した、取得した動画セグメントとのチャット

  6. Pegasusとオープンソースモデルの比較

  7. ChromaとTwelveLabsの埋め込みを使用した、複数動画の検索

  8. Pegasusを使用した、動画全体とのチャット

  9. オープンソースモデルを使用した、動画全体とのチャット



2 - セットアップとインストール



必要なライブラリのインストール

まず、TwelveLabsとChromaのSDKをインストールします。

# Twelve LabsとChromaのライブラリをインストール
!pip install --upgrade twelvelabs
!pip install --upgrade chromadb

次に、オープンソースモデルを実行するために使用するライブラリをインストールします。

# オープンソースモデルで使用するライブラリをインストール
!pip install protobuf==3.20.3
!pip install --upgrade -q accelerate bitsandbytes
!pip install git+https://github.com/huggingface/transformers.git
!pip install av

Colab以外の環境でこのチュートリアルを実行している場合は、以下のセルのコメントを解除して、使用する動画データを取り扱うためのライブラリをインストールしてください。

# Colab環境以外の場合に追加でインストールするもの
# !python -m pip install pillow
# !python -m pip install sentencepiece
# !python -m pip install matplotlib



3 - 動画データの準備

次に、動画データを準備します。



動画データの使用

このデモでは、TwelveLabsのGoogleドライブフォルダにある動画データを使用します。これを使用するには、フォルダをご自身のGoogleドライブにリンクし、そのGoogleドライブをこのColabにマウントする必要があります。



Googleドライブへのフォルダーのリンク

このリンクから誰でもフォルダーにアクセスできます:https://drive.google.com/drive/folders/1k6FmkVglFsdtJG4MTIK-2dk1Dk9gTPtu?usp=share_link

これをご自身のGoogleドライブの正しい場所にリンクするには:

  1. Googleドライブの「共有アイテム」に移動します。

  2. アクセスしたい共有フォルダを探します。

  3. 「整理」->「ショートカットを追加」を選択します。

  4. 保存先として「マイドライブ」を選択し、「追加」をクリックします。

これで、このフォルダーは /content/drive/MyDrive/TwelveLabs-Chroma でアクセス可能になります。



ドライブのマウント

ここで、ご自身のドライブをこのColabにマウントします。

from google.colab import drive
drive.mount('/content/drive')



動画パスの設定

次に、取り扱う動画のパスを設定します。これは、動画フォルダをどこにリンクしたかによって変更される場合があります。

video_folder_path = "/content/drive/MyDrive/TwelveLabs-Chroma/videos/"



動画解像度のアップスケール

動画の一部は埋め込みエンジンで使用するには解像度が低すぎるため、 upscale_video を使用して解像度を倍にします。

import numpy as np
import subprocess
import os

def upscale_video(input_file, output_path, target_width=854, target_height=480):

    output_file = os.path.join(output_path, os.path.basename(input_file))

    if os.path.exists(output_file):
        print(f"Skipping {input_file} as {output_file} already exists.")
        return
 

    """
    FFmpegを使用して、動画をターゲットの幅と高さにアップスケールします。

    Args:
        input_file (str): 入力動画ファイルへのパス。
        output_file (str): アップスケールされた動画を保存するパス。
        target_width (int): 希望する出力幅。デフォルトは 854。
        target_height (int): 希望する出力高さ。デフォルトは 480。
    """
    # 動画をアップスケールするためのFFmpegコマンド
    ffmpeg_command = [
        'ffmpeg',
        '-i', input_file,                              # 入力ファイル
        '-vf', f'scale={target_width}:{target_height}', # ターゲットの寸法を持つスケールフィルター
        '-c:a', 'copy',                                # 再エンコードなしでオーディオストリームをコピー
        output_file,                                    # 出力ファイル
        "-y"
    ]

    # FFmpegコマンドを実行
    subprocess.run(ffmpeg_command)

    print(f"Upscaled video saved to {output_file}")

まず、アップスケールされた動画を保存するパスを作成します。

upscaled_video_dir = video_folder_path + "upscaled_videos/"

次に、動画をアップスケールします。すでにアップスケールされてアップスケール動画フォルダに配置されている動画はスキップされます。

# すべての.mp4動画をアップスケール
# 出力ディレクトリが存在しない場合は作成
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# 生動画ディレクトリ内の全ファイルを反復処理
for filename in os.listdir(video_folder_path):
    # ファイルが動画ファイルであるかチェック
    input_filepath = os.path.join(video_folder_path, filename)
    if filename.endswith(".mp4"):
        upscale_video(input_filepath, upscaled_video_dir)
# 出力
...
Upscaled video saved to ../videos/upscaled_videos/How To Make Birria Tacos [4nIFJFgH99w].mp4
...
Upscaled video saved to ../videos/upscaled_videos/How To Make a McDonald's Cheeseburger [SvOx7tA_Cv8].mp4
...
Upscaled video saved to ../videos/upscaled_videos/How To Make Potato Wedges [eZXbMWPJkKQ]



4 - TwelveLabs Marengoエンジンを使用した動画埋め込みの作成

ここでは、TwelveLabs Marengoエンジンを使用して、動画の埋め込みを作成します。

まず、TwelveLabs APIキーを読み込むことから始めます。これは、Colabの左側パネルにある「シークレット」(鍵アイコン)をクリックし、「+新しいシークレットを追加」をクリックすることで、Colabのシークレットストアに保存できます。

from google.colab import userdata
TL_API_KEY=userdata.get('TL_API_KEY')

次に、TwelveLabsおよびChromaのクライアントを初期化します。このチュートリアルのChromaデータベースはローカルに保存されます。

from twelvelabs import TwelveLabs
from twelvelabs.models.embed import EmbeddingsTask

# Twelve Labsクライアントを初期化
twelvelabs_client = TwelveLabs(api_key=TL_API_KEY)

ここでは、実験に最適なEphemeral Client(一時的クライアント)を使用します。これはメモリ内で実行されますが、アプリケーションを閉じるとデータは保持されません。

import chromadb

# 一時的なChromaクライアントを初期化
chroma_client = chromadb.Client()

別のクライアント

データを永続化する必要がある場合は、2つのオプションがあります:

1 - 永続クライアント(Persistent Client): これにより、データベースがローカルディスクに保存およびロードされます。

# 永続クライアントのオプション
chroma_client = chromadb.PersistentClient(path="/path/to/save/to")

2 - HTTPクライアント(HTTP Client): これにより、別のプロセス(独自にデプロイされた環境、またはChroma Cloud上)で実行されているChromaサーバーに接続します。

# HTTPクライアント
chroma_client = chromadb.HttpClient(host='localhost', port=8000)



動画埋め込みの作成とChroma向けフォーマット

ここで、Marengoを使用して動画の埋め込みを作成し、Chroma向けにフォーマットします。Chromaにデータをアップロードするには、アップロードしたいすべてのデータ(embeddings、meta-datas、ids)に対して3つの明確に分かれたリストが必要になります。

def on_task_update(task: EmbeddingsTask):
    print(f"  Status={task.status}")

# 動画の埋め込みを作成し、Chroma向けにフォーマットします
def create_video_embeddings(client,video_file,segment_length,task_id=None):

    # Twelve Labsに動画が存在しない場合はアップロードします
    video_name = os.path.basename(video_file)

    if task_id == None or task_id == "":
        task = client.embed.task.create(
            engine_name="Marengo-retrieval-2.7",
            video_file=video_file,
            video_clip_length=segment_length
        )
        print(
            f"Created task: id={task.id} engine_name={task.engine_name} status={task.status}"
        )

        status = task.wait_for_done(
            sleep_interval=2,
            callback=on_task_update
        )

        print(f"Embedding done: {status}")

        task_id = task.id

    # 埋め込みを取得します
    task = client.embed.task.retrieve(task_id)
    print("task",task)

    # Chroma向けにフォーマットします
    embeddings = []
    metadatas = []
    ids = []

    idx = 0

    print("embeddings",task.video_embeddings)

    if task.video_embeddings is not None:
        for v in task.video_embeddings:

            metadata = {
                "embedding_scope":v.embedding_scope,
                "start_offset_sec":v.start_offset_sec,
                "end_offset_sec":v.end_offset_sec,
                "video_file":video_file,
                "video_name":video_name,
                "task_id":task.id,
                "video_segment_number":idx
            }


            embedding = v.values
            id = task.id + "_" + str(idx)

            metadatas.append(metadata)
            embeddings.append(embedding)
            ids.append(id)

            idx += 1

    return (ids,metadatas,embeddings,task_id)

次に、取り扱う動画を選択し、埋め込みセグメントの長さを6秒に設定します。各埋め込みは動画内の6秒間のセグメントを参照することになります。

# セグメントの長さと、処理対象の動画を設定します
segment_duration = 6
current_video_path = upscaled_video_dir + "How To Make Birria Tacos [4nIFJFgH99w].mp4"

続いて、Chromaにアップロードする埋め込みデータを取得します。また、TwelveLabsから埋め込みの task_id も取得します。この task_id を使用すれば、将来この動画の埋め込みを再取得できるため、同じ動画に対して埋め込みモデルを何度も実行する必要がなくなります。

# Chromaにアップロードする埋め込みを取得します

# すでにtask_idをお持ちの場合は設定し、そうでない場合は空文字を設定してください
task_id = ""
ids, metadatas, embeddings, task_id = create_video_embeddings(twelvelabs_client,current_video_path,segment_duration,task_id)

出力に task_id が表示されます。




5 - Chromaデータベースへの動画埋め込みの保存

レコードとベクトルを分かりやすいフォーマットにできたので、あとはこれらをChromaの新しいコレクション(または、すでに存在していれば既存のコレクション)に追加するだけです。

# Chromaコレクションを取得、または作成します
chroma_collection_name = "video_embeddings"
collection = chroma_client.get_or_create_collection(chroma_collection_name)

# 埋め込みとメタデータをコレクションに追加します
collection.add(
    metadatas = metadatas,
    embeddings = embeddings,
    ids=ids
)



6 - Chromaデータベース内の埋め込みをクエリして、関連する動画セグメントを検索



ベクトル検索のテスト

コレクションにすべてが収まったので、埋め込みクエリが機能することを確認しテストできます。最初に返された埋め込みで検索を行います。これ自身との間の距離(ディスタンス)は0になるはずです。

# 最初の埋め込みをテスト検索として使用します
test_segment_embeddings = embeddings[0]

results = collection.query(
    query_embeddings=[test_segment_embeddings],
    n_results=4
)

print("search embeddings for:",ids[0])
print("found:", results["ids"][0][0])
print("distance:",results["distances"][0][0])

# 最初の動画のテキスト埋め込みがそれ自体から距離 0 であることをアサート(確認)します
assert results["ids"][0][0] == ids[0]
assert results["distances"][0][0] == 0



ベクトルデータベースへのクエリ

次に、TwelveLabsの埋め込みとChromaのベクトルデータベースを組み合わせ、膨大な数の動画を効率的にクエリする威力を紹介します。

先ほど選択した動画は、ビリアタコス(birria tacos)のクッキングチュートリアルです。TwelveLabsとChromaを使用して、ビリアタコスを作るために必要な材料は何かを把握します。

ここでそのクエリを設定します。

query = "What are the ingredients for birria tacos?"(ビリアタコスの材料は何ですか?)

次に、TwelveLabs Marengoを使用してテキストクエリを埋め込み、その埋め込みを使用してChromaコレクションをクエリする関数を作成します。

import os

def query_chroma(collection,query,n_results=1):
    # クエリ用の埋め込みを作成します
    embedding = twelvelabs_client.embed.create(
        engine_name="Marengo-retrieval-2.7",
        text=query,
        text_truncate="start",
    )

    query_embeddings = embedding.text_embedding.float

    # クエリ埋め込みを使用してChromaデータベースを検索します

    response = collection.query(
        query_embeddings=query_embeddings,
        n_results=n_results,
    )


    return response

続いて、Chromaクエリを実行して、このクエリに最も良く答えているチュートリアルの6秒間のセグメントを見つけます。このチュートリアルの後半で、このセグメントをTwelveLabs Pegasusとオープンソースモデルに渡し、必要な材料などについて質問を投げかけます。

メタデータにはセグメントと元の動画に関する情報が含まれています。

response = query_chroma(collection,query)

# 最も類似しているオブジェクトのプロパティと距離を出力します
print(response["ids"][0][0])
print(response["distances"][0][0])
print(response["metadatas"][0][0])

# 次のステップで使用するために、見つかった動画セグメントのパスを取得します
found_video_metadata = response["metadatas"][0][0]
# 出力

672101d56025850d8c890d1c_6
1.3052971363067627
{'embedding_scope': 'clip', 
'end_offset_sec': 42.0, 
'start_offset_sec': 36.0, 
'task_id': '672101d56025850d8c890d1c', 
'video_file': '../videos/upscaled_videos/How To Make Birria Tacos [4nIFJFgH99w].mp4', 
'video_name': 'How To Make Birria Tacos [4nIFJFgH99w]



7 - 動画をセグメントに分割する

チュートリアル動画全体をPegasusにアップロードすることも可能ですが、ここでは前のステップで見つかった関連する6秒のセグメントのみをアップロードします。これにより、計算リソースを効率的に使用できます。

ここでは、処理対象の動画を埋め込み側のセグメントと一致する単位に分割し、分割後の動画フォルダ(split videos folder)に配置します。

split_video_dir = video_folder_path + "split_videos/"

def split_video(input_path, output_dir, segment_duration=6):

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    filename = os.path.splitext(os.path.basename(input_path))[0]
    filetype = os.path.splitext(os.path.basename(input_path))[1]

    # 動画をセグメントに分割します
    ffmpeg_command = [
        'ffmpeg',
        '-i', input_path,             # 入力動画ファイル
        '-c', 'copy',                  # 映像と音声の両コーデックをコピー
        '-f', 'segment',               # セグメントモード
        '-segment_time', str(segment_duration),  # セグメントの長さ
        '-reset_timestamps', '1',      # 各セグメントのタイムスタンプをリセット
        output_dir + filename + '_%03d' + filetype  # 出力ファイル名のパターン (例: output_001.mp4)
    ]

    # コマンドを実行します
    subprocess.run(ffmpeg_command)

    print("Video split into 6-second segments successfully.")

# 動画をセグメントに分割します
split_video(input_path=current_video_path, output_dir=split_video_dir, segment_duration=segment_duration)



8 - TwelveLabs Pegasusを使用した、取得した動画セグメントとのチャット

次のいくつかのセルでは、Pegasusを使って動画とチャットするのがどれほど簡単であるかをお見せします。あらかじめ必要なものはすべて揃っています。



Pegasusへの動画セグメントのアップロード

まず、アップロードする動画用のインデックスとPegasus Engineを作成し、それからアップロードします。

# pegasusのインデックスを作成、または取得します
engines = [
        {
            "name": "pegasus1.2",
            "options": ["visual", "conversation"]
        }
    ]

index_name = "cooking_video_index"
indices_list = twelvelabs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelvelabs_client.index.create(
        name=index_name,
        engines=engines,

    )
    print(f"A new index has been created: id={index.id} name={index.name} engines={index.engines}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} engines={index.engines}")



動画セグメントファイル名の取得

次に、Chromaクエリのメタデータを使用して、クエリに一致したセグメントを見つけます。

# 動画セグメントのファイル名を取得します
found_video_segment_number = int(found_video_metadata["video_segment_number"])
found_video_file = found_video_metadata["video_file"]
found_video_filename = os.path.splitext(os.path.basename(found_video_file))[0]
found_video_filetype = os.path.splitext(os.path.basename(found_video_file))[1]
found_video_segment_filename = found_video_filename + f"_{found_video_segment_number:03d}"

found_video_segment_path = split_video_dir + found_video_segment_filename + found_video_filetype
print(found_video_segment_path)



動画をPegasusにアップロードし、ビデオID(Video ID)を取得する

続いて、動画セグメントをTwelveLabs Pegasusにアップロードする関数を作成します。

この関数は、動画とのチャットで使用される video_id を返します。複数ターンの会話(チャット)に備えて、この video_id を保存しておくことができます。

def upload_video_to_twelve_labs(index,video_path):

    # 動画をtwelve labsのインデックスにアップロードします
    task = twelvelabs_client.task.create(
        index_id=index.id,
        file = video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)

    if task.status != "ready":
      raise RuntimeError(f"Indexing failed with status {task.status}")
    print(f"The unique identifer of your video is {task.video_id}.")

    # video id を返します
    return task.video_id

もしこの動画セグメントのビデオIDをすでに持っている場合は、ここに設定できます。

# すでにvideo_idをお持ちの場合は設定し、そうでない場合は空文字を設定してください
video_id = ""

# Pegasusでチャットするためのvideo idを取得するために、動画をアップロードします
if video_id == "":
    video_id = upload_video_to_twelve_labs(index,found_video_segment_path)

出力に video_id が表示されます。




Pegasusの呼び出し

ここで、動画セグメントの検索に使用した同じクエリでクエリを発行します。TwelveLabsが舞台裏のボイラープレート(定型処理)をすべて処理するため、シンプルな関数でモデルを呼び出すことができます



9 - オープンソースモデルを使用した、取得した動画セグメントとのチャット

ここからは、オープンソースモデルを使用して同動画セグメントとチャットし、Pegasusと比較します。

まず、モデルが処理できるように自身で動画をサンプリングする必要があります。LLaVa-NeXT-Videoのサンプリングコードを修正し、各動画から均一に8フレームをサンプリングします。

これをフォルダー内にあるすべての動画セグメントに対して実行できます。

read_video_pyav は LLaVa-NeXT-Video Colab ノートブックから直接取得したものであり、推論のために動画を正しいnumpy表現にフォーマットします。

import av
def read_video_pyav(container, indices):
    '''
    PyAVデコーダーで動画をデコードします。

    Args:
        container (av.container.input.InputContainer): PyAVコンテナ。
        indices (List[int]): デコードするフレームインデックスのリスト。

    Returns:
        np.ndarray: デコードされたフレームのnumpy配列。形状は (num_frames, height, width, 3)。
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # 動画から均一に num_samples 枚のフレームをサンプリングします
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)

    sampled_frames = read_video_pyav(container, indices)

    return sampled_frames

def process_videos_in_folder(folder_path):
    sample_info = {}

    # サポートされている動画ファイルの拡張子
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')

    for filename in os.listdir(folder_path):
        simple_video_name = os.path.splitext(os.path.basename(filename))[0]
        if filename.lower().endswith(video_extensions):
            video_path = os.path.join(folder_path, filename)
            try:
                sampled_clip = sample_video(video_path)
                sample_info[simple_video_name] = {"sampled_video": sampled_clip, "video_path" : video_path}
            except Exception as e:
                print(f"Error processing {filename}: {str(e)}")

    return sample_info

sampled_video_info = process_videos_in_folder(split_video_dir)

# Chromaクエリで見つかった動画セグメントを取得します
video_segment = sampled_video_info[found_video_segment_filename]['sampled_video']



モデルの準備

推論速度を上げるために、モデルを4ビット量子化(quantization)で設定します。

from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

# 後ほどノートブック内で直接動画を再生するために使用します

from matplotlib import pyplot as plt
from matplotlib import animation
from IPython.display import HTML

# 形状が (frames, height, width, channels) のnumpy配列
# 確認用にランダムに1つ選択します
video = sampled_video_info[list(sampled_video_info.keys())[0]]['sampled_video']

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # 生成された画像を表示させないために必要です

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())



モデルの実行

これでクエリと関連動画の用意ができたので、これらをモデルにおくり、出力を得ることができます。

# 各「content」は辞書のリストになっており、画像/ビデオ/テキストのモダリティを追加できます
conversation = [
      {
          "role": "user",
          "content": [
              {"type": "text", "text": query},
              {"type": "video"},
              ],
      },
]

prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
prompt_len = len(prompt)

inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device)
generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

output = model.generate(**inputs, **generate_kwargs)
open_source_segment_generated_text = processor.batch_decode(output, skip_special_tokens=True)

print(open_source_segment_generated_text[0])



10 - Pegasusとオープンソースモデルの比較

ご覧の通り、クエリ(分かりやすくするために再度出力しています)に対してPegasusの方が適切な回答を生成しています。

print(f"query {query}")
print("pegasus 回答")
print(segment_answer)
print("オープンソース 回答")
print(open_source_segment_generated_text[0])



11 - ChromaとTwelveLabsの埋め込みを使った、複数動画の検索

これまでに、TwelveLabs MarengoとChromaを使用して動画内の関連セグメントを検出する方法を解説しました。

今度は、RAGユースケースにおけるMarengoとChromaの真の強みをお見せします。大量の候補動画全体にクエリを発行し、クエリへの回答に必要な特定の動画を検出します。



動画データベースの埋め込み:

まず、すべての動画を埋め込み化し、それらをChromaに保存します:

# すべての動画の埋め込みを行い、タスクIDを保存します
chroma_collection_name = "video_embeddings"
collection = chroma_client.get_or_create_collection(chroma_collection_name)

続いて、将来TwelveLabsからこれらの埋め込みを再取得する場合に備えて、動画ごとにtask_idを保存しておくための辞書を作成します。

# 各動画のtwelve labsのタスクIDを保存します
task_ids = {}

次に、先ほど作成した create_video_embeddings 関数を使用し、各動画の埋め込みを取得してChromaコレクションにアップロードします。

# 各動画の埋め込みとメタデータを取得します
# 同じ動画を複数回アップロードしないように、タスクIDを保存します
for filename in os.listdir(upscaled_video_dir):

    if filename.endswith(".mp4"):

        if (filename in task_ids.keys()):
            task_id = task_ids[filename]
        else:
            task_id = None

        file_path = os.path.join(upscaled_video_dir, filename)

        ids, metadatas, embeddings, task_id = create_video_embeddings(twelvelabs_client,file_path,segment_duration,task_id)

        task_ids[filename] = task_id

        collection.add(
            metadatas = metadatas,
            embeddings = embeddings,
            ids=ids
        )
print(task_ids)



データベースのクエリ

ここで、以前と同様のクエリを実行し、クエリへの回答となる動画全体を検索します。

response = query_chroma(collection,query)
found_full_video_name = response["metadatas"][0][0]["video_name"]
print(found_full_video_name)



12 - Pegasusを使用した、動画全体とのチャット

すでにインデックスが作成されているので、インデックスに動画をアップロードしてPegasusを呼び出すだけです。

# 同じ動画を複数回アップロードしないように、pegasusのビデオIDを保存する辞書を用意します
pegasus_video_ids = {}

各動画をここにアップロードし、チャットで使用することになるビデオID(video ids)を保存します。

for upscaled_video in os.listdir(upscaled_video_dir):
    upscaled_video_path = os.path.join(upscaled_video_dir, upscaled_video)
    print(upscaled_video_path)
    if upscaled_video not in pegasus_video_ids:
        video_id = upload_video_to_twelve_labs(index,upscaled_video_path)
        pegasus_video_ids[upscaled_video] = video_id
print(pegasus_video_ids)



Pegasusを呼び出して動画全体とチャットする

まず、Chromaクエリにマッチした動画に紐付けられる video_id を特定します。

video_id = pegasus_video_ids[found_full_video_name]
print(video_id)

次に、Pegasusに対して、ビリアタコスの食材(材料)が何か尋ねます。以前使用したクエリと全く同様ですが、今回は6秒のセグメント単位ではなく、動画全体を相手にチャットを展開します。

res = twelvelabs_client.generate.text(
  video_id=video_id,
  prompt=query
)
full_video_answer = res.data
print(f"query {query}")
print(f"{full_video_answer}")



動画全体の回答とセグメントの回答を比較する

次に、これを6秒のセグメントとチャットしたときに返された回答と比較します。

print(f"セグメントでの回答: \n{segment_answer}")



13 - オープンソースモデルを使用した、動画全体とのチャット

ここからは、動画全体とのチャットにおける、Pegasusとオープンソースモデルの実力を比較します。

すべての動画に対して再度サンプリングを行った後に動画全体に対しモデルを実行すると、もう少し面白い回答が出力されます。

# すべての動画をサンプリングします:
sampled_database_video_info = process_videos_in_folder(upscaled_video_dir)

# 各「content」は辞書のリストになっており、画像/ビデオ/テキストのモダリティを追加できます
conversation = [
      {
          "role": "user",
          "content": [
              {"type": "text", "text": query},
              {"type": "video"},
              ],
      },
]

prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
prompt_len = len(prompt)

inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device)
generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

output = model.generate(**inputs, **generate_kwargs)
generated_text = processor.batch_decode(output, skip_special_tokens=True)

print(generated_text[0])



結果をPegasusの回答と比較する

ご覧の通り、オープンソースモデルは動画全体とチャットした際にも回答を生成することができず、Pegasusのパワーを示しています:

print(f"Pegasusの回答: \n{full_video_answer}")



14 - 比較

2つのモデルを比較すると、Pegasusが動画を明確に理解しており、クエリに対して正確な応答を返せることがよく分かります。この動画がビリアタコスのチュートリアルであることを認識しており、材料の一覧を特定できています。動画全体でも、動画の一部を切り出したセグメント単位でも同様の回答が可能となっています。

一方、LLaVA-NeXT-Videoのオープンソースモデルはクエリの内容自体は解釈できていますが、動画から必要な情報を抽出できていません。LLaVA-NeXT-Videoも、これが料理動画であることはある程度把握していますが、クエリで求めるレベルの深い理解には至っていません。



15 - 結論

このガイドでは、個別またはセット全体の動画コンテンツを取り扱う方法をご紹介しました。TwelveLabs Embed APIとChromaのベクトルデータベースを活用して、ローカルマシン上でのデータの検索と管理を行いました。

また、TwelveLabsのPegasusモデルとオープンソースモデルのLLaVA-NeXT-Videoを比較し、必要なインフラ環境、開発コスト、クエリの結果の違いを評価しました。オープンソースモデルと比較すると、Pegasusは運用オーバーヘッドが少なく、指示追従性(Instruction Following)が高く、より長いコンテキスト長が持てることで長尺動画のクエリ評価がより容易になるといった将来性を秘めていることが確認されました。



付録

ご参考ならびに今後の学習のために以下をご用意しています:

  1. 一連のコードを格納した Colab ノートブック

  2. TwelveLabs ドキュメント

  3. Chromaのクライアント および ドキュメント

ドラフトをレビューしてくださったChromaチームのJeff Huber氏とItai Smith氏に深く感謝いたします!



動画を対象としたRAGベースのQ&Aを実現するために、 TwelveLabsのEmbed API と Chromaのベクトルデータベース を統合するチュートリアルへようこそ。このガイドでは、生成モデルを使用して、構造化されていない動画のデータベースからテキストの回答を抽出する方法を学びます。

私たちは、TwelveLabsの豊かで文脈に即した埋め込み(embeddings)Chromaのベクトルデータベースを組み合わせることで、これらの動画埋め込みを保存、インデックス登録、クエリし、チャットアプリケーションを作成します。このノートブックは、わずか数行のコードでこれらのテクノロジーがもたらす現在の可能性を実証します。

比較のために、テキスト回答を生成する TwelveLabsのGenerate API を使用する場合と、代表的なオープンソースモデルである LLaVA-NeXT-Videoを使用する場合における、開発者体験(Developer Experience)の違いも紹介します。



1 - 概要

このチュートリアルでは、TwelveLabs Marengoを使用して動画の埋め込みを作成し、Chromaを使用してそれらの埋め込みを保存およびクエリして関連する動画を見つけ、TwelveLabs PegasusとLLaVA-NeXT-Videoを使用して取得した動画とチャットする、いくつかの例を見ていきます。

行う内容は以下の通りです:

  1. TwelveLabs Marengoエンジンを使用した動画埋め込みの作成

  2. Chromaデータベースへの動画埋め込みの保存

  3. Chromaデータベース内の埋め込みをクエリして、関連する動画セグメントを検索

  4. TwelveLabs Pegasusを使用した、取得した動画セグメントとのチャット

  5. オープンソースモデルを使用した、取得した動画セグメントとのチャット

  6. Pegasusとオープンソースモデルの比較

  7. ChromaとTwelveLabsの埋め込みを使用した、複数動画の検索

  8. Pegasusを使用した、動画全体とのチャット

  9. オープンソースモデルを使用した、動画全体とのチャット



2 - セットアップとインストール



必要なライブラリのインストール

まず、TwelveLabsとChromaのSDKをインストールします。

# Twelve LabsとChromaのライブラリをインストール
!pip install --upgrade twelvelabs
!pip install --upgrade chromadb

次に、オープンソースモデルを実行するために使用するライブラリをインストールします。

# オープンソースモデルで使用するライブラリをインストール
!pip install protobuf==3.20.3
!pip install --upgrade -q accelerate bitsandbytes
!pip install git+https://github.com/huggingface/transformers.git
!pip install av

Colab以外の環境でこのチュートリアルを実行している場合は、以下のセルのコメントを解除して、使用する動画データを取り扱うためのライブラリをインストールしてください。

# Colab環境以外の場合に追加でインストールするもの
# !python -m pip install pillow
# !python -m pip install sentencepiece
# !python -m pip install matplotlib



3 - 動画データの準備

次に、動画データを準備します。



動画データの使用

このデモでは、TwelveLabsのGoogleドライブフォルダにある動画データを使用します。これを使用するには、フォルダをご自身のGoogleドライブにリンクし、そのGoogleドライブをこのColabにマウントする必要があります。



Googleドライブへのフォルダーのリンク

このリンクから誰でもフォルダーにアクセスできます:https://drive.google.com/drive/folders/1k6FmkVglFsdtJG4MTIK-2dk1Dk9gTPtu?usp=share_link

これをご自身のGoogleドライブの正しい場所にリンクするには:

  1. Googleドライブの「共有アイテム」に移動します。

  2. アクセスしたい共有フォルダを探します。

  3. 「整理」->「ショートカットを追加」を選択します。

  4. 保存先として「マイドライブ」を選択し、「追加」をクリックします。

これで、このフォルダーは /content/drive/MyDrive/TwelveLabs-Chroma でアクセス可能になります。



ドライブのマウント

ここで、ご自身のドライブをこのColabにマウントします。

from google.colab import drive
drive.mount('/content/drive')



動画パスの設定

次に、取り扱う動画のパスを設定します。これは、動画フォルダをどこにリンクしたかによって変更される場合があります。

video_folder_path = "/content/drive/MyDrive/TwelveLabs-Chroma/videos/"



動画解像度のアップスケール

動画の一部は埋め込みエンジンで使用するには解像度が低すぎるため、 upscale_video を使用して解像度を倍にします。

import numpy as np
import subprocess
import os

def upscale_video(input_file, output_path, target_width=854, target_height=480):

    output_file = os.path.join(output_path, os.path.basename(input_file))

    if os.path.exists(output_file):
        print(f"Skipping {input_file} as {output_file} already exists.")
        return
 

    """
    FFmpegを使用して、動画をターゲットの幅と高さにアップスケールします。

    Args:
        input_file (str): 入力動画ファイルへのパス。
        output_file (str): アップスケールされた動画を保存するパス。
        target_width (int): 希望する出力幅。デフォルトは 854。
        target_height (int): 希望する出力高さ。デフォルトは 480。
    """
    # 動画をアップスケールするためのFFmpegコマンド
    ffmpeg_command = [
        'ffmpeg',
        '-i', input_file,                              # 入力ファイル
        '-vf', f'scale={target_width}:{target_height}', # ターゲットの寸法を持つスケールフィルター
        '-c:a', 'copy',                                # 再エンコードなしでオーディオストリームをコピー
        output_file,                                    # 出力ファイル
        "-y"
    ]

    # FFmpegコマンドを実行
    subprocess.run(ffmpeg_command)

    print(f"Upscaled video saved to {output_file}")

まず、アップスケールされた動画を保存するパスを作成します。

upscaled_video_dir = video_folder_path + "upscaled_videos/"

次に、動画をアップスケールします。すでにアップスケールされてアップスケール動画フォルダに配置されている動画はスキップされます。

# すべての.mp4動画をアップスケール
# 出力ディレクトリが存在しない場合は作成
if not os.path.exists(upscaled_video_dir):
    os.makedirs(upscaled_video_dir)

# 生動画ディレクトリ内の全ファイルを反復処理
for filename in os.listdir(video_folder_path):
    # ファイルが動画ファイルであるかチェック
    input_filepath = os.path.join(video_folder_path, filename)
    if filename.endswith(".mp4"):
        upscale_video(input_filepath, upscaled_video_dir)
# 出力
...
Upscaled video saved to ../videos/upscaled_videos/How To Make Birria Tacos [4nIFJFgH99w].mp4
...
Upscaled video saved to ../videos/upscaled_videos/How To Make a McDonald's Cheeseburger [SvOx7tA_Cv8].mp4
...
Upscaled video saved to ../videos/upscaled_videos/How To Make Potato Wedges [eZXbMWPJkKQ]



4 - TwelveLabs Marengoエンジンを使用した動画埋め込みの作成

ここでは、TwelveLabs Marengoエンジンを使用して、動画の埋め込みを作成します。

まず、TwelveLabs APIキーを読み込むことから始めます。これは、Colabの左側パネルにある「シークレット」(鍵アイコン)をクリックし、「+新しいシークレットを追加」をクリックすることで、Colabのシークレットストアに保存できます。

from google.colab import userdata
TL_API_KEY=userdata.get('TL_API_KEY')

次に、TwelveLabsおよびChromaのクライアントを初期化します。このチュートリアルのChromaデータベースはローカルに保存されます。

from twelvelabs import TwelveLabs
from twelvelabs.models.embed import EmbeddingsTask

# Twelve Labsクライアントを初期化
twelvelabs_client = TwelveLabs(api_key=TL_API_KEY)

ここでは、実験に最適なEphemeral Client(一時的クライアント)を使用します。これはメモリ内で実行されますが、アプリケーションを閉じるとデータは保持されません。

import chromadb

# 一時的なChromaクライアントを初期化
chroma_client = chromadb.Client()

別のクライアント

データを永続化する必要がある場合は、2つのオプションがあります:

1 - 永続クライアント(Persistent Client): これにより、データベースがローカルディスクに保存およびロードされます。

# 永続クライアントのオプション
chroma_client = chromadb.PersistentClient(path="/path/to/save/to")

2 - HTTPクライアント(HTTP Client): これにより、別のプロセス(独自にデプロイされた環境、またはChroma Cloud上)で実行されているChromaサーバーに接続します。

# HTTPクライアント
chroma_client = chromadb.HttpClient(host='localhost', port=8000)



動画埋め込みの作成とChroma向けフォーマット

ここで、Marengoを使用して動画の埋め込みを作成し、Chroma向けにフォーマットします。Chromaにデータをアップロードするには、アップロードしたいすべてのデータ(embeddings、meta-datas、ids)に対して3つの明確に分かれたリストが必要になります。

def on_task_update(task: EmbeddingsTask):
    print(f"  Status={task.status}")

# 動画の埋め込みを作成し、Chroma向けにフォーマットします
def create_video_embeddings(client,video_file,segment_length,task_id=None):

    # Twelve Labsに動画が存在しない場合はアップロードします
    video_name = os.path.basename(video_file)

    if task_id == None or task_id == "":
        task = client.embed.task.create(
            engine_name="Marengo-retrieval-2.7",
            video_file=video_file,
            video_clip_length=segment_length
        )
        print(
            f"Created task: id={task.id} engine_name={task.engine_name} status={task.status}"
        )

        status = task.wait_for_done(
            sleep_interval=2,
            callback=on_task_update
        )

        print(f"Embedding done: {status}")

        task_id = task.id

    # 埋め込みを取得します
    task = client.embed.task.retrieve(task_id)
    print("task",task)

    # Chroma向けにフォーマットします
    embeddings = []
    metadatas = []
    ids = []

    idx = 0

    print("embeddings",task.video_embeddings)

    if task.video_embeddings is not None:
        for v in task.video_embeddings:

            metadata = {
                "embedding_scope":v.embedding_scope,
                "start_offset_sec":v.start_offset_sec,
                "end_offset_sec":v.end_offset_sec,
                "video_file":video_file,
                "video_name":video_name,
                "task_id":task.id,
                "video_segment_number":idx
            }


            embedding = v.values
            id = task.id + "_" + str(idx)

            metadatas.append(metadata)
            embeddings.append(embedding)
            ids.append(id)

            idx += 1

    return (ids,metadatas,embeddings,task_id)

次に、取り扱う動画を選択し、埋め込みセグメントの長さを6秒に設定します。各埋め込みは動画内の6秒間のセグメントを参照することになります。

# セグメントの長さと、処理対象の動画を設定します
segment_duration = 6
current_video_path = upscaled_video_dir + "How To Make Birria Tacos [4nIFJFgH99w].mp4"

続いて、Chromaにアップロードする埋め込みデータを取得します。また、TwelveLabsから埋め込みの task_id も取得します。この task_id を使用すれば、将来この動画の埋め込みを再取得できるため、同じ動画に対して埋め込みモデルを何度も実行する必要がなくなります。

# Chromaにアップロードする埋め込みを取得します

# すでにtask_idをお持ちの場合は設定し、そうでない場合は空文字を設定してください
task_id = ""
ids, metadatas, embeddings, task_id = create_video_embeddings(twelvelabs_client,current_video_path,segment_duration,task_id)

出力に task_id が表示されます。




5 - Chromaデータベースへの動画埋め込みの保存

レコードとベクトルを分かりやすいフォーマットにできたので、あとはこれらをChromaの新しいコレクション(または、すでに存在していれば既存のコレクション)に追加するだけです。

# Chromaコレクションを取得、または作成します
chroma_collection_name = "video_embeddings"
collection = chroma_client.get_or_create_collection(chroma_collection_name)

# 埋め込みとメタデータをコレクションに追加します
collection.add(
    metadatas = metadatas,
    embeddings = embeddings,
    ids=ids
)



6 - Chromaデータベース内の埋め込みをクエリして、関連する動画セグメントを検索



ベクトル検索のテスト

コレクションにすべてが収まったので、埋め込みクエリが機能することを確認しテストできます。最初に返された埋め込みで検索を行います。これ自身との間の距離(ディスタンス)は0になるはずです。

# 最初の埋め込みをテスト検索として使用します
test_segment_embeddings = embeddings[0]

results = collection.query(
    query_embeddings=[test_segment_embeddings],
    n_results=4
)

print("search embeddings for:",ids[0])
print("found:", results["ids"][0][0])
print("distance:",results["distances"][0][0])

# 最初の動画のテキスト埋め込みがそれ自体から距離 0 であることをアサート(確認)します
assert results["ids"][0][0] == ids[0]
assert results["distances"][0][0] == 0



ベクトルデータベースへのクエリ

次に、TwelveLabsの埋め込みとChromaのベクトルデータベースを組み合わせ、膨大な数の動画を効率的にクエリする威力を紹介します。

先ほど選択した動画は、ビリアタコス(birria tacos)のクッキングチュートリアルです。TwelveLabsとChromaを使用して、ビリアタコスを作るために必要な材料は何かを把握します。

ここでそのクエリを設定します。

query = "What are the ingredients for birria tacos?"(ビリアタコスの材料は何ですか?)

次に、TwelveLabs Marengoを使用してテキストクエリを埋め込み、その埋め込みを使用してChromaコレクションをクエリする関数を作成します。

import os

def query_chroma(collection,query,n_results=1):
    # クエリ用の埋め込みを作成します
    embedding = twelvelabs_client.embed.create(
        engine_name="Marengo-retrieval-2.7",
        text=query,
        text_truncate="start",
    )

    query_embeddings = embedding.text_embedding.float

    # クエリ埋め込みを使用してChromaデータベースを検索します

    response = collection.query(
        query_embeddings=query_embeddings,
        n_results=n_results,
    )


    return response

続いて、Chromaクエリを実行して、このクエリに最も良く答えているチュートリアルの6秒間のセグメントを見つけます。このチュートリアルの後半で、このセグメントをTwelveLabs Pegasusとオープンソースモデルに渡し、必要な材料などについて質問を投げかけます。

メタデータにはセグメントと元の動画に関する情報が含まれています。

response = query_chroma(collection,query)

# 最も類似しているオブジェクトのプロパティと距離を出力します
print(response["ids"][0][0])
print(response["distances"][0][0])
print(response["metadatas"][0][0])

# 次のステップで使用するために、見つかった動画セグメントのパスを取得します
found_video_metadata = response["metadatas"][0][0]
# 出力

672101d56025850d8c890d1c_6
1.3052971363067627
{'embedding_scope': 'clip', 
'end_offset_sec': 42.0, 
'start_offset_sec': 36.0, 
'task_id': '672101d56025850d8c890d1c', 
'video_file': '../videos/upscaled_videos/How To Make Birria Tacos [4nIFJFgH99w].mp4', 
'video_name': 'How To Make Birria Tacos [4nIFJFgH99w]



7 - 動画をセグメントに分割する

チュートリアル動画全体をPegasusにアップロードすることも可能ですが、ここでは前のステップで見つかった関連する6秒のセグメントのみをアップロードします。これにより、計算リソースを効率的に使用できます。

ここでは、処理対象の動画を埋め込み側のセグメントと一致する単位に分割し、分割後の動画フォルダ(split videos folder)に配置します。

split_video_dir = video_folder_path + "split_videos/"

def split_video(input_path, output_dir, segment_duration=6):

    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    filename = os.path.splitext(os.path.basename(input_path))[0]
    filetype = os.path.splitext(os.path.basename(input_path))[1]

    # 動画をセグメントに分割します
    ffmpeg_command = [
        'ffmpeg',
        '-i', input_path,             # 入力動画ファイル
        '-c', 'copy',                  # 映像と音声の両コーデックをコピー
        '-f', 'segment',               # セグメントモード
        '-segment_time', str(segment_duration),  # セグメントの長さ
        '-reset_timestamps', '1',      # 各セグメントのタイムスタンプをリセット
        output_dir + filename + '_%03d' + filetype  # 出力ファイル名のパターン (例: output_001.mp4)
    ]

    # コマンドを実行します
    subprocess.run(ffmpeg_command)

    print("Video split into 6-second segments successfully.")

# 動画をセグメントに分割します
split_video(input_path=current_video_path, output_dir=split_video_dir, segment_duration=segment_duration)



8 - TwelveLabs Pegasusを使用した、取得した動画セグメントとのチャット

次のいくつかのセルでは、Pegasusを使って動画とチャットするのがどれほど簡単であるかをお見せします。あらかじめ必要なものはすべて揃っています。



Pegasusへの動画セグメントのアップロード

まず、アップロードする動画用のインデックスとPegasus Engineを作成し、それからアップロードします。

# pegasusのインデックスを作成、または取得します
engines = [
        {
            "name": "pegasus1.2",
            "options": ["visual", "conversation"]
        }
    ]

index_name = "cooking_video_index"
indices_list = twelvelabs_client.index.list(name=index_name)

if len(indices_list) == 0:
    index = twelvelabs_client.index.create(
        name=index_name,
        engines=engines,

    )
    print(f"A new index has been created: id={index.id} name={index.name} engines={index.engines}")
else:
    index = indices_list[0]
    print(f"Index already exists: id={index.id} name={index.name} engines={index.engines}")



動画セグメントファイル名の取得

次に、Chromaクエリのメタデータを使用して、クエリに一致したセグメントを見つけます。

# 動画セグメントのファイル名を取得します
found_video_segment_number = int(found_video_metadata["video_segment_number"])
found_video_file = found_video_metadata["video_file"]
found_video_filename = os.path.splitext(os.path.basename(found_video_file))[0]
found_video_filetype = os.path.splitext(os.path.basename(found_video_file))[1]
found_video_segment_filename = found_video_filename + f"_{found_video_segment_number:03d}"

found_video_segment_path = split_video_dir + found_video_segment_filename + found_video_filetype
print(found_video_segment_path)



動画をPegasusにアップロードし、ビデオID(Video ID)を取得する

続いて、動画セグメントをTwelveLabs Pegasusにアップロードする関数を作成します。

この関数は、動画とのチャットで使用される video_id を返します。複数ターンの会話(チャット)に備えて、この video_id を保存しておくことができます。

def upload_video_to_twelve_labs(index,video_path):

    # 動画をtwelve labsのインデックスにアップロードします
    task = twelvelabs_client.task.create(
        index_id=index.id,
        file = video_path
    )
    print(f"Task created: id={task.id} status={task.status}")

    task.wait_for_done(sleep_interval=5, callback=on_task_update)

    if task.status != "ready":
      raise RuntimeError(f"Indexing failed with status {task.status}")
    print(f"The unique identifer of your video is {task.video_id}.")

    # video id を返します
    return task.video_id

もしこの動画セグメントのビデオIDをすでに持っている場合は、ここに設定できます。

# すでにvideo_idをお持ちの場合は設定し、そうでない場合は空文字を設定してください
video_id = ""

# Pegasusでチャットするためのvideo idを取得するために、動画をアップロードします
if video_id == "":
    video_id = upload_video_to_twelve_labs(index,found_video_segment_path)

出力に video_id が表示されます。




Pegasusの呼び出し

ここで、動画セグメントの検索に使用した同じクエリでクエリを発行します。TwelveLabsが舞台裏のボイラープレート(定型処理)をすべて処理するため、シンプルな関数でモデルを呼び出すことができます



9 - オープンソースモデルを使用した、取得した動画セグメントとのチャット

ここからは、オープンソースモデルを使用して同動画セグメントとチャットし、Pegasusと比較します。

まず、モデルが処理できるように自身で動画をサンプリングする必要があります。LLaVa-NeXT-Videoのサンプリングコードを修正し、各動画から均一に8フレームをサンプリングします。

これをフォルダー内にあるすべての動画セグメントに対して実行できます。

read_video_pyav は LLaVa-NeXT-Video Colab ノートブックから直接取得したものであり、推論のために動画を正しいnumpy表現にフォーマットします。

import av
def read_video_pyav(container, indices):
    '''
    PyAVデコーダーで動画をデコードします。

    Args:
        container (av.container.input.InputContainer): PyAVコンテナ。
        indices (List[int]): デコードするフレームインデックスのリスト。

    Returns:
        np.ndarray: デコードされたフレームのnumpy配列。形状は (num_frames, height, width, 3)。
    '''
    frames = []
    container.seek(0)
    start_index = indices[0]
    end_index = indices[-1]
    for i, frame in enumerate(container.decode(video=0)):
        if i > end_index:
            break
        if i >= start_index and i in indices:
            frames.append(frame)
    return np.stack([x.to_ndarray(format="rgb24") for x in frames])

def sample_video(video_path, num_samples=8):
    container = av.open(video_path)

    # 動画から均一に num_samples 枚のフレームをサンプリングします
    total_frames = container.streams.video[0].frames
    indices = np.arange(0, total_frames, total_frames / num_samples).astype(int)

    sampled_frames = read_video_pyav(container, indices)

    return sampled_frames

def process_videos_in_folder(folder_path):
    sample_info = {}

    # サポートされている動画ファイルの拡張子
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')

    for filename in os.listdir(folder_path):
        simple_video_name = os.path.splitext(os.path.basename(filename))[0]
        if filename.lower().endswith(video_extensions):
            video_path = os.path.join(folder_path, filename)
            try:
                sampled_clip = sample_video(video_path)
                sample_info[simple_video_name] = {"sampled_video": sampled_clip, "video_path" : video_path}
            except Exception as e:
                print(f"Error processing {filename}: {str(e)}")

    return sample_info

sampled_video_info = process_videos_in_folder(split_video_dir)

# Chromaクエリで見つかった動画セグメントを取得します
video_segment = sampled_video_info[found_video_segment_filename]['sampled_video']



モデルの準備

推論速度を上げるために、モデルを4ビット量子化(quantization)で設定します。

from transformers import BitsAndBytesConfig, LlavaNextVideoForConditionalGeneration, LlavaNextVideoProcessor
import torch

quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16
)

processor = LlavaNextVideoProcessor.from_pretrained("llava-hf/LLaVA-NeXT-Video-7B-hf")
model = LlavaNextVideoForConditionalGeneration.from_pretrained(
    "llava-hf/LLaVA-NeXT-Video-7B-hf",
    quantization_config=quantization_config,
    device_map='auto'
)

# 後ほどノートブック内で直接動画を再生するために使用します

from matplotlib import pyplot as plt
from matplotlib import animation
from IPython.display import HTML

# 形状が (frames, height, width, channels) のnumpy配列
# 確認用にランダムに1つ選択します
video = sampled_video_info[list(sampled_video_info.keys())[0]]['sampled_video']

fig = plt.figure()
im = plt.imshow(video[0,:,:,:])

plt.close() # 生成された画像を表示させないために必要です

def init():
    im.set_data(video[0,:,:,:])

def animate(i):
    im.set_data(video[i,:,:,:])
    return im

anim = animation.FuncAnimation(fig, animate, init_func=init, frames=video.shape[0],
                               interval=100)
HTML(anim.to_html5_video())



モデルの実行

これでクエリと関連動画の用意ができたので、これらをモデルにおくり、出力を得ることができます。

# 各「content」は辞書のリストになっており、画像/ビデオ/テキストのモダリティを追加できます
conversation = [
      {
          "role": "user",
          "content": [
              {"type": "text", "text": query},
              {"type": "video"},
              ],
      },
]

prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
prompt_len = len(prompt)

inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device)
generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

output = model.generate(**inputs, **generate_kwargs)
open_source_segment_generated_text = processor.batch_decode(output, skip_special_tokens=True)

print(open_source_segment_generated_text[0])



10 - Pegasusとオープンソースモデルの比較

ご覧の通り、クエリ(分かりやすくするために再度出力しています)に対してPegasusの方が適切な回答を生成しています。

print(f"query {query}")
print("pegasus 回答")
print(segment_answer)
print("オープンソース 回答")
print(open_source_segment_generated_text[0])



11 - ChromaとTwelveLabsの埋め込みを使った、複数動画の検索

これまでに、TwelveLabs MarengoとChromaを使用して動画内の関連セグメントを検出する方法を解説しました。

今度は、RAGユースケースにおけるMarengoとChromaの真の強みをお見せします。大量の候補動画全体にクエリを発行し、クエリへの回答に必要な特定の動画を検出します。



動画データベースの埋め込み:

まず、すべての動画を埋め込み化し、それらをChromaに保存します:

# すべての動画の埋め込みを行い、タスクIDを保存します
chroma_collection_name = "video_embeddings"
collection = chroma_client.get_or_create_collection(chroma_collection_name)

続いて、将来TwelveLabsからこれらの埋め込みを再取得する場合に備えて、動画ごとにtask_idを保存しておくための辞書を作成します。

# 各動画のtwelve labsのタスクIDを保存します
task_ids = {}

次に、先ほど作成した create_video_embeddings 関数を使用し、各動画の埋め込みを取得してChromaコレクションにアップロードします。

# 各動画の埋め込みとメタデータを取得します
# 同じ動画を複数回アップロードしないように、タスクIDを保存します
for filename in os.listdir(upscaled_video_dir):

    if filename.endswith(".mp4"):

        if (filename in task_ids.keys()):
            task_id = task_ids[filename]
        else:
            task_id = None

        file_path = os.path.join(upscaled_video_dir, filename)

        ids, metadatas, embeddings, task_id = create_video_embeddings(twelvelabs_client,file_path,segment_duration,task_id)

        task_ids[filename] = task_id

        collection.add(
            metadatas = metadatas,
            embeddings = embeddings,
            ids=ids
        )
print(task_ids)



データベースのクエリ

ここで、以前と同様のクエリを実行し、クエリへの回答となる動画全体を検索します。

response = query_chroma(collection,query)
found_full_video_name = response["metadatas"][0][0]["video_name"]
print(found_full_video_name)



12 - Pegasusを使用した、動画全体とのチャット

すでにインデックスが作成されているので、インデックスに動画をアップロードしてPegasusを呼び出すだけです。

# 同じ動画を複数回アップロードしないように、pegasusのビデオIDを保存する辞書を用意します
pegasus_video_ids = {}

各動画をここにアップロードし、チャットで使用することになるビデオID(video ids)を保存します。

for upscaled_video in os.listdir(upscaled_video_dir):
    upscaled_video_path = os.path.join(upscaled_video_dir, upscaled_video)
    print(upscaled_video_path)
    if upscaled_video not in pegasus_video_ids:
        video_id = upload_video_to_twelve_labs(index,upscaled_video_path)
        pegasus_video_ids[upscaled_video] = video_id
print(pegasus_video_ids)



Pegasusを呼び出して動画全体とチャットする

まず、Chromaクエリにマッチした動画に紐付けられる video_id を特定します。

video_id = pegasus_video_ids[found_full_video_name]
print(video_id)

次に、Pegasusに対して、ビリアタコスの食材(材料)が何か尋ねます。以前使用したクエリと全く同様ですが、今回は6秒のセグメント単位ではなく、動画全体を相手にチャットを展開します。

res = twelvelabs_client.generate.text(
  video_id=video_id,
  prompt=query
)
full_video_answer = res.data
print(f"query {query}")
print(f"{full_video_answer}")



動画全体の回答とセグメントの回答を比較する

次に、これを6秒のセグメントとチャットしたときに返された回答と比較します。

print(f"セグメントでの回答: \n{segment_answer}")



13 - オープンソースモデルを使用した、動画全体とのチャット

ここからは、動画全体とのチャットにおける、Pegasusとオープンソースモデルの実力を比較します。

すべての動画に対して再度サンプリングを行った後に動画全体に対しモデルを実行すると、もう少し面白い回答が出力されます。

# すべての動画をサンプリングします:
sampled_database_video_info = process_videos_in_folder(upscaled_video_dir)

# 各「content」は辞書のリストになっており、画像/ビデオ/テキストのモダリティを追加できます
conversation = [
      {
          "role": "user",
          "content": [
              {"type": "text", "text": query},
              {"type": "video"},
              ],
      },
]

prompt = processor.apply_chat_template(conversation, add_generation_prompt=True)
prompt_len = len(prompt)

inputs = processor([prompt], videos=[video_segment], padding=True, return_tensors="pt").to(model.device)
generate_kwargs = {"max_new_tokens": 100, "do_sample": True, "top_p": 0.9}

output = model.generate(**inputs, **generate_kwargs)
generated_text = processor.batch_decode(output, skip_special_tokens=True)

print(generated_text[0])



結果をPegasusの回答と比較する

ご覧の通り、オープンソースモデルは動画全体とチャットした際にも回答を生成することができず、Pegasusのパワーを示しています:

print(f"Pegasusの回答: \n{full_video_answer}")



14 - 比較

2つのモデルを比較すると、Pegasusが動画を明確に理解しており、クエリに対して正確な応答を返せることがよく分かります。この動画がビリアタコスのチュートリアルであることを認識しており、材料の一覧を特定できています。動画全体でも、動画の一部を切り出したセグメント単位でも同様の回答が可能となっています。

一方、LLaVA-NeXT-Videoのオープンソースモデルはクエリの内容自体は解釈できていますが、動画から必要な情報を抽出できていません。LLaVA-NeXT-Videoも、これが料理動画であることはある程度把握していますが、クエリで求めるレベルの深い理解には至っていません。



15 - 結論

このガイドでは、個別またはセット全体の動画コンテンツを取り扱う方法をご紹介しました。TwelveLabs Embed APIとChromaのベクトルデータベースを活用して、ローカルマシン上でのデータの検索と管理を行いました。

また、TwelveLabsのPegasusモデルとオープンソースモデルのLLaVA-NeXT-Videoを比較し、必要なインフラ環境、開発コスト、クエリの結果の違いを評価しました。オープンソースモデルと比較すると、Pegasusは運用オーバーヘッドが少なく、指示追従性(Instruction Following)が高く、より長いコンテキスト長が持てることで長尺動画のクエリ評価がより容易になるといった将来性を秘めていることが確認されました。



付録

ご参考ならびに今後の学習のために以下をご用意しています:

  1. 一連のコードを格納した Colab ノートブック

  2. TwelveLabs ドキュメント

  3. Chromaのクライアント および ドキュメント