パートナーシップ

マルチモーダルAIを極める:Twelve Labs + Databricks Mosaic AIによる高度なビデオ理解ビデオ高度理解

ジェームズ・リー

この記事では、Twelve LabsのEmbed APIとDatabricks Mosaic AI Vector Searchを統合して、類似検索や推奨システムを含む高度なビデオ理解アプリケーションを構築する方法を、開発者向けに解説します。また、パフォーマンスの最適化、スケーリング、モニターにおける考慮事項についても取り上げます。

この記事では、Twelve LabsのEmbed APIとDatabricks Mosaic AI Vector Searchを統合して、類似検索や推奨システムを含む高度なビデオ理解アプリケーションを構築する方法を、開発者向けに解説します。また、パフォーマンスの最適化、スケーリング、モニターにおける考慮事項についても取り上げます。

この記事の内容

No headings found on page

ニュースレターに登録する

ニュースレターに登録する

ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします

ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします

AIを活用してビデオを検索、分析、探索します。

2024/08/30

14分

記事へのリンクをコピー

概要

Twelve LabsのEmbed APIを使用すると、開発者は高度なビデオ理解のユースケースを動かすためのマルチモーダル埋め込みを取得できます。これには、セマンティックビデオ検索やデータキュレーションから、コンテンツ推奨、ビデオRAGシステムまでが含まれます。

Twelve Labsを使用すると、ビデオ内の視覚的表現、ボディランゲージ、話し言葉、および全体的な文脈の間の関係を捉える文脈ベクトル表現を生成できます。Databricks Mosaic AIのVector Searchは、高次元ベクトルのインデックス作成とクエリ実行のための、堅牢でスケーラブルなインフラストラクチャを提供します。

このブログ記事では、これらの相補的な技術を活用して、ビデオAIアプリケーションの新たな可能性を切り拓く方法を説明します。

このチュートリアルの作成にあたり、ご協力いただいたDatabricksのNina Williams氏、Austin Zaccor氏、Fernanda Heredia氏、およびEmily Hutson氏に深く感謝いたします!

なぜ Twelve Labs + Databricks Mosaic AI なのか?

Twelve LabsのEmbed APIをDatabricks Mosaic AI Vector Searchと統合することで、大規模なビデオデータセットの効率的な処理や、正確なマルチモーダルコンテンツの表現といった、ビデオAIにおける主要な課題に対応できます。この統合により、高度なビデオアプリケーションの開発時間と必要なリソースを削減し、膨大なビデオライブラリ全体にわたる複雑なクエリを可能にするとともに、ワークフロー全体の効率を向上させます。

マルチモーダルデータを処理するための統合されたアプローチは特に注目に値します。テキスト、画像、音声の分析のために別々のモデルを使い分ける代わりに、ビデオコンテンツの本質を丸ごと捉える、単一の一貫した表現を扱えるようになります。これにより、展開アーキテクチャが簡素化されるだけでなく、洗練されたコンテンツ推奨システムから高度なビデオ検索エンジン、自動コンテンツモデレーションツールまで、よりニュアンスに富んだ文脈対応のアプリケーションが実現します。

さらに、この統合によりDatabricksエコシステムの機能が拡張され、ビデオ理解を既存のデータパイプラインや機械学習ワークフローにシームレスに組み込めるようになります。企業がリアルタイムのビデオ分析を開発している場合でも、大規模なコンテンツ分類システムを構築している場合でも、あるいは生成AIの新しいアプリケーションを模索している場合でも、この組み合わせソリューションは強力な基盤を提供します。これにより、ビデオAIで実現可能な価値の限界を押し広げ、メディアやエンターテインメントからセキュリティ、ヘルスケアに至るまでの業界において、イノベーションと問題解決の新しい道を切り拓きます。

Twelve Labs の Embed API を理解する

Twelve LabsのEmbed APIは、ビデオコンテンツ専用に設計されたマルチモーダル埋め込み技術における大きな進歩を表しています。フレームごとの分析や異なるモダリティごとの個別のモデルに依存する従来のアプローチとは異なり、このAPIは、ビデオ内の視覚的表現、ボディランゲージ、話し言葉、および全体的な文脈の複雑な相互作用を捉える文脈ベクトル表現を生成します。これは、当社の最先端のマルチモーダル基盤モデル Marengo-2.6をベースに構築されています。



Embed APIは、ビデオデータを扱うAIエンジニアにとって特に強力となるいくつかの重要な機能を提供します。第一に、ビデオに存在するあらゆるモダリティに柔軟に対応できるため、テキスト専用や画像専用の個別のモデルを用意する必要がありません。第二に、動き、アクション、時系列の情報を考慮するビデオネイティブなアプローチを採用しており、ビデオコンテンツのより正確で時系列的に一貫した解釈を保証します。最後に、すべてのモダリティからの埋め込みを統合する統一されたベクトル空間を作成し、ビデオコンテンツのより包括的な理解を促進します。

AIエンジニアにとって、Embed APIはビデオ理解タスクにおいて新しい可能性を切り拓きます。これにより、より高度なコンテンツ分析、向上したセマンティック検索機能、および強化された推奨システムが可能になります。文脈に沿った微細な合図や異なるモダリティ間の長時間の相互作用を捉えるAPIの能力は、感情認識、文脈を考慮したコンテンツモデレーション、高度なビデオ検索システムなど、ビデオコンテンツのニュアンス豊かな理解を必要とするアプリケーションで特に価値を発揮します。

前提条件

Twelve LabsのEmbed APIをDatabricks Mosaic AI Vector Searchに統合する前に、以下の前提条件を満たしていることを確認してください。

  1. ワークスペースの作成と管理のアクセス権を持つDatabricksアカウント。(無料トライアルは https://databricks.com/try-databricks から登録できます)

  2. Pythonプログラミングおよび基本的なデータサイエンスの概念についての知識。

  3. Twelve LabsのAPIキー。(playground.twelvelabs.io で登録してください)

  4. ベクトル埋め込みと類似性検索の概念に関する基本的な理解。

  5. (オプション)AWS上でDatabricksを使用する場合はAWSアカウント。AzureまたはGoogle Cloud上でDatabricksを使用する場合は不要です。

注意:Embed APIは現在プライベートベータ版ですが、すべてのユーザーがこのフォームに記入するだけでアクセスをリクエストできます。通常、数時間以内に確認メールが届き、Embed APIの使用を開始できるようになります。

ステップ 1: 環境のセットアップ

まず、Databricks環境をセットアップし、必要なライブラリをインストールします。

1 - 新しいDatabricksワークスペースを作成する:

2 - 新しいクラスターを作成するか、既存のクラスターに接続する:

このアプリケーションには、ほぼすべてのMLクラスターが機能します。以下の設定は、最適なコストパフォーマンスを求める方向けに提供されています。

  • 「Compute」タブで「Create compute」をクリックします。

  • 「Single node」を選択し、Runtimeには「14.3 LTS ML non-GPU」を選択します。

    • クラスターのポリシーとアクセスモードはデフォルトのままで構いません。

  • ノードタイプとして「r6i.xlarge」を選択します。

    • これにより、メモリ使用率を最大化しつつ、コストはAWS上でわずか$0.252/時、Databricksで割引適用前で1.02 DBU/時となります。

    • また、テストした中で最も高速なオプションの一つでした。

  • 他のすべてのオプションはデフォルトのままで構いません。

  • 下部にある「Create compute」をクリックし、ワークスペースに戻ります。

3 - Databricksワークスペースに新しいノートブックを作成する:

  • ワークスペースで「Create」をクリックし、「Notebook」を選択します。

  • ノートブックに名前を付けます(例: "TwelveLabs_MosaicAI_VectorSearch_Integration")

  • デフォルト言語としてPythonを選択します。

4 - Twelve Labs および Mosaic AI Vector Search の SDK をインストールする:

ノートブックの最初のセルで、次のコマンドを実行します。

%pip install twelvelabs databricks-vectorsearch

5 - Twelve Labs の認証を設定する:

次のセルに、以下のコードを追加します。

from twelvelabs import TwelveLabs
import os

# Retrieve the API key from Databricks secrets (recommended)
# You'll need to set up the secret scope and add your API key first
TWELVE_LABS_API_KEY = dbutils.secrets.get(scope="your-scope", key="twelvelabs-api-key")

if TWELVE_LABS_API_KEY is None:
    raise ValueError("TWELVE_LABS_API_KEY environment variable is not set")

# Initialize the Twelve Labs client
twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)

注意:セキュリティを強化するため、APIキーをハードコードしたり環境変数を使用したりするのではなく、Databricksのシークレット機能を使用して保存することをお勧めします。

ステップ 2: マルチモーダル埋め込みを生成する

提供されているgenerate_embedding関数を使用して、Twelve Labs Embed APIでマルチモーダル埋め込みを生成します。この関数は、Databricks内のSpark DataFrameで効率的に動作するようにPandasユーザー定義関数(UDF)として設計されています。これは、埋め込みタスクの作成、進行状況の監視、および結果の取得のプロセスをカプセル化しています。

次に、ビデオURLを文字列として受け取り、Twelve Labs Embed APIを呼び出して配列<float>を返すprocess_url関数を作成します。

以下に実装方法と使用方法を示します。

UDFの定義:

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, FloatType
from twelvelabs.models.embed import EmbeddingsTask
import pandas as pd

@pandas_udf(ArrayType(FloatType()))
def get_video_embeddings(urls: pd.Series) -> pd.Series:
    def generate_embedding(video_url):
        twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)
        task = twelvelabs_client.embed.task.create(
            engine_name="Marengo-retrieval-2.6",
            video_url=video_url
        )
        task.wait_for_done()
        task_result = twelvelabs_client.embed.task.retrieve(task.id)
        embeddings = []
        for v in task_result.video_embeddings:
            embeddings.append({
                'embedding': v.embedding.float,
                'start_offset_sec': v.start_offset_sec,
                'end_offset_sec': v.end_offset_sec,
                'embedding_scope': v.embedding_scope
            })
        return embeddings

    def process_url(url):
        embeddings = generate_embedding(url)
        return embeddings[0]['embedding'] if embeddings else None

    return urls.apply(process_url)

ビデオURLを含むサンプルDataFrameの作成:

video_urls = [
    "https://example.com/video1.mp4",
    "https://example.com/video2.mp4",
    "https://example.com/video3.mp4"
]
df = spark.createDataFrame([(url,) for url in video_urls], ["video_url"])

埋め込みを生成するためのUDFの適用:

df_with_embeddings = df.withColumn("embedding", get_video_embeddings(df.video_url))

結果の表示:

df_with_embeddings.show(truncate=False)

このプロセスにより、DataFrame内の各ビデオURLに対して、視覚情報、音声情報、およびテキスト情報を含む、ビデオコンテンツのマルチモーダルな本質を捉えたマルチモーダル埋め込みが生成されます。

大規模なビデオデータセットの場合、埋め込みの生成は計算負荷が高く、時間がかかることがある点に注意してください。本番スケールのアプリケーションでは、バッチ処理や分散処理の戦略を実装することを検討してください。さらに、APIエラーやネットワークの問題を処理するために、適切なエラーハンドリングとログ記録が導入されていることを確認してください。

ステップ 3: ビデオ埋め込み用の Delta テーブルを作成する

次に、ビデオメタデータと、Twelve Labs Embed APIによって生成された埋め込みを保存するためのソースDeltaテーブルを作成します。このテーブルは、Databricks Mosaic AI Vector SearchにおけるVector Searchインデックスの基礎として機能します。

まず、ビデオのURLとメタデータを含むソースDataFrameを作成します:

from pyspark.sql import Row

# Create a list of sample video URLs and metadata
video_data = [
Row(url='http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4', title='Elephant Dream'), 

Row(url='http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/Sintel.mp4', title='Sintel'),

Row(url='http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4', title='Big Buck Bunny')
]

# Create a DataFrame from the list
source_df = spark.createDataFrame(video_data)
source_df.show()

次に、SQLを使用してDeltaテーブルのスキーマを定義します:

%sql
CREATE TABLE IF NOT EXISTS videos_source_embeddings (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  url STRING,
  title STRING,
  embedding ARRAY<FLOAT>
) TBLPROPERTIES (delta.enableChangeDataFeed = true)

テーブルでChange Data Feedが有効になっていることに注意してください。これは、Vector Searchインデックスを作成および維持するために非常に重要です。

では、先ほど定義したget_video_embeddings関数を使用して、ビデオの埋め込みを生成します:

embeddings_df = source_df.withColumn("embedding", get_video_embeddings("url"))

このステップは、ビデオの数や長さによっては少し時間がかかる場合があります。

埋め込みが生成されたら、データをDeltaテーブルに書き込みます:

embeddings_df.write.mode("append").saveAsTable("videos_source_embeddings")

最後に、埋め込みを含むDataFrameを表示して、データを確認します:

display(embeddings_df)

この手順により、Vector Search機能のための強力な基盤が構築されます。Deltaテーブルは自動的にVector Searchインデックスと同期されるため、ビデオデータセットに対する更新や追加は即座に検索結果に反映されます。

覚えておくべきいくつかの重要なポイント:

  • id列は自動生成され、各ビデオに一意の識別子を提供します。

  • embedding列は、Twelve Labs Embed APIによって生成された各ビデオの高次元ベクトル表現を格納します。

  • Change Data Feedを有効にすることで、Databricksはテーブル内の変更を効率的に追跡できます。これは、最新のVector Searchインデックスを維持するために不可欠です。

ステップ 4: Mosaic AI Vector Search を設定する

このステップでは、ビデオ埋め込みを処理できるようにDatabricks Mosaic AI Vector Searchをセットアップします。これには、Vector Searchエンドポイントと、videos_source_embeddings Deltaテーブルと自動的に同期するDelta Sync Indexの作成が含まれます。

まず、Vector Searchエンドポイントを作成します:

from databricks.vector_search.client import VectorSearchClient

# Initialize the Vector Search client and name the endpoint
mosaic_client = VectorSearchClient()
endpoint_name = "twelve_labs_video_endpoint"

# Delete the existing endpoint if it exists
try:
    mosaic_client.delete_endpoint(endpoint_name)
    print(f"Deleted existing endpoint: {endpoint_name}")
except Exception:
    pass  # Ignore non-existing endpoints

# Create the new endpoint
endpoint = mosaic_client.create_endpoint(
    name=endpoint_name,
    endpoint_type="STANDARD"
)

このコードは、新しいVector Searchエンドポイントを作成するか、同じ名前の既存のエンドポイントを置き換えます。このエンドポイントは、Vector Search操作用のアクセスポイントとして機能します。

次に、videos_source_embeddings Deltaテーブルと自動的に同期するDelta Sync Indexを作成します:

# Define the source table name and index name
source_table_name = "twelvelabs.default.videos_source_embeddings"
index_name = "twelvelabs.default.video_embeddings_index"

index = mosaic_client.create_delta_sync_index(
    endpoint_name="twelve_labs_video_endpoint",
    source_table_name=source_table_name,
    index_name=index_name,
    primary_key="id",
    embedding_dimension=1024,
    embedding_vector_column="embedding",
    pipeline_type="TRIGGERED"
)

print(f"Created index: {index.name}")

このコードは、ソースDeltaテーブルにリンクするDelta Sync Indexを作成します。ソーステーブルに加えられた変更から数秒以内にインデックスを自動的に更新したい場合(Vector Searchの結果が常に最新であることを保証する)、pipeline_type="CONTINUOUS"を設定します。

インデックスが作成され、正しく同期されていることを確認するには、次のコードを使用して手動で同期を呼び出します:

# Check the status of the index; this may take some time
index_status = mosaic_client.get_index(
    endpoint_name="twelve_labs_video_endpoint",
    index_name="twelvelabs.default.video_embeddings_index"
)
print(f"Index status: {index_status}")

# Manually trigger the index sync
try:
    index.sync()
    print("Index sync triggered successfully.")
except Exception as e:
    print(f"Error triggering index sync: {str(e)}")

このコードを使用すると、インデックスのステータスを確認し、必要に応じて手動で同期を行うことができます。本番環境では、ソースDeltaテーブルの変更に基づいて自動的に同期するようにパイプラインを設定する方が好ましいでしょう。

覚えておくべき重要事項:

  1. Vector Searchエンドポイントは、Vector Search操作のアクセスポイントとして機能します。

  2. Delta Sync IndexはソースDeltaテーブルと自動的に同期し、常に最新の検索結果を保証します。

  3. embedding_dimensionは、Twelve LabsのEmbed APIによって生成された埋め込みの次元数(1024)に一致させる必要があります。

  4. primary_keyは「id」に設定します。これはソーステーブルの一意の識別子に対応している必要があります。

  5. embedding_vector_columnは「embedding」に設定します。これはソーステーブル内のビデオ埋め込みが含まれる列名と一致する必要があります。

ステップ 5: 類似性検索を実装する

次のステップは、設定されたMosaic AI Vector SearchインデックスとTwelve Labs Embed APIを使用して類似性検索機能を実装することです。これにより、マルチモーダル埋め込みのパワーを活用し、指定したテキストクエリに類似したビデオを見つけることができます。

まず、Twelve Labs Embed APIを使用して、テキストクエリの埋め込みを取得する関数を定義します:

def get_text_embedding(text_query):
    # Twelve Labs Embed API supports text-to-embedding
    text_embedding = twelvelabs_client.embed.create(
      engine_name="Marengo-retrieval-2.6",
      text=text_query,
      text_truncate="start"
    )

    return text_embedding.text_embedding.float

この関数はテキストクエリを受け取り、ビデオ埋め込みと同じモデルを使用してその埋め込みを生成し、ベクトル空間での互換性を確保します。

次に、類似性検索関数を実装します:

def similarity_search(query_text, num_results=5):
    # Initialize the Vector Search client and get the query embedding
    mosaic_client = VectorSearchClient()
    query_embedding = get_text_embedding(query_text)

    print(f"Query embedding generated: {len(query_embedding)} dimensions")

    # Perform the similarity search
    results = index.similarity_search(
        query_vector=query_embedding,
        num_results=num_results,
        columns=["id", "url", "title"]
    )
    return results

この関数は、テキストクエリと返される結果の数を受け取ります。クエリの埋め込みを生成し、Mosaic AI Vector Searchインデックスを使用して類似するビデオを検索します。

検索結果を解析して表示するには、次のヘルパー関数を使用します:

def parse_search_results(raw_results):
    try:
        data_array = raw_results['result']['data_array']
        columns = [col['name'] for col in raw_results['manifest']['columns']]
        return [dict(zip(columns, row)) for row in data_array]
    except KeyError:
        print("Unexpected result format:", raw_results)
        return []

では、これらをすべて組み合わせて、サンプル検索を実行してみましょう:

# Example usage
query = "A dragon"
raw_results = similarity_search(query)

# Parse and print the search results
search_results = parse_search_results(raw_results)
if search_results:
    print(f"Top {len(search_results)} videos similar to the query: '{query}'")
    for i, result in enumerate(search_results, 1):
        print(f"{i}. Title: {result.get('title', 'N/A')}, URL: {result.get('url', 'N/A')}, Similarity Score: {result.get('score', 'N/A')}")
else:
    print("No valid search results returned.")

このコードは、Twelve Labsの類似性検索関数を使用して「A dragon」というクエリに関連するビデオを検索する方法を示しています。その後、検索結果を解析し、ユーザーフレンドリーな形式で表示します。

覚えておくべき重要事項:

  1. get_text_embedding関数は、ビデオ埋め込みと同じTwelve Labsモデルを使用するため、互換性が保証されます。

  2. similarity_search関数は、テキストから埋め込みへの変換とVector Searchを組み合わせて、類似するビデオを見つけます。

  3. ネットワークの問題やAPIの変更が検索プロセスに影響を与える可能性があるため、エラーハンドリングは不可欠です。

  4. parse_search_results関数は、生のAPI応答をより扱いやすい形式に変換するのに役立ちます。

  5. similarity_search関数の num_resultsパラメータを調整して、返される結果の数を制御できます。

この実装により、ビデオデータセット全体の強力なセマンティック検索機能が有効になります。ユーザーは、Twelve Labs Embed APIによって生成された豊かなマルチモーダル埋め込みを活用して、自然言語クエリを使用して関連するビデオを見つけることができるようになります。

ステップ 6: ビデオ推奨システムを構築する

次に、Twelve Labs Embed APIとDatabricks Mosaic AI Vector Searchによって生成されたマルチモーダル埋め込みを使用して、基本的なビデオ推奨システムを作成します。このシステムは、埋め込みの類似性に基づいて、指定されたビデオに類似したビデオを提示します。

まず、簡単な推奨機能を実装します:

def get_video_recommendations(video_id, num_recommendations=5):
    # Initialize the Vector Search client
    mosaic_client = VectorSearchClient()

    # First, retrieve the embedding for the given video_id
    source_df = spark.table("videos_source_embeddings")
    video_embedding = source_df.filter(f"id = {video_id}").select("embedding").first()

    if not video_embedding:
        print(f"No video found with id: {video_id}")
        return []

    # Perform similarity search using the video's embedding
    try:
        results = index.similarity_search(
            query_vector=video_embedding["embedding"],
            num_results=num_recommendations + 1,  # +1 to account for the input video
            columns=["id", "url", "title"]
        )
        
        # Parse the results
        recommendations = parse_search_results(results)
        
        # Remove the input video from recommendations if present
        recommendations = [r for r in recommendations if r.get('id') != video_id]
        
        return recommendations[:num_recommendations]
    except Exception as e:
        print(f"Error during recommendation: {e}")
        return []

# Helper function to display recommendations
def display_recommendations(recommendations):
    if recommendations:
        print(f"Top {len(recommendations)} recommended videos:")
        for i, video in enumerate(recommendations, 1):
            print(f"{i}. Title: {video.get('title', 'N/A')}")
            print(f"   URL: {video.get('url', 'N/A')}")
            print(f"   Similarity Score: {video.get('score', 'N/A')}")
            print()
    else:
        print("No recommendations found.")

# Example usage
video_id = 1  # Assuming this is a valid video ID in your dataset
recommendations = get_video_recommendations(video_id)
display_recommendations(recommendations)

この実装は以下の仕組みで動きます:

  1. get_video_recommendations関数は、ビデオIDと返すべき推奨ビデオの件数を受け取ります。

  2. 指定されたビデオの埋め込みをソースDeltaテーブルから取得します。

  3. この埋め込みを使用して類似性検索を実行し、最も類似しているビデオを見つけます。

  4. 同じビデオが推奨されないように、結果から入力ビデオを削除します(結果に含まれる場合)。

  5. display_recommendationsヘルパー関数は、推奨ビデオのリストをユーザーフレンドリーな形で整形し、出力します。

この推奨システムを使用するには:

  1. videos_source_embeddingsテーブルに、有効な埋め込みを持つビデオが保存されていることを確認します。

  2. データセット内の有効なビデオIDを指定して、get_video_recommendations関数を呼び出します。

  3. 関数は類似度に基づいた推奨ビデオのリストを表示します。

この基本的な推奨システムは、コンテンツベースのビデオ推奨においてマルチモーダル埋め込みをどのように活用するかを示しています。これは、いくつかの方法で拡張および改善できます:

  • パーソナライズされた推奨を行うために、ユーザーの好みや視聴履歴を組み込みます。

  • 多様な推奨結果を保証するために、偶発性メカニズムを実装します。

  • ビデオメタデータ(ジャンル、長さ、投稿日など)に基づくフィルタリングを追加します。

  • 頻繁にリクエストされる推奨ビデオのパフォーマンスを向上させるために、キャッシュメカニズムを実装します。

推奨の品質は、ビデオデータセットの規模と多様性、さらにはTwelve Labs Embed APIが生成する埋め込みの正確性に依存することに注意してください。システムに登録するビデオを増やすにつれて、推奨結果はさらに高い関連性と多様性を持つようになります。

この統合を次のレベルに引き上げる

インデックスの更新と同期

ビデオライブラリが拡大し進化するにつれて、Vector Searchインデックスを最新の状態に維持することが極めて重要です。Mosaic AI Vector SearchはソースDeltaテーブルとのシームレスな同期を提供し、推奨結果や検索結果に常に最新データを反映させることができます。

インデックス更新と同期における重要な考慮事項:

  1. 増分更新Delta Lakeの変更データフィード(Change Data Feed)を活用し、インデックス内の変更されたレコード、または新しいレコードのみを効率的に更新します。

  2. 定期同期のスケジュール:Databricksのワークフローオーケストレーションツールを使用し、定期的な同期ジョブを実行してインデックスの鮮度を維持します。

  3. リアルタイム更新:時間に極めてシリズなアプリケーションでは、Databricks Mosaic AIのストリーミング機能を使用した準リアルタイムのインデックス更新の実装を検討してください。

  4. バージョン管理:Delta Lakeのタイムトラベル機能を利用してインデックスの複数のバージョンを維持し、必要に応じて簡単にロールバックできるようにします。

  5. 同期ステータスの監視:同期の成功を追跡し、更新処理での問題をすばやく特定するために、ログ記録やアラートの仕組みを実装します。

これらの手法を習得することで、TwelveLabsのビデオ埋め込みを常に最新に保ち、高度な検索と推奨のユースケースにいつでも対応できるようにします。

パフォーマンスとスケーリングの最適化

自身のビデオ分析パイプラインが成長するにつれ、パフォーマンスの最適化を継続し、ソリューションをスケールアップさせることが重要になります。Databricksの分散コンピューティング機能は、Twelve Labsの効率的な埋め込み生成と連携することで、大規模なビデオ処理タスクの処理に耐えうる堅牢な基盤を提供します。

ソリューションを最適化しスケーリングするための主なアプローチ:

  1. 分散処理:DatabricksのSparkクラスターを活用して、複数のノード間で埋め込みの生成タスクやインデックス構築タスクを並列化します。

  2. キャッシュ戦略:アクセス頻度が高い埋め込みに対してインテリジェントなキャッシュ処理を導入し、APIの呼び出しを減らして応答時間を短縮します。

  3. バッチ処理:大規模なビデオライブラリに対しては、ピーク時以外の時間帯に埋め込みを生成してインデックスを更新するバッチ処理ワークフローを実装します。

  4. クエリの最適化num_resultsなどのパラメータを調整し、効率的なフィルタリング手法を駆使して、Vector Searchクエリを微調整します。

  5. インデックスのパーティショニング:超大規模なデータセットの場合は、クエリのレスポンスを高め、よりきめ細かい更新を可能にするために、インデックスのパーティショニング戦略を調査します。

  6. オートスケーリング:Databricksのオートスケーリング機能を利用し、ワークロードの処理需要に合わせてコンピューティングリソースを動的に調整します。

  7. エッジコンピューティング:遅延(レイテンシ)を気にするアプリケーションの場合は、モデルの軽量版をデータソースの近くにデプロイすることを検討します。

これらの最適化手法を導入することで、優れたパフォーマンスとコスト効率を両立させながら、増え続けるビデオライブラリと増加するユーザー需要に対処することが可能になります。

モニタリングと分析

ビデオ理解パイプラインの継続的な成功を収めるためには、堅牢な監視(モニタリング)とデータ分析の仕組みが欠かせません。Databricksは、システムのパフォーマンス、ユーザー行動、ビジネスインパクトを追跡するための強力なツールを提供しています。

監視と分析で重点を置くべき主な領域:

  1. パフォーマンスメトリクス:クエリの遅延、埋め込み生成までの時間、インデックス同期にかかる時間などの主要業績評価指標(KPI)を追跡します。

  2. 利用ログの分析:ユーザーの操作履歴、よく使われる検索クエリ、頻繁にレコメンドされるビデオを分析し、ユーザー行動の洞察を得ます。

  3. 品質評価:システムによる自動評価指標と実際のユーザーフィードバックを活用し、検索結果や推奨結果の関連度を評価するフィードバックループを構築します。

  4. リソース使用状況:コンピューティングリソース、APIコール数、ストレージ利用量を継続的に監視し、コストとパフォーマンスを最適化します。

  5. エラー追跡:パイプライン内で発生した不具合を迅速に検知・解決するために、包括的なログ記録と警告システム(アラート)を整備します。

  6. A/Bテスト:Databricksの検証機能を活用し、さまざまな埋め込みモデル、検索アルゴリズム、またはレコメンドモデルを試験評価(テスト)します。

  7. ビジネス目標の検証:ビデオ解析により実現した機能を、ユーザーの滞在状況(エンゲージメント)、視聴時間、成約率などの最終的なKGIに紐付けます。

  8. 法規制・規約遵守:ビデオ処理処理パイプラインが個人情報保護規制やコンテンツモデレーションのガイドラインに適合しているかを継続的に確認します。

これらすべての監視手法を施すことで、ビデオパイプラインの働きとその成果について価値の高い情報を手にすることができます。このアプローチに基づく改善を絶えず繰り返すことで、Twelve Labsの高度なビデオ構造化機能とDatabricks Data Intelligence Platformの統合価値を、データに基づいて実証できるようになります。

まとめ

Twelve Labs と Databricks Mosaic AI は、洗練された映像コンテンツの構造化と、高度なデータ抽出タスクのための一貫した統合開発フレームワークを提供します。この統合技術により、マルチモーダルなビデオ表現ベクトルの抽出と実効性の高いベクトル検索(Vector Search)を活用し、開発者は独自のビデオ解析、レコメンド、検索システムを作り上げることが可能になります。

このチュートリアルでは、開発に必要な環境構築から、マルチモーダル情報のベクトル展開、ベクトルエンジンへの登録、類似コンテンツ検索、レコメンデーションの実装に必要な基本ロジックを解説しました。さらに実用に必要なスケーラブルな管理システム設計、最適化手法、信頼性向上のための監視方法まで包括的に言及しています。

映像データが爆発的に増加する現在、それらをシステムに最適化させて価値を効率よく発掘する術(すべ)は不可欠です。本ドキュメントは、この難解なプロジェクトを克服するための強力な設計方法論を提供するものです。ぜひそれらの機能をテストし、応用的課題を検討しながら、映像理解を進化させる開発コミュニティを共に盛り上げましょう。

さらに学びたい方のための情報ソース

これらの連携方法をさらに深く探索するために、以下の情報をぜひご一読ください。

  1. Twelve Labs 公式技術ドキュメント

  2. Databricks Vector Search に関する公式ヘルプ

  3. Databricks コミュニティ・フォーラム

  4. Twelve Labs Discord コミュニティ

これらのお役立ち情報を手元に置くことで、最先端のAIビデオテクノロジーを活用し、Twelve Labs と Databricks をベースにした独自のイノベーティブなシステムの構築を進めていきましょう。

概要

Twelve LabsのEmbed APIを使用すると、開発者は高度なビデオ理解のユースケースを動かすためのマルチモーダル埋め込みを取得できます。これには、セマンティックビデオ検索やデータキュレーションから、コンテンツ推奨、ビデオRAGシステムまでが含まれます。

Twelve Labsを使用すると、ビデオ内の視覚的表現、ボディランゲージ、話し言葉、および全体的な文脈の間の関係を捉える文脈ベクトル表現を生成できます。Databricks Mosaic AIのVector Searchは、高次元ベクトルのインデックス作成とクエリ実行のための、堅牢でスケーラブルなインフラストラクチャを提供します。

このブログ記事では、これらの相補的な技術を活用して、ビデオAIアプリケーションの新たな可能性を切り拓く方法を説明します。

このチュートリアルの作成にあたり、ご協力いただいたDatabricksのNina Williams氏、Austin Zaccor氏、Fernanda Heredia氏、およびEmily Hutson氏に深く感謝いたします!

なぜ Twelve Labs + Databricks Mosaic AI なのか?

Twelve LabsのEmbed APIをDatabricks Mosaic AI Vector Searchと統合することで、大規模なビデオデータセットの効率的な処理や、正確なマルチモーダルコンテンツの表現といった、ビデオAIにおける主要な課題に対応できます。この統合により、高度なビデオアプリケーションの開発時間と必要なリソースを削減し、膨大なビデオライブラリ全体にわたる複雑なクエリを可能にするとともに、ワークフロー全体の効率を向上させます。

マルチモーダルデータを処理するための統合されたアプローチは特に注目に値します。テキスト、画像、音声の分析のために別々のモデルを使い分ける代わりに、ビデオコンテンツの本質を丸ごと捉える、単一の一貫した表現を扱えるようになります。これにより、展開アーキテクチャが簡素化されるだけでなく、洗練されたコンテンツ推奨システムから高度なビデオ検索エンジン、自動コンテンツモデレーションツールまで、よりニュアンスに富んだ文脈対応のアプリケーションが実現します。

さらに、この統合によりDatabricksエコシステムの機能が拡張され、ビデオ理解を既存のデータパイプラインや機械学習ワークフローにシームレスに組み込めるようになります。企業がリアルタイムのビデオ分析を開発している場合でも、大規模なコンテンツ分類システムを構築している場合でも、あるいは生成AIの新しいアプリケーションを模索している場合でも、この組み合わせソリューションは強力な基盤を提供します。これにより、ビデオAIで実現可能な価値の限界を押し広げ、メディアやエンターテインメントからセキュリティ、ヘルスケアに至るまでの業界において、イノベーションと問題解決の新しい道を切り拓きます。

Twelve Labs の Embed API を理解する

Twelve LabsのEmbed APIは、ビデオコンテンツ専用に設計されたマルチモーダル埋め込み技術における大きな進歩を表しています。フレームごとの分析や異なるモダリティごとの個別のモデルに依存する従来のアプローチとは異なり、このAPIは、ビデオ内の視覚的表現、ボディランゲージ、話し言葉、および全体的な文脈の複雑な相互作用を捉える文脈ベクトル表現を生成します。これは、当社の最先端のマルチモーダル基盤モデル Marengo-2.6をベースに構築されています。



Embed APIは、ビデオデータを扱うAIエンジニアにとって特に強力となるいくつかの重要な機能を提供します。第一に、ビデオに存在するあらゆるモダリティに柔軟に対応できるため、テキスト専用や画像専用の個別のモデルを用意する必要がありません。第二に、動き、アクション、時系列の情報を考慮するビデオネイティブなアプローチを採用しており、ビデオコンテンツのより正確で時系列的に一貫した解釈を保証します。最後に、すべてのモダリティからの埋め込みを統合する統一されたベクトル空間を作成し、ビデオコンテンツのより包括的な理解を促進します。

AIエンジニアにとって、Embed APIはビデオ理解タスクにおいて新しい可能性を切り拓きます。これにより、より高度なコンテンツ分析、向上したセマンティック検索機能、および強化された推奨システムが可能になります。文脈に沿った微細な合図や異なるモダリティ間の長時間の相互作用を捉えるAPIの能力は、感情認識、文脈を考慮したコンテンツモデレーション、高度なビデオ検索システムなど、ビデオコンテンツのニュアンス豊かな理解を必要とするアプリケーションで特に価値を発揮します。

前提条件

Twelve LabsのEmbed APIをDatabricks Mosaic AI Vector Searchに統合する前に、以下の前提条件を満たしていることを確認してください。

  1. ワークスペースの作成と管理のアクセス権を持つDatabricksアカウント。(無料トライアルは https://databricks.com/try-databricks から登録できます)

  2. Pythonプログラミングおよび基本的なデータサイエンスの概念についての知識。

  3. Twelve LabsのAPIキー。(playground.twelvelabs.io で登録してください)

  4. ベクトル埋め込みと類似性検索の概念に関する基本的な理解。

  5. (オプション)AWS上でDatabricksを使用する場合はAWSアカウント。AzureまたはGoogle Cloud上でDatabricksを使用する場合は不要です。

注意:Embed APIは現在プライベートベータ版ですが、すべてのユーザーがこのフォームに記入するだけでアクセスをリクエストできます。通常、数時間以内に確認メールが届き、Embed APIの使用を開始できるようになります。

ステップ 1: 環境のセットアップ

まず、Databricks環境をセットアップし、必要なライブラリをインストールします。

1 - 新しいDatabricksワークスペースを作成する:

2 - 新しいクラスターを作成するか、既存のクラスターに接続する:

このアプリケーションには、ほぼすべてのMLクラスターが機能します。以下の設定は、最適なコストパフォーマンスを求める方向けに提供されています。

  • 「Compute」タブで「Create compute」をクリックします。

  • 「Single node」を選択し、Runtimeには「14.3 LTS ML non-GPU」を選択します。

    • クラスターのポリシーとアクセスモードはデフォルトのままで構いません。

  • ノードタイプとして「r6i.xlarge」を選択します。

    • これにより、メモリ使用率を最大化しつつ、コストはAWS上でわずか$0.252/時、Databricksで割引適用前で1.02 DBU/時となります。

    • また、テストした中で最も高速なオプションの一つでした。

  • 他のすべてのオプションはデフォルトのままで構いません。

  • 下部にある「Create compute」をクリックし、ワークスペースに戻ります。

3 - Databricksワークスペースに新しいノートブックを作成する:

  • ワークスペースで「Create」をクリックし、「Notebook」を選択します。

  • ノートブックに名前を付けます(例: "TwelveLabs_MosaicAI_VectorSearch_Integration")

  • デフォルト言語としてPythonを選択します。

4 - Twelve Labs および Mosaic AI Vector Search の SDK をインストールする:

ノートブックの最初のセルで、次のコマンドを実行します。

%pip install twelvelabs databricks-vectorsearch

5 - Twelve Labs の認証を設定する:

次のセルに、以下のコードを追加します。

from twelvelabs import TwelveLabs
import os

# Retrieve the API key from Databricks secrets (recommended)
# You'll need to set up the secret scope and add your API key first
TWELVE_LABS_API_KEY = dbutils.secrets.get(scope="your-scope", key="twelvelabs-api-key")

if TWELVE_LABS_API_KEY is None:
    raise ValueError("TWELVE_LABS_API_KEY environment variable is not set")

# Initialize the Twelve Labs client
twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)

注意:セキュリティを強化するため、APIキーをハードコードしたり環境変数を使用したりするのではなく、Databricksのシークレット機能を使用して保存することをお勧めします。

ステップ 2: マルチモーダル埋め込みを生成する

提供されているgenerate_embedding関数を使用して、Twelve Labs Embed APIでマルチモーダル埋め込みを生成します。この関数は、Databricks内のSpark DataFrameで効率的に動作するようにPandasユーザー定義関数(UDF)として設計されています。これは、埋め込みタスクの作成、進行状況の監視、および結果の取得のプロセスをカプセル化しています。

次に、ビデオURLを文字列として受け取り、Twelve Labs Embed APIを呼び出して配列<float>を返すprocess_url関数を作成します。

以下に実装方法と使用方法を示します。

UDFの定義:

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, FloatType
from twelvelabs.models.embed import EmbeddingsTask
import pandas as pd

@pandas_udf(ArrayType(FloatType()))
def get_video_embeddings(urls: pd.Series) -> pd.Series:
    def generate_embedding(video_url):
        twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)
        task = twelvelabs_client.embed.task.create(
            engine_name="Marengo-retrieval-2.6",
            video_url=video_url
        )
        task.wait_for_done()
        task_result = twelvelabs_client.embed.task.retrieve(task.id)
        embeddings = []
        for v in task_result.video_embeddings:
            embeddings.append({
                'embedding': v.embedding.float,
                'start_offset_sec': v.start_offset_sec,
                'end_offset_sec': v.end_offset_sec,
                'embedding_scope': v.embedding_scope
            })
        return embeddings

    def process_url(url):
        embeddings = generate_embedding(url)
        return embeddings[0]['embedding'] if embeddings else None

    return urls.apply(process_url)

ビデオURLを含むサンプルDataFrameの作成:

video_urls = [
    "https://example.com/video1.mp4",
    "https://example.com/video2.mp4",
    "https://example.com/video3.mp4"
]
df = spark.createDataFrame([(url,) for url in video_urls], ["video_url"])

埋め込みを生成するためのUDFの適用:

df_with_embeddings = df.withColumn("embedding", get_video_embeddings(df.video_url))

結果の表示:

df_with_embeddings.show(truncate=False)

このプロセスにより、DataFrame内の各ビデオURLに対して、視覚情報、音声情報、およびテキスト情報を含む、ビデオコンテンツのマルチモーダルな本質を捉えたマルチモーダル埋め込みが生成されます。

大規模なビデオデータセットの場合、埋め込みの生成は計算負荷が高く、時間がかかることがある点に注意してください。本番スケールのアプリケーションでは、バッチ処理や分散処理の戦略を実装することを検討してください。さらに、APIエラーやネットワークの問題を処理するために、適切なエラーハンドリングとログ記録が導入されていることを確認してください。

ステップ 3: ビデオ埋め込み用の Delta テーブルを作成する

次に、ビデオメタデータと、Twelve Labs Embed APIによって生成された埋め込みを保存するためのソースDeltaテーブルを作成します。このテーブルは、Databricks Mosaic AI Vector SearchにおけるVector Searchインデックスの基礎として機能します。

まず、ビデオのURLとメタデータを含むソースDataFrameを作成します:

from pyspark.sql import Row

# Create a list of sample video URLs and metadata
video_data = [
Row(url='http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4', title='Elephant Dream'), 

Row(url='http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/Sintel.mp4', title='Sintel'),

Row(url='http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4', title='Big Buck Bunny')
]

# Create a DataFrame from the list
source_df = spark.createDataFrame(video_data)
source_df.show()

次に、SQLを使用してDeltaテーブルのスキーマを定義します:

%sql
CREATE TABLE IF NOT EXISTS videos_source_embeddings (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  url STRING,
  title STRING,
  embedding ARRAY<FLOAT>
) TBLPROPERTIES (delta.enableChangeDataFeed = true)

テーブルでChange Data Feedが有効になっていることに注意してください。これは、Vector Searchインデックスを作成および維持するために非常に重要です。

では、先ほど定義したget_video_embeddings関数を使用して、ビデオの埋め込みを生成します:

embeddings_df = source_df.withColumn("embedding", get_video_embeddings("url"))

このステップは、ビデオの数や長さによっては少し時間がかかる場合があります。

埋め込みが生成されたら、データをDeltaテーブルに書き込みます:

embeddings_df.write.mode("append").saveAsTable("videos_source_embeddings")

最後に、埋め込みを含むDataFrameを表示して、データを確認します:

display(embeddings_df)

この手順により、Vector Search機能のための強力な基盤が構築されます。Deltaテーブルは自動的にVector Searchインデックスと同期されるため、ビデオデータセットに対する更新や追加は即座に検索結果に反映されます。

覚えておくべきいくつかの重要なポイント:

  • id列は自動生成され、各ビデオに一意の識別子を提供します。

  • embedding列は、Twelve Labs Embed APIによって生成された各ビデオの高次元ベクトル表現を格納します。

  • Change Data Feedを有効にすることで、Databricksはテーブル内の変更を効率的に追跡できます。これは、最新のVector Searchインデックスを維持するために不可欠です。

ステップ 4: Mosaic AI Vector Search を設定する

このステップでは、ビデオ埋め込みを処理できるようにDatabricks Mosaic AI Vector Searchをセットアップします。これには、Vector Searchエンドポイントと、videos_source_embeddings Deltaテーブルと自動的に同期するDelta Sync Indexの作成が含まれます。

まず、Vector Searchエンドポイントを作成します:

from databricks.vector_search.client import VectorSearchClient

# Initialize the Vector Search client and name the endpoint
mosaic_client = VectorSearchClient()
endpoint_name = "twelve_labs_video_endpoint"

# Delete the existing endpoint if it exists
try:
    mosaic_client.delete_endpoint(endpoint_name)
    print(f"Deleted existing endpoint: {endpoint_name}")
except Exception:
    pass  # Ignore non-existing endpoints

# Create the new endpoint
endpoint = mosaic_client.create_endpoint(
    name=endpoint_name,
    endpoint_type="STANDARD"
)

このコードは、新しいVector Searchエンドポイントを作成するか、同じ名前の既存のエンドポイントを置き換えます。このエンドポイントは、Vector Search操作用のアクセスポイントとして機能します。

次に、videos_source_embeddings Deltaテーブルと自動的に同期するDelta Sync Indexを作成します:

# Define the source table name and index name
source_table_name = "twelvelabs.default.videos_source_embeddings"
index_name = "twelvelabs.default.video_embeddings_index"

index = mosaic_client.create_delta_sync_index(
    endpoint_name="twelve_labs_video_endpoint",
    source_table_name=source_table_name,
    index_name=index_name,
    primary_key="id",
    embedding_dimension=1024,
    embedding_vector_column="embedding",
    pipeline_type="TRIGGERED"
)

print(f"Created index: {index.name}")

このコードは、ソースDeltaテーブルにリンクするDelta Sync Indexを作成します。ソーステーブルに加えられた変更から数秒以内にインデックスを自動的に更新したい場合(Vector Searchの結果が常に最新であることを保証する)、pipeline_type="CONTINUOUS"を設定します。

インデックスが作成され、正しく同期されていることを確認するには、次のコードを使用して手動で同期を呼び出します:

# Check the status of the index; this may take some time
index_status = mosaic_client.get_index(
    endpoint_name="twelve_labs_video_endpoint",
    index_name="twelvelabs.default.video_embeddings_index"
)
print(f"Index status: {index_status}")

# Manually trigger the index sync
try:
    index.sync()
    print("Index sync triggered successfully.")
except Exception as e:
    print(f"Error triggering index sync: {str(e)}")

このコードを使用すると、インデックスのステータスを確認し、必要に応じて手動で同期を行うことができます。本番環境では、ソースDeltaテーブルの変更に基づいて自動的に同期するようにパイプラインを設定する方が好ましいでしょう。

覚えておくべき重要事項:

  1. Vector Searchエンドポイントは、Vector Search操作のアクセスポイントとして機能します。

  2. Delta Sync IndexはソースDeltaテーブルと自動的に同期し、常に最新の検索結果を保証します。

  3. embedding_dimensionは、Twelve LabsのEmbed APIによって生成された埋め込みの次元数(1024)に一致させる必要があります。

  4. primary_keyは「id」に設定します。これはソーステーブルの一意の識別子に対応している必要があります。

  5. embedding_vector_columnは「embedding」に設定します。これはソーステーブル内のビデオ埋め込みが含まれる列名と一致する必要があります。

ステップ 5: 類似性検索を実装する

次のステップは、設定されたMosaic AI Vector SearchインデックスとTwelve Labs Embed APIを使用して類似性検索機能を実装することです。これにより、マルチモーダル埋め込みのパワーを活用し、指定したテキストクエリに類似したビデオを見つけることができます。

まず、Twelve Labs Embed APIを使用して、テキストクエリの埋め込みを取得する関数を定義します:

def get_text_embedding(text_query):
    # Twelve Labs Embed API supports text-to-embedding
    text_embedding = twelvelabs_client.embed.create(
      engine_name="Marengo-retrieval-2.6",
      text=text_query,
      text_truncate="start"
    )

    return text_embedding.text_embedding.float

この関数はテキストクエリを受け取り、ビデオ埋め込みと同じモデルを使用してその埋め込みを生成し、ベクトル空間での互換性を確保します。

次に、類似性検索関数を実装します:

def similarity_search(query_text, num_results=5):
    # Initialize the Vector Search client and get the query embedding
    mosaic_client = VectorSearchClient()
    query_embedding = get_text_embedding(query_text)

    print(f"Query embedding generated: {len(query_embedding)} dimensions")

    # Perform the similarity search
    results = index.similarity_search(
        query_vector=query_embedding,
        num_results=num_results,
        columns=["id", "url", "title"]
    )
    return results

この関数は、テキストクエリと返される結果の数を受け取ります。クエリの埋め込みを生成し、Mosaic AI Vector Searchインデックスを使用して類似するビデオを検索します。

検索結果を解析して表示するには、次のヘルパー関数を使用します:

def parse_search_results(raw_results):
    try:
        data_array = raw_results['result']['data_array']
        columns = [col['name'] for col in raw_results['manifest']['columns']]
        return [dict(zip(columns, row)) for row in data_array]
    except KeyError:
        print("Unexpected result format:", raw_results)
        return []

では、これらをすべて組み合わせて、サンプル検索を実行してみましょう:

# Example usage
query = "A dragon"
raw_results = similarity_search(query)

# Parse and print the search results
search_results = parse_search_results(raw_results)
if search_results:
    print(f"Top {len(search_results)} videos similar to the query: '{query}'")
    for i, result in enumerate(search_results, 1):
        print(f"{i}. Title: {result.get('title', 'N/A')}, URL: {result.get('url', 'N/A')}, Similarity Score: {result.get('score', 'N/A')}")
else:
    print("No valid search results returned.")

このコードは、Twelve Labsの類似性検索関数を使用して「A dragon」というクエリに関連するビデオを検索する方法を示しています。その後、検索結果を解析し、ユーザーフレンドリーな形式で表示します。

覚えておくべき重要事項:

  1. get_text_embedding関数は、ビデオ埋め込みと同じTwelve Labsモデルを使用するため、互換性が保証されます。

  2. similarity_search関数は、テキストから埋め込みへの変換とVector Searchを組み合わせて、類似するビデオを見つけます。

  3. ネットワークの問題やAPIの変更が検索プロセスに影響を与える可能性があるため、エラーハンドリングは不可欠です。

  4. parse_search_results関数は、生のAPI応答をより扱いやすい形式に変換するのに役立ちます。

  5. similarity_search関数の num_resultsパラメータを調整して、返される結果の数を制御できます。

この実装により、ビデオデータセット全体の強力なセマンティック検索機能が有効になります。ユーザーは、Twelve Labs Embed APIによって生成された豊かなマルチモーダル埋め込みを活用して、自然言語クエリを使用して関連するビデオを見つけることができるようになります。

ステップ 6: ビデオ推奨システムを構築する

次に、Twelve Labs Embed APIとDatabricks Mosaic AI Vector Searchによって生成されたマルチモーダル埋め込みを使用して、基本的なビデオ推奨システムを作成します。このシステムは、埋め込みの類似性に基づいて、指定されたビデオに類似したビデオを提示します。

まず、簡単な推奨機能を実装します:

def get_video_recommendations(video_id, num_recommendations=5):
    # Initialize the Vector Search client
    mosaic_client = VectorSearchClient()

    # First, retrieve the embedding for the given video_id
    source_df = spark.table("videos_source_embeddings")
    video_embedding = source_df.filter(f"id = {video_id}").select("embedding").first()

    if not video_embedding:
        print(f"No video found with id: {video_id}")
        return []

    # Perform similarity search using the video's embedding
    try:
        results = index.similarity_search(
            query_vector=video_embedding["embedding"],
            num_results=num_recommendations + 1,  # +1 to account for the input video
            columns=["id", "url", "title"]
        )
        
        # Parse the results
        recommendations = parse_search_results(results)
        
        # Remove the input video from recommendations if present
        recommendations = [r for r in recommendations if r.get('id') != video_id]
        
        return recommendations[:num_recommendations]
    except Exception as e:
        print(f"Error during recommendation: {e}")
        return []

# Helper function to display recommendations
def display_recommendations(recommendations):
    if recommendations:
        print(f"Top {len(recommendations)} recommended videos:")
        for i, video in enumerate(recommendations, 1):
            print(f"{i}. Title: {video.get('title', 'N/A')}")
            print(f"   URL: {video.get('url', 'N/A')}")
            print(f"   Similarity Score: {video.get('score', 'N/A')}")
            print()
    else:
        print("No recommendations found.")

# Example usage
video_id = 1  # Assuming this is a valid video ID in your dataset
recommendations = get_video_recommendations(video_id)
display_recommendations(recommendations)

この実装は以下の仕組みで動きます:

  1. get_video_recommendations関数は、ビデオIDと返すべき推奨ビデオの件数を受け取ります。

  2. 指定されたビデオの埋め込みをソースDeltaテーブルから取得します。

  3. この埋め込みを使用して類似性検索を実行し、最も類似しているビデオを見つけます。

  4. 同じビデオが推奨されないように、結果から入力ビデオを削除します(結果に含まれる場合)。

  5. display_recommendationsヘルパー関数は、推奨ビデオのリストをユーザーフレンドリーな形で整形し、出力します。

この推奨システムを使用するには:

  1. videos_source_embeddingsテーブルに、有効な埋め込みを持つビデオが保存されていることを確認します。

  2. データセット内の有効なビデオIDを指定して、get_video_recommendations関数を呼び出します。

  3. 関数は類似度に基づいた推奨ビデオのリストを表示します。

この基本的な推奨システムは、コンテンツベースのビデオ推奨においてマルチモーダル埋め込みをどのように活用するかを示しています。これは、いくつかの方法で拡張および改善できます:

  • パーソナライズされた推奨を行うために、ユーザーの好みや視聴履歴を組み込みます。

  • 多様な推奨結果を保証するために、偶発性メカニズムを実装します。

  • ビデオメタデータ(ジャンル、長さ、投稿日など)に基づくフィルタリングを追加します。

  • 頻繁にリクエストされる推奨ビデオのパフォーマンスを向上させるために、キャッシュメカニズムを実装します。

推奨の品質は、ビデオデータセットの規模と多様性、さらにはTwelve Labs Embed APIが生成する埋め込みの正確性に依存することに注意してください。システムに登録するビデオを増やすにつれて、推奨結果はさらに高い関連性と多様性を持つようになります。

この統合を次のレベルに引き上げる

インデックスの更新と同期

ビデオライブラリが拡大し進化するにつれて、Vector Searchインデックスを最新の状態に維持することが極めて重要です。Mosaic AI Vector SearchはソースDeltaテーブルとのシームレスな同期を提供し、推奨結果や検索結果に常に最新データを反映させることができます。

インデックス更新と同期における重要な考慮事項:

  1. 増分更新Delta Lakeの変更データフィード(Change Data Feed)を活用し、インデックス内の変更されたレコード、または新しいレコードのみを効率的に更新します。

  2. 定期同期のスケジュール:Databricksのワークフローオーケストレーションツールを使用し、定期的な同期ジョブを実行してインデックスの鮮度を維持します。

  3. リアルタイム更新:時間に極めてシリズなアプリケーションでは、Databricks Mosaic AIのストリーミング機能を使用した準リアルタイムのインデックス更新の実装を検討してください。

  4. バージョン管理:Delta Lakeのタイムトラベル機能を利用してインデックスの複数のバージョンを維持し、必要に応じて簡単にロールバックできるようにします。

  5. 同期ステータスの監視:同期の成功を追跡し、更新処理での問題をすばやく特定するために、ログ記録やアラートの仕組みを実装します。

これらの手法を習得することで、TwelveLabsのビデオ埋め込みを常に最新に保ち、高度な検索と推奨のユースケースにいつでも対応できるようにします。

パフォーマンスとスケーリングの最適化

自身のビデオ分析パイプラインが成長するにつれ、パフォーマンスの最適化を継続し、ソリューションをスケールアップさせることが重要になります。Databricksの分散コンピューティング機能は、Twelve Labsの効率的な埋め込み生成と連携することで、大規模なビデオ処理タスクの処理に耐えうる堅牢な基盤を提供します。

ソリューションを最適化しスケーリングするための主なアプローチ:

  1. 分散処理:DatabricksのSparkクラスターを活用して、複数のノード間で埋め込みの生成タスクやインデックス構築タスクを並列化します。

  2. キャッシュ戦略:アクセス頻度が高い埋め込みに対してインテリジェントなキャッシュ処理を導入し、APIの呼び出しを減らして応答時間を短縮します。

  3. バッチ処理:大規模なビデオライブラリに対しては、ピーク時以外の時間帯に埋め込みを生成してインデックスを更新するバッチ処理ワークフローを実装します。

  4. クエリの最適化num_resultsなどのパラメータを調整し、効率的なフィルタリング手法を駆使して、Vector Searchクエリを微調整します。

  5. インデックスのパーティショニング:超大規模なデータセットの場合は、クエリのレスポンスを高め、よりきめ細かい更新を可能にするために、インデックスのパーティショニング戦略を調査します。

  6. オートスケーリング:Databricksのオートスケーリング機能を利用し、ワークロードの処理需要に合わせてコンピューティングリソースを動的に調整します。

  7. エッジコンピューティング:遅延(レイテンシ)を気にするアプリケーションの場合は、モデルの軽量版をデータソースの近くにデプロイすることを検討します。

これらの最適化手法を導入することで、優れたパフォーマンスとコスト効率を両立させながら、増え続けるビデオライブラリと増加するユーザー需要に対処することが可能になります。

モニタリングと分析

ビデオ理解パイプラインの継続的な成功を収めるためには、堅牢な監視(モニタリング)とデータ分析の仕組みが欠かせません。Databricksは、システムのパフォーマンス、ユーザー行動、ビジネスインパクトを追跡するための強力なツールを提供しています。

監視と分析で重点を置くべき主な領域:

  1. パフォーマンスメトリクス:クエリの遅延、埋め込み生成までの時間、インデックス同期にかかる時間などの主要業績評価指標(KPI)を追跡します。

  2. 利用ログの分析:ユーザーの操作履歴、よく使われる検索クエリ、頻繁にレコメンドされるビデオを分析し、ユーザー行動の洞察を得ます。

  3. 品質評価:システムによる自動評価指標と実際のユーザーフィードバックを活用し、検索結果や推奨結果の関連度を評価するフィードバックループを構築します。

  4. リソース使用状況:コンピューティングリソース、APIコール数、ストレージ利用量を継続的に監視し、コストとパフォーマンスを最適化します。

  5. エラー追跡:パイプライン内で発生した不具合を迅速に検知・解決するために、包括的なログ記録と警告システム(アラート)を整備します。

  6. A/Bテスト:Databricksの検証機能を活用し、さまざまな埋め込みモデル、検索アルゴリズム、またはレコメンドモデルを試験評価(テスト)します。

  7. ビジネス目標の検証:ビデオ解析により実現した機能を、ユーザーの滞在状況(エンゲージメント)、視聴時間、成約率などの最終的なKGIに紐付けます。

  8. 法規制・規約遵守:ビデオ処理処理パイプラインが個人情報保護規制やコンテンツモデレーションのガイドラインに適合しているかを継続的に確認します。

これらすべての監視手法を施すことで、ビデオパイプラインの働きとその成果について価値の高い情報を手にすることができます。このアプローチに基づく改善を絶えず繰り返すことで、Twelve Labsの高度なビデオ構造化機能とDatabricks Data Intelligence Platformの統合価値を、データに基づいて実証できるようになります。

まとめ

Twelve Labs と Databricks Mosaic AI は、洗練された映像コンテンツの構造化と、高度なデータ抽出タスクのための一貫した統合開発フレームワークを提供します。この統合技術により、マルチモーダルなビデオ表現ベクトルの抽出と実効性の高いベクトル検索(Vector Search)を活用し、開発者は独自のビデオ解析、レコメンド、検索システムを作り上げることが可能になります。

このチュートリアルでは、開発に必要な環境構築から、マルチモーダル情報のベクトル展開、ベクトルエンジンへの登録、類似コンテンツ検索、レコメンデーションの実装に必要な基本ロジックを解説しました。さらに実用に必要なスケーラブルな管理システム設計、最適化手法、信頼性向上のための監視方法まで包括的に言及しています。

映像データが爆発的に増加する現在、それらをシステムに最適化させて価値を効率よく発掘する術(すべ)は不可欠です。本ドキュメントは、この難解なプロジェクトを克服するための強力な設計方法論を提供するものです。ぜひそれらの機能をテストし、応用的課題を検討しながら、映像理解を進化させる開発コミュニティを共に盛り上げましょう。

さらに学びたい方のための情報ソース

これらの連携方法をさらに深く探索するために、以下の情報をぜひご一読ください。

  1. Twelve Labs 公式技術ドキュメント

  2. Databricks Vector Search に関する公式ヘルプ

  3. Databricks コミュニティ・フォーラム

  4. Twelve Labs Discord コミュニティ

これらのお役立ち情報を手元に置くことで、最先端のAIビデオテクノロジーを活用し、Twelve Labs と Databricks をベースにした独自のイノベーティブなシステムの構築を進めていきましょう。

概要

Twelve LabsのEmbed APIを使用すると、開発者は高度なビデオ理解のユースケースを動かすためのマルチモーダル埋め込みを取得できます。これには、セマンティックビデオ検索やデータキュレーションから、コンテンツ推奨、ビデオRAGシステムまでが含まれます。

Twelve Labsを使用すると、ビデオ内の視覚的表現、ボディランゲージ、話し言葉、および全体的な文脈の間の関係を捉える文脈ベクトル表現を生成できます。Databricks Mosaic AIのVector Searchは、高次元ベクトルのインデックス作成とクエリ実行のための、堅牢でスケーラブルなインフラストラクチャを提供します。

このブログ記事では、これらの相補的な技術を活用して、ビデオAIアプリケーションの新たな可能性を切り拓く方法を説明します。

このチュートリアルの作成にあたり、ご協力いただいたDatabricksのNina Williams氏、Austin Zaccor氏、Fernanda Heredia氏、およびEmily Hutson氏に深く感謝いたします!

なぜ Twelve Labs + Databricks Mosaic AI なのか?

Twelve LabsのEmbed APIをDatabricks Mosaic AI Vector Searchと統合することで、大規模なビデオデータセットの効率的な処理や、正確なマルチモーダルコンテンツの表現といった、ビデオAIにおける主要な課題に対応できます。この統合により、高度なビデオアプリケーションの開発時間と必要なリソースを削減し、膨大なビデオライブラリ全体にわたる複雑なクエリを可能にするとともに、ワークフロー全体の効率を向上させます。

マルチモーダルデータを処理するための統合されたアプローチは特に注目に値します。テキスト、画像、音声の分析のために別々のモデルを使い分ける代わりに、ビデオコンテンツの本質を丸ごと捉える、単一の一貫した表現を扱えるようになります。これにより、展開アーキテクチャが簡素化されるだけでなく、洗練されたコンテンツ推奨システムから高度なビデオ検索エンジン、自動コンテンツモデレーションツールまで、よりニュアンスに富んだ文脈対応のアプリケーションが実現します。

さらに、この統合によりDatabricksエコシステムの機能が拡張され、ビデオ理解を既存のデータパイプラインや機械学習ワークフローにシームレスに組み込めるようになります。企業がリアルタイムのビデオ分析を開発している場合でも、大規模なコンテンツ分類システムを構築している場合でも、あるいは生成AIの新しいアプリケーションを模索している場合でも、この組み合わせソリューションは強力な基盤を提供します。これにより、ビデオAIで実現可能な価値の限界を押し広げ、メディアやエンターテインメントからセキュリティ、ヘルスケアに至るまでの業界において、イノベーションと問題解決の新しい道を切り拓きます。

Twelve Labs の Embed API を理解する

Twelve LabsのEmbed APIは、ビデオコンテンツ専用に設計されたマルチモーダル埋め込み技術における大きな進歩を表しています。フレームごとの分析や異なるモダリティごとの個別のモデルに依存する従来のアプローチとは異なり、このAPIは、ビデオ内の視覚的表現、ボディランゲージ、話し言葉、および全体的な文脈の複雑な相互作用を捉える文脈ベクトル表現を生成します。これは、当社の最先端のマルチモーダル基盤モデル Marengo-2.6をベースに構築されています。



Embed APIは、ビデオデータを扱うAIエンジニアにとって特に強力となるいくつかの重要な機能を提供します。第一に、ビデオに存在するあらゆるモダリティに柔軟に対応できるため、テキスト専用や画像専用の個別のモデルを用意する必要がありません。第二に、動き、アクション、時系列の情報を考慮するビデオネイティブなアプローチを採用しており、ビデオコンテンツのより正確で時系列的に一貫した解釈を保証します。最後に、すべてのモダリティからの埋め込みを統合する統一されたベクトル空間を作成し、ビデオコンテンツのより包括的な理解を促進します。

AIエンジニアにとって、Embed APIはビデオ理解タスクにおいて新しい可能性を切り拓きます。これにより、より高度なコンテンツ分析、向上したセマンティック検索機能、および強化された推奨システムが可能になります。文脈に沿った微細な合図や異なるモダリティ間の長時間の相互作用を捉えるAPIの能力は、感情認識、文脈を考慮したコンテンツモデレーション、高度なビデオ検索システムなど、ビデオコンテンツのニュアンス豊かな理解を必要とするアプリケーションで特に価値を発揮します。

前提条件

Twelve LabsのEmbed APIをDatabricks Mosaic AI Vector Searchに統合する前に、以下の前提条件を満たしていることを確認してください。

  1. ワークスペースの作成と管理のアクセス権を持つDatabricksアカウント。(無料トライアルは https://databricks.com/try-databricks から登録できます)

  2. Pythonプログラミングおよび基本的なデータサイエンスの概念についての知識。

  3. Twelve LabsのAPIキー。(playground.twelvelabs.io で登録してください)

  4. ベクトル埋め込みと類似性検索の概念に関する基本的な理解。

  5. (オプション)AWS上でDatabricksを使用する場合はAWSアカウント。AzureまたはGoogle Cloud上でDatabricksを使用する場合は不要です。

注意:Embed APIは現在プライベートベータ版ですが、すべてのユーザーがこのフォームに記入するだけでアクセスをリクエストできます。通常、数時間以内に確認メールが届き、Embed APIの使用を開始できるようになります。

ステップ 1: 環境のセットアップ

まず、Databricks環境をセットアップし、必要なライブラリをインストールします。

1 - 新しいDatabricksワークスペースを作成する:

2 - 新しいクラスターを作成するか、既存のクラスターに接続する:

このアプリケーションには、ほぼすべてのMLクラスターが機能します。以下の設定は、最適なコストパフォーマンスを求める方向けに提供されています。

  • 「Compute」タブで「Create compute」をクリックします。

  • 「Single node」を選択し、Runtimeには「14.3 LTS ML non-GPU」を選択します。

    • クラスターのポリシーとアクセスモードはデフォルトのままで構いません。

  • ノードタイプとして「r6i.xlarge」を選択します。

    • これにより、メモリ使用率を最大化しつつ、コストはAWS上でわずか$0.252/時、Databricksで割引適用前で1.02 DBU/時となります。

    • また、テストした中で最も高速なオプションの一つでした。

  • 他のすべてのオプションはデフォルトのままで構いません。

  • 下部にある「Create compute」をクリックし、ワークスペースに戻ります。

3 - Databricksワークスペースに新しいノートブックを作成する:

  • ワークスペースで「Create」をクリックし、「Notebook」を選択します。

  • ノートブックに名前を付けます(例: "TwelveLabs_MosaicAI_VectorSearch_Integration")

  • デフォルト言語としてPythonを選択します。

4 - Twelve Labs および Mosaic AI Vector Search の SDK をインストールする:

ノートブックの最初のセルで、次のコマンドを実行します。

%pip install twelvelabs databricks-vectorsearch

5 - Twelve Labs の認証を設定する:

次のセルに、以下のコードを追加します。

from twelvelabs import TwelveLabs
import os

# Retrieve the API key from Databricks secrets (recommended)
# You'll need to set up the secret scope and add your API key first
TWELVE_LABS_API_KEY = dbutils.secrets.get(scope="your-scope", key="twelvelabs-api-key")

if TWELVE_LABS_API_KEY is None:
    raise ValueError("TWELVE_LABS_API_KEY environment variable is not set")

# Initialize the Twelve Labs client
twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)

注意:セキュリティを強化するため、APIキーをハードコードしたり環境変数を使用したりするのではなく、Databricksのシークレット機能を使用して保存することをお勧めします。

ステップ 2: マルチモーダル埋め込みを生成する

提供されているgenerate_embedding関数を使用して、Twelve Labs Embed APIでマルチモーダル埋め込みを生成します。この関数は、Databricks内のSpark DataFrameで効率的に動作するようにPandasユーザー定義関数(UDF)として設計されています。これは、埋め込みタスクの作成、進行状況の監視、および結果の取得のプロセスをカプセル化しています。

次に、ビデオURLを文字列として受け取り、Twelve Labs Embed APIを呼び出して配列<float>を返すprocess_url関数を作成します。

以下に実装方法と使用方法を示します。

UDFの定義:

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, FloatType
from twelvelabs.models.embed import EmbeddingsTask
import pandas as pd

@pandas_udf(ArrayType(FloatType()))
def get_video_embeddings(urls: pd.Series) -> pd.Series:
    def generate_embedding(video_url):
        twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)
        task = twelvelabs_client.embed.task.create(
            engine_name="Marengo-retrieval-2.6",
            video_url=video_url
        )
        task.wait_for_done()
        task_result = twelvelabs_client.embed.task.retrieve(task.id)
        embeddings = []
        for v in task_result.video_embeddings:
            embeddings.append({
                'embedding': v.embedding.float,
                'start_offset_sec': v.start_offset_sec,
                'end_offset_sec': v.end_offset_sec,
                'embedding_scope': v.embedding_scope
            })
        return embeddings

    def process_url(url):
        embeddings = generate_embedding(url)
        return embeddings[0]['embedding'] if embeddings else None

    return urls.apply(process_url)

ビデオURLを含むサンプルDataFrameの作成:

video_urls = [
    "https://example.com/video1.mp4",
    "https://example.com/video2.mp4",
    "https://example.com/video3.mp4"
]
df = spark.createDataFrame([(url,) for url in video_urls], ["video_url"])

埋め込みを生成するためのUDFの適用:

df_with_embeddings = df.withColumn("embedding", get_video_embeddings(df.video_url))

結果の表示:

df_with_embeddings.show(truncate=False)

このプロセスにより、DataFrame内の各ビデオURLに対して、視覚情報、音声情報、およびテキスト情報を含む、ビデオコンテンツのマルチモーダルな本質を捉えたマルチモーダル埋め込みが生成されます。

大規模なビデオデータセットの場合、埋め込みの生成は計算負荷が高く、時間がかかることがある点に注意してください。本番スケールのアプリケーションでは、バッチ処理や分散処理の戦略を実装することを検討してください。さらに、APIエラーやネットワークの問題を処理するために、適切なエラーハンドリングとログ記録が導入されていることを確認してください。

ステップ 3: ビデオ埋め込み用の Delta テーブルを作成する

次に、ビデオメタデータと、Twelve Labs Embed APIによって生成された埋め込みを保存するためのソースDeltaテーブルを作成します。このテーブルは、Databricks Mosaic AI Vector SearchにおけるVector Searchインデックスの基礎として機能します。

まず、ビデオのURLとメタデータを含むソースDataFrameを作成します:

from pyspark.sql import Row

# Create a list of sample video URLs and metadata
video_data = [
Row(url='http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ElephantsDream.mp4', title='Elephant Dream'), 

Row(url='http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/Sintel.mp4', title='Sintel'),

Row(url='http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4', title='Big Buck Bunny')
]

# Create a DataFrame from the list
source_df = spark.createDataFrame(video_data)
source_df.show()

次に、SQLを使用してDeltaテーブルのスキーマを定義します:

%sql
CREATE TABLE IF NOT EXISTS videos_source_embeddings (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  url STRING,
  title STRING,
  embedding ARRAY<FLOAT>
) TBLPROPERTIES (delta.enableChangeDataFeed = true)

テーブルでChange Data Feedが有効になっていることに注意してください。これは、Vector Searchインデックスを作成および維持するために非常に重要です。

では、先ほど定義したget_video_embeddings関数を使用して、ビデオの埋め込みを生成します:

embeddings_df = source_df.withColumn("embedding", get_video_embeddings("url"))

このステップは、ビデオの数や長さによっては少し時間がかかる場合があります。

埋め込みが生成されたら、データをDeltaテーブルに書き込みます:

embeddings_df.write.mode("append").saveAsTable("videos_source_embeddings")

最後に、埋め込みを含むDataFrameを表示して、データを確認します:

display(embeddings_df)

この手順により、Vector Search機能のための強力な基盤が構築されます。Deltaテーブルは自動的にVector Searchインデックスと同期されるため、ビデオデータセットに対する更新や追加は即座に検索結果に反映されます。

覚えておくべきいくつかの重要なポイント:

  • id列は自動生成され、各ビデオに一意の識別子を提供します。

  • embedding列は、Twelve Labs Embed APIによって生成された各ビデオの高次元ベクトル表現を格納します。

  • Change Data Feedを有効にすることで、Databricksはテーブル内の変更を効率的に追跡できます。これは、最新のVector Searchインデックスを維持するために不可欠です。

ステップ 4: Mosaic AI Vector Search を設定する

このステップでは、ビデオ埋め込みを処理できるようにDatabricks Mosaic AI Vector Searchをセットアップします。これには、Vector Searchエンドポイントと、videos_source_embeddings Deltaテーブルと自動的に同期するDelta Sync Indexの作成が含まれます。

まず、Vector Searchエンドポイントを作成します:

from databricks.vector_search.client import VectorSearchClient

# Initialize the Vector Search client and name the endpoint
mosaic_client = VectorSearchClient()
endpoint_name = "twelve_labs_video_endpoint"

# Delete the existing endpoint if it exists
try:
    mosaic_client.delete_endpoint(endpoint_name)
    print(f"Deleted existing endpoint: {endpoint_name}")
except Exception:
    pass  # Ignore non-existing endpoints

# Create the new endpoint
endpoint = mosaic_client.create_endpoint(
    name=endpoint_name,
    endpoint_type="STANDARD"
)

このコードは、新しいVector Searchエンドポイントを作成するか、同じ名前の既存のエンドポイントを置き換えます。このエンドポイントは、Vector Search操作用のアクセスポイントとして機能します。

次に、videos_source_embeddings Deltaテーブルと自動的に同期するDelta Sync Indexを作成します:

# Define the source table name and index name
source_table_name = "twelvelabs.default.videos_source_embeddings"
index_name = "twelvelabs.default.video_embeddings_index"

index = mosaic_client.create_delta_sync_index(
    endpoint_name="twelve_labs_video_endpoint",
    source_table_name=source_table_name,
    index_name=index_name,
    primary_key="id",
    embedding_dimension=1024,
    embedding_vector_column="embedding",
    pipeline_type="TRIGGERED"
)

print(f"Created index: {index.name}")

このコードは、ソースDeltaテーブルにリンクするDelta Sync Indexを作成します。ソーステーブルに加えられた変更から数秒以内にインデックスを自動的に更新したい場合(Vector Searchの結果が常に最新であることを保証する)、pipeline_type="CONTINUOUS"を設定します。

インデックスが作成され、正しく同期されていることを確認するには、次のコードを使用して手動で同期を呼び出します:

# Check the status of the index; this may take some time
index_status = mosaic_client.get_index(
    endpoint_name="twelve_labs_video_endpoint",
    index_name="twelvelabs.default.video_embeddings_index"
)
print(f"Index status: {index_status}")

# Manually trigger the index sync
try:
    index.sync()
    print("Index sync triggered successfully.")
except Exception as e:
    print(f"Error triggering index sync: {str(e)}")

このコードを使用すると、インデックスのステータスを確認し、必要に応じて手動で同期を行うことができます。本番環境では、ソースDeltaテーブルの変更に基づいて自動的に同期するようにパイプラインを設定する方が好ましいでしょう。

覚えておくべき重要事項:

  1. Vector Searchエンドポイントは、Vector Search操作のアクセスポイントとして機能します。

  2. Delta Sync IndexはソースDeltaテーブルと自動的に同期し、常に最新の検索結果を保証します。

  3. embedding_dimensionは、Twelve LabsのEmbed APIによって生成された埋め込みの次元数(1024)に一致させる必要があります。

  4. primary_keyは「id」に設定します。これはソーステーブルの一意の識別子に対応している必要があります。

  5. embedding_vector_columnは「embedding」に設定します。これはソーステーブル内のビデオ埋め込みが含まれる列名と一致する必要があります。

ステップ 5: 類似性検索を実装する

次のステップは、設定されたMosaic AI Vector SearchインデックスとTwelve Labs Embed APIを使用して類似性検索機能を実装することです。これにより、マルチモーダル埋め込みのパワーを活用し、指定したテキストクエリに類似したビデオを見つけることができます。

まず、Twelve Labs Embed APIを使用して、テキストクエリの埋め込みを取得する関数を定義します:

def get_text_embedding(text_query):
    # Twelve Labs Embed API supports text-to-embedding
    text_embedding = twelvelabs_client.embed.create(
      engine_name="Marengo-retrieval-2.6",
      text=text_query,
      text_truncate="start"
    )

    return text_embedding.text_embedding.float

この関数はテキストクエリを受け取り、ビデオ埋め込みと同じモデルを使用してその埋め込みを生成し、ベクトル空間での互換性を確保します。

次に、類似性検索関数を実装します:

def similarity_search(query_text, num_results=5):
    # Initialize the Vector Search client and get the query embedding
    mosaic_client = VectorSearchClient()
    query_embedding = get_text_embedding(query_text)

    print(f"Query embedding generated: {len(query_embedding)} dimensions")

    # Perform the similarity search
    results = index.similarity_search(
        query_vector=query_embedding,
        num_results=num_results,
        columns=["id", "url", "title"]
    )
    return results

この関数は、テキストクエリと返される結果の数を受け取ります。クエリの埋め込みを生成し、Mosaic AI Vector Searchインデックスを使用して類似するビデオを検索します。

検索結果を解析して表示するには、次のヘルパー関数を使用します:

def parse_search_results(raw_results):
    try:
        data_array = raw_results['result']['data_array']
        columns = [col['name'] for col in raw_results['manifest']['columns']]
        return [dict(zip(columns, row)) for row in data_array]
    except KeyError:
        print("Unexpected result format:", raw_results)
        return []

では、これらをすべて組み合わせて、サンプル検索を実行してみましょう:

# Example usage
query = "A dragon"
raw_results = similarity_search(query)

# Parse and print the search results
search_results = parse_search_results(raw_results)
if search_results:
    print(f"Top {len(search_results)} videos similar to the query: '{query}'")
    for i, result in enumerate(search_results, 1):
        print(f"{i}. Title: {result.get('title', 'N/A')}, URL: {result.get('url', 'N/A')}, Similarity Score: {result.get('score', 'N/A')}")
else:
    print("No valid search results returned.")

このコードは、Twelve Labsの類似性検索関数を使用して「A dragon」というクエリに関連するビデオを検索する方法を示しています。その後、検索結果を解析し、ユーザーフレンドリーな形式で表示します。

覚えておくべき重要事項:

  1. get_text_embedding関数は、ビデオ埋め込みと同じTwelve Labsモデルを使用するため、互換性が保証されます。

  2. similarity_search関数は、テキストから埋め込みへの変換とVector Searchを組み合わせて、類似するビデオを見つけます。

  3. ネットワークの問題やAPIの変更が検索プロセスに影響を与える可能性があるため、エラーハンドリングは不可欠です。

  4. parse_search_results関数は、生のAPI応答をより扱いやすい形式に変換するのに役立ちます。

  5. similarity_search関数の num_resultsパラメータを調整して、返される結果の数を制御できます。

この実装により、ビデオデータセット全体の強力なセマンティック検索機能が有効になります。ユーザーは、Twelve Labs Embed APIによって生成された豊かなマルチモーダル埋め込みを活用して、自然言語クエリを使用して関連するビデオを見つけることができるようになります。

ステップ 6: ビデオ推奨システムを構築する

次に、Twelve Labs Embed APIとDatabricks Mosaic AI Vector Searchによって生成されたマルチモーダル埋め込みを使用して、基本的なビデオ推奨システムを作成します。このシステムは、埋め込みの類似性に基づいて、指定されたビデオに類似したビデオを提示します。

まず、簡単な推奨機能を実装します:

def get_video_recommendations(video_id, num_recommendations=5):
    # Initialize the Vector Search client
    mosaic_client = VectorSearchClient()

    # First, retrieve the embedding for the given video_id
    source_df = spark.table("videos_source_embeddings")
    video_embedding = source_df.filter(f"id = {video_id}").select("embedding").first()

    if not video_embedding:
        print(f"No video found with id: {video_id}")
        return []

    # Perform similarity search using the video's embedding
    try:
        results = index.similarity_search(
            query_vector=video_embedding["embedding"],
            num_results=num_recommendations + 1,  # +1 to account for the input video
            columns=["id", "url", "title"]
        )
        
        # Parse the results
        recommendations = parse_search_results(results)
        
        # Remove the input video from recommendations if present
        recommendations = [r for r in recommendations if r.get('id') != video_id]
        
        return recommendations[:num_recommendations]
    except Exception as e:
        print(f"Error during recommendation: {e}")
        return []

# Helper function to display recommendations
def display_recommendations(recommendations):
    if recommendations:
        print(f"Top {len(recommendations)} recommended videos:")
        for i, video in enumerate(recommendations, 1):
            print(f"{i}. Title: {video.get('title', 'N/A')}")
            print(f"   URL: {video.get('url', 'N/A')}")
            print(f"   Similarity Score: {video.get('score', 'N/A')}")
            print()
    else:
        print("No recommendations found.")

# Example usage
video_id = 1  # Assuming this is a valid video ID in your dataset
recommendations = get_video_recommendations(video_id)
display_recommendations(recommendations)

この実装は以下の仕組みで動きます:

  1. get_video_recommendations関数は、ビデオIDと返すべき推奨ビデオの件数を受け取ります。

  2. 指定されたビデオの埋め込みをソースDeltaテーブルから取得します。

  3. この埋め込みを使用して類似性検索を実行し、最も類似しているビデオを見つけます。

  4. 同じビデオが推奨されないように、結果から入力ビデオを削除します(結果に含まれる場合)。

  5. display_recommendationsヘルパー関数は、推奨ビデオのリストをユーザーフレンドリーな形で整形し、出力します。

この推奨システムを使用するには:

  1. videos_source_embeddingsテーブルに、有効な埋め込みを持つビデオが保存されていることを確認します。

  2. データセット内の有効なビデオIDを指定して、get_video_recommendations関数を呼び出します。

  3. 関数は類似度に基づいた推奨ビデオのリストを表示します。

この基本的な推奨システムは、コンテンツベースのビデオ推奨においてマルチモーダル埋め込みをどのように活用するかを示しています。これは、いくつかの方法で拡張および改善できます:

  • パーソナライズされた推奨を行うために、ユーザーの好みや視聴履歴を組み込みます。

  • 多様な推奨結果を保証するために、偶発性メカニズムを実装します。

  • ビデオメタデータ(ジャンル、長さ、投稿日など)に基づくフィルタリングを追加します。

  • 頻繁にリクエストされる推奨ビデオのパフォーマンスを向上させるために、キャッシュメカニズムを実装します。

推奨の品質は、ビデオデータセットの規模と多様性、さらにはTwelve Labs Embed APIが生成する埋め込みの正確性に依存することに注意してください。システムに登録するビデオを増やすにつれて、推奨結果はさらに高い関連性と多様性を持つようになります。

この統合を次のレベルに引き上げる

インデックスの更新と同期

ビデオライブラリが拡大し進化するにつれて、Vector Searchインデックスを最新の状態に維持することが極めて重要です。Mosaic AI Vector SearchはソースDeltaテーブルとのシームレスな同期を提供し、推奨結果や検索結果に常に最新データを反映させることができます。

インデックス更新と同期における重要な考慮事項:

  1. 増分更新Delta Lakeの変更データフィード(Change Data Feed)を活用し、インデックス内の変更されたレコード、または新しいレコードのみを効率的に更新します。

  2. 定期同期のスケジュール:Databricksのワークフローオーケストレーションツールを使用し、定期的な同期ジョブを実行してインデックスの鮮度を維持します。

  3. リアルタイム更新:時間に極めてシリズなアプリケーションでは、Databricks Mosaic AIのストリーミング機能を使用した準リアルタイムのインデックス更新の実装を検討してください。

  4. バージョン管理:Delta Lakeのタイムトラベル機能を利用してインデックスの複数のバージョンを維持し、必要に応じて簡単にロールバックできるようにします。

  5. 同期ステータスの監視:同期の成功を追跡し、更新処理での問題をすばやく特定するために、ログ記録やアラートの仕組みを実装します。

これらの手法を習得することで、TwelveLabsのビデオ埋め込みを常に最新に保ち、高度な検索と推奨のユースケースにいつでも対応できるようにします。

パフォーマンスとスケーリングの最適化

自身のビデオ分析パイプラインが成長するにつれ、パフォーマンスの最適化を継続し、ソリューションをスケールアップさせることが重要になります。Databricksの分散コンピューティング機能は、Twelve Labsの効率的な埋め込み生成と連携することで、大規模なビデオ処理タスクの処理に耐えうる堅牢な基盤を提供します。

ソリューションを最適化しスケーリングするための主なアプローチ:

  1. 分散処理:DatabricksのSparkクラスターを活用して、複数のノード間で埋め込みの生成タスクやインデックス構築タスクを並列化します。

  2. キャッシュ戦略:アクセス頻度が高い埋め込みに対してインテリジェントなキャッシュ処理を導入し、APIの呼び出しを減らして応答時間を短縮します。

  3. バッチ処理:大規模なビデオライブラリに対しては、ピーク時以外の時間帯に埋め込みを生成してインデックスを更新するバッチ処理ワークフローを実装します。

  4. クエリの最適化num_resultsなどのパラメータを調整し、効率的なフィルタリング手法を駆使して、Vector Searchクエリを微調整します。

  5. インデックスのパーティショニング:超大規模なデータセットの場合は、クエリのレスポンスを高め、よりきめ細かい更新を可能にするために、インデックスのパーティショニング戦略を調査します。

  6. オートスケーリング:Databricksのオートスケーリング機能を利用し、ワークロードの処理需要に合わせてコンピューティングリソースを動的に調整します。

  7. エッジコンピューティング:遅延(レイテンシ)を気にするアプリケーションの場合は、モデルの軽量版をデータソースの近くにデプロイすることを検討します。

これらの最適化手法を導入することで、優れたパフォーマンスとコスト効率を両立させながら、増え続けるビデオライブラリと増加するユーザー需要に対処することが可能になります。

モニタリングと分析

ビデオ理解パイプラインの継続的な成功を収めるためには、堅牢な監視(モニタリング)とデータ分析の仕組みが欠かせません。Databricksは、システムのパフォーマンス、ユーザー行動、ビジネスインパクトを追跡するための強力なツールを提供しています。

監視と分析で重点を置くべき主な領域:

  1. パフォーマンスメトリクス:クエリの遅延、埋め込み生成までの時間、インデックス同期にかかる時間などの主要業績評価指標(KPI)を追跡します。

  2. 利用ログの分析:ユーザーの操作履歴、よく使われる検索クエリ、頻繁にレコメンドされるビデオを分析し、ユーザー行動の洞察を得ます。

  3. 品質評価:システムによる自動評価指標と実際のユーザーフィードバックを活用し、検索結果や推奨結果の関連度を評価するフィードバックループを構築します。

  4. リソース使用状況:コンピューティングリソース、APIコール数、ストレージ利用量を継続的に監視し、コストとパフォーマンスを最適化します。

  5. エラー追跡:パイプライン内で発生した不具合を迅速に検知・解決するために、包括的なログ記録と警告システム(アラート)を整備します。

  6. A/Bテスト:Databricksの検証機能を活用し、さまざまな埋め込みモデル、検索アルゴリズム、またはレコメンドモデルを試験評価(テスト)します。

  7. ビジネス目標の検証:ビデオ解析により実現した機能を、ユーザーの滞在状況(エンゲージメント)、視聴時間、成約率などの最終的なKGIに紐付けます。

  8. 法規制・規約遵守:ビデオ処理処理パイプラインが個人情報保護規制やコンテンツモデレーションのガイドラインに適合しているかを継続的に確認します。

これらすべての監視手法を施すことで、ビデオパイプラインの働きとその成果について価値の高い情報を手にすることができます。このアプローチに基づく改善を絶えず繰り返すことで、Twelve Labsの高度なビデオ構造化機能とDatabricks Data Intelligence Platformの統合価値を、データに基づいて実証できるようになります。

まとめ

Twelve Labs と Databricks Mosaic AI は、洗練された映像コンテンツの構造化と、高度なデータ抽出タスクのための一貫した統合開発フレームワークを提供します。この統合技術により、マルチモーダルなビデオ表現ベクトルの抽出と実効性の高いベクトル検索(Vector Search)を活用し、開発者は独自のビデオ解析、レコメンド、検索システムを作り上げることが可能になります。

このチュートリアルでは、開発に必要な環境構築から、マルチモーダル情報のベクトル展開、ベクトルエンジンへの登録、類似コンテンツ検索、レコメンデーションの実装に必要な基本ロジックを解説しました。さらに実用に必要なスケーラブルな管理システム設計、最適化手法、信頼性向上のための監視方法まで包括的に言及しています。

映像データが爆発的に増加する現在、それらをシステムに最適化させて価値を効率よく発掘する術(すべ)は不可欠です。本ドキュメントは、この難解なプロジェクトを克服するための強力な設計方法論を提供するものです。ぜひそれらの機能をテストし、応用的課題を検討しながら、映像理解を進化させる開発コミュニティを共に盛り上げましょう。

さらに学びたい方のための情報ソース

これらの連携方法をさらに深く探索するために、以下の情報をぜひご一読ください。

  1. Twelve Labs 公式技術ドキュメント

  2. Databricks Vector Search に関する公式ヘルプ

  3. Databricks コミュニティ・フォーラム

  4. Twelve Labs Discord コミュニティ

これらのお役立ち情報を手元に置くことで、最先端のAIビデオテクノロジーを活用し、Twelve Labs と Databricks をベースにした独自のイノベーティブなシステムの構築を進めていきましょう。