パートナーシップ

ビデオの干し草の山から針を探す:TwelveLabs Marengo、Amazon Bedrock、Elasticsearchによるインテリジェントなビデオ検索の構築

ジェームズ・リー

開発者は、Amazon Bedrock上のTwelve Labs Marengo 2.7モデルをElasticsearchと統合し、ビデオセグメントから1,024次元のマルチモーダル埋め込み(embeddings)を生成して5つのベクトルインデックスタイプに保存し、Elasticsearchのk-NN検索を通じて自然言語でクエリを実行することにより、セマンティックビデオ検索アプリケーションを構築できます。

開発者は、Amazon Bedrock上のTwelve Labs Marengo 2.7モデルをElasticsearchと統合し、ビデオセグメントから1,024次元のマルチモーダル埋め込み(embeddings)を生成して5つのベクトルインデックスタイプに保存し、Elasticsearchのk-NN検索を通じて自然言語でクエリを実行することにより、セマンティックビデオ検索アプリケーションを構築できます。

この記事の内容

No headings found on page

ニュースレターに登録する

ニュースレターに登録する

ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします

ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします

AIを活用してビデオを検索、分析、探索します。

2025/09/20

13分

記事へのリンクをコピー

このチュートリアルの基礎となるアプリケーションを構築してくださったElastic社のDave Erickson氏に大いに感謝いたします。Elastic Search Labsブログの彼の投稿をご覧ください: https://www.elastic.co/search-labs/blog/twelvelabs-marengo-video-embedding-amazon-bedrock


TwelveLabsのモデルがAmazon Bedrockで利用可能になり、デベロッパーはデータの完全な制御を維持しながら、高度なビデオAIアプリケーションを構築できるようになりました。このチュートリアルでは、当社のMarengo 2.7モデルとElasticsearchのベクトル検索機能を組み合わせることで、セマンティックビデオ検索システムを作成する方法を示します。


マルチモーダルビデオの課題

従来のビデオ検索は、メタデータ、文字起こし、または手動でタグ付けされたコンテンツに依存していました。しかし、音声による会話や説明的なテキストがまったくない状態で、「恐竜の登場するシーン」や「チームワークを示す瞬間」を見つけたい場合はどうなるでしょうか?個別の画像モデルと音声テキスト化機能を使用する標準的なアプローチでは、ビデオをユニークなものにしている時間的ダイナミクスを見落としてしまいます。

TwelveLabs Marengo 2.7は、ビデオを連続的なマルチモーダルストリームとして理解し、個々のフレームに表示されるものだけでなく、視覚要素、音声の手がかり、そして動きのパターンが時間の経過とともにどのように連携して機能するかを捉えることで、これを解決します。


TwelveLabs Marengoモデルの理解

具体的には、Marengoは異なるメディアタイプ間で一貫した1024次元のベクトル表現を生成する、当社の最先端のマルチモーダル埋め込み(Embedding)モデルです。これらの埋め込みはコンテンツのセマンティックな本質を捉え、直感的なクロスモーダル検索機能を可能にします。Marengoを使用してビデオを埋め込むと、各セグメントがベクトルに変換され、テキストクエリ、画像、または他のビデオから生成された埋め込みと比較できるようになります。


ビデオ検索になぜElasticsearchなのか?

Elasticsearchは、これらのベクトル埋め込みを大規模に保存して検索するための理想的なプラットフォームを提供します。そのベクトル検索機能により、数千から数百万ものビデオセグメントにわたって、効率的に類似度検索を実行できます。TwelveLabsの埋め込みとElasticsearchの検索インフラの組み合わせは、ビデオコンテンツの発見性やアクセシビリティを高めたいと考えている企業にとって強力なソリューションとなります。


アーキテクチャの概要

私たちのソリューションは、主に4つのステージでビデオコンテンツを処理します:

  1. ビデオの取り込み:トレーラー動画をダウンロードし、S3にアップロード

  2. 非同期埋め込み生成:スケーラブルなベクトル作成のために Bedrock の start_async_invoke を使用

  3. ベクトルインデックス処理:複数の量子化オプションを使用して、埋め込みをElasticsearchに保存

  4. リアルタイム検索:k-NNリトリーバルのためにテキストクエリをベクトルに変換


完全なワークフロー


1 - 環境のセットアップ

開始する前に、以下が準備されていることを確認してください:

import os, json, time, copy
import boto3, botocore
import yt_dlp
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from pathlib import Path
from dotenv import load_dotenv
import tqdm

# Configuration
AWS_REGION = "us-east-1"
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
MARENGO_MODEL_ID = "twelvelabs.marengo-embed-2-7-v1:0"  
TEXT_EMBEDDING_MODEL_ID = "us.twelvelabs.marengo-embed-2-7-v1:0"
ELASTICSEARCH_ENDPOINT = os.getenv("ELASTICSEARCH_ENDPOINT")
ELASTICSEARCH_API_KEY = os.getenv("ELASTICSEARCH_API_KEY")

# Test dataset: 2025 summer blockbuster trailers
videos = [
    "https://www.youtube.com/watch?v=VWqJifMMgZE",  # Lilo and Stitch 2025
    "https://www.youtube.com/watch?v=Ox8ZLF6cGM0",  # Superman 2025
    "https://www.youtube.com/watch?v=jan5CFWs9ic",  # Jurassic World Rebirth
    "https://www.youtube.com/watch?v=qpoBjOg5RHU",  # Fantastic Four: First Steps
    "https://www.youtube.com/watch?v=22w7z_lT6YM",  # How to Train Your Dragon
]

上記のコードは、Amazon Bedrock上のTwelveLabs MarengoモデルとElasticsearchを使用して、ビデオ検索ソリューションを構築するための基礎を準備します。

  • これは必要なインポート、設定変数、および2025年公開予定の映画予告編のテストデータセットを設定します。

  • 主要なコンポーネントには、AWS認証情報の設定、Elasticsearchエンドポイント情報、ビデオとテキスト両方の埋め込み生成用モデルID、サンプル動画のYouTubeリンクが含まれます。

  • この初期設定により、チュートリアルの残りの部分で詳しく説明されるビデオ処理、埋め込み生成、およびベクトル検索の実装に必要な環境が整います。


2 - データ管理クラス

 VideoIntelligence クラスは、動画のすべてのメタデータ、ファイルパス、および埋め込みデータをカプセル化します:

class VideoIntelligence:
    def __init__(self, url, platform, video_id):
        self.url = url
        self.platform = platform  
        self.video_id = video_id
        self.video_string = f"{self.platform}_{self.video_id}"
        self.base_path = f"{DATA_PATH}/videos/{self.video_string}"
        
        self.video_path = None
        self.s3_key = None
        self.metadata = None
        self.title = None
        self.embeddings_list = None
    
    def get_video_object(self):
        """Return indexable document structure"""
        return {
            "url": self.url,
            "platform": self.platform, 
            "video_id": self.video_id,
            "title": self.title
        }
    
    # Additional getters/setters omitted for brevity

VideoIntelligenceクラスは、検索システムにおけるビデオ処理のすべての側面を管理するための、包括的なデータコンテナとして機能します。

  • 動画のソースURL、プラットフォーム情報、一意の識別子に加え、ローカルおよびS3ストレージの関連ファイルパスを保存します。

  • また、動画のメタデータ、タイトル情報、およびTwelveLabs Marengoモデルによって生成される重要な埋め込みリストへの参照も保持します。

  • get_video_object()メソッドを通じて、Elasticsearchにビデオ情報をインデックス登録するための構造化されたフォーマットを提供し、取り込みから埋め込み生成、検索可能なインデックス登録までの動画のライフサイクル全体の管理を容易にします。


3 - ビデオのダウンロードと処理

次のステップでは、YouTubeビデオをダウンロードし、いくつかのビデオ処理を行います:

def get_video(video: VideoIntelligence):
    """Download video using yt-dlp and extract metadata"""
    base_directory = Path(video.get_base_path())
    base_directory.mkdir(parents=True, exist_ok=True)
    
    video_path = f"{video.get_base_path()}/{video.get_video_string()}.mp4"
    metadata_path = f"{video.get_base_path()}/metadata.json"
    
    ydl_opts = {
        "format": "bestvideo+bestaudio/best",
        "outtmpl": video_path,
        "merge_output_format": "mp4"
    }
    
    if not os.path.exists(video_path):
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            metadata = ydl.extract_info(video.url, download=False)
            ydl.download([video.url])
        with open(metadata_path, "w") as f:
            json.dump(metadata, f)
    else:
        metadata = json.load(open(metadata_path, "r"))
    
    video.set_metadata(metadata) 
    video.set_video_path(video_path)

# Process all videos
video_objects = []
for video_str in videos:
    if "youtube.com" in video_str:
        platform = "youtube"
        video_id = video_str.split("v=")[1] 
        video_objects.append(VideoIntelligence(video_str, platform, video_id))

for video_object in video_objects:
    get_video(video_object)

こちらのコードは、以下の主要な機能を用いてビデオ処理のワークフローを処理します:

  • get_video関数は、yt-dlpを使用してYouTubeから動画をダウンロードし、ローカルに保存して、メタデータを抽出します。

  • 必要なディレクトリを作成し、適切なファイルパスを設定し、既存の動画を再度ダウンロードするのを防ぐための条件付きダウンロードを処理します。

  • 次に、コードはリスト内の各YouTube動画に対してVideoIntelligenceオブジェクトを初期化し、プラットフォーム情報と動画IDを記録します。

  • 最後に、get_videoを呼び出すことで各動画を処理し、オブジェクトにダウンロードされた動画パスとメタデータを格納します。


4 - 重複除去機能付きS3アップロード

次のステップでは、これらの動画をS3バケットにアップロードします:

# Initialize AWS clients
session = boto3.session.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY, 
    region_name=AWS_REGION
)

aws_account_id = session.client('sts').get_caller_identity()["Account"]
s3_client = session.client('s3')
bedrock_client = session.client("bedrock-runtime", region_name=AWS_REGION)

# Upload videos to S3 with existence checking
for video_object in video_objects:
    video_path = video_object.get_video_path()
    s3_key = f"videos/{video_object.get_platform()}/{video_object.get_video_id()}/{os.path.basename(video_path)}"
    video_object.set_s3_key(s3_key)
    
    try:
        s3_client.head_object(Bucket=S3_BUCKET_NAME, Key=s3_key)
        print(f"Video {video_object.get_video_string()} already exists in S3")
        continue
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            print(f"Uploading {video_object.get_video_string()} to S3...")
            s3_client.upload_file(video_path, S3_BUCKET_NAME, s3_key)
            print(f"Successfully uploaded {video_object.get_video_string()}")

このコードブロックは、AWSセッションを初期化し、S3に接続し、Bedrockサービスとの認証を行うことで、ビデオ処理のためのAWS環境を構築します。これは、組み込みの重複除去機能を用いて、インテリジェントに動画をS3にアップロードします。アップロードする前に、各動画がバケット内にすでに存在するかどうかを確認し、無駄なストレージ消費と処理を防ぎます。

各ビデオオブジェクトに対して、プラットフォームと動画IDに基づいて標準化されたS3キーを構築し、既存の電子ビデオをスキップするか、新しいものをアップロードします。この土台により、その後に続く、生の動画を検索可能なコンテンツに変換する埋め込み生成およびベクトル検索処理が可能になります。


5 - 非同期での動画からの埋め込み生成

TwelveLabsでは、さまざまなコンテンツタイプに対応できるように埋め込みモデルを設計しています。ビデオを処理する際、Marengoはコンテンツを有意なセグメントに自動的に分割し、視覚的な情報と記述的な情報の両方を捉えた埋め込みを生成します。これらの埋め込みにより、自然言語のクエリを使用して特定の瞬間を検索することが可能になります。

この例における主要な革新は、スケーラブルな埋め込み生成のために、Bedrockの非同期呼び出しパターンを使用している点です:

def wait_for_embedding_output(s3_bucket: str, s3_prefix: str, 
                             invocation_arn: str, verbose: bool = False) -> list:
    """Poll Bedrock async job until completion and retrieve results"""
    status = None
    while status not in ["Completed", "Failed", "Expired"]:
        response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)
        status = response['status']
        if verbose:
            tqdm.tqdm.write(f"Embedding task status: {status}")
        time.sleep(5)
    
    if status != "Completed":
        raise Exception(f"Embedding task failed with status: {status}")
    
    # Retrieve output from S3
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
    for obj in response.get('Contents', []):
        if obj['Key'].endswith('output.json'):
            output_key = obj['Key']
            obj = s3_client.get_object(Bucket=s3_bucket, Key=output_key)
            content = obj['Body'].read().decode('utf-8')
            return json.loads(content).get("data", [])
    
    raise Exception("No output.json found in S3 prefix")

def create_video_embedding(video_s3_uri: str, video_id: str) -> list:
    """Start async Marengo embedding job for video in S3"""
    s3_output_prefix = f'embeddings/videos/{video_id}'
    
    response = bedrock_client.start_async_invoke(
        modelId=MARENGO_MODEL_ID,
        modelInput={
            "inputType": "video",
            "mediaSource": {
                "s3Location": {"uri": video_s3_uri, "bucketOwner": aws_account_id}
            }
        },
        outputDataConfig={
            "s3OutputDataConfig": {"s3Uri": f's3://{S3_BUCKET_NAME}/{s3_output_prefix}'}
        }
    )
    
    invocation_arn = response["invocationArn"]
    print(f"Video embedding task started: {invocation_arn}")
    
    return wait_for_embedding_output(S3_BUCKET_NAME, s3_output_prefix, invocation_arn)

def check_existing_embedding(video_id: str):
    """Check S3 for cached embeddings to avoid re-processing"""
    s3_output_prefix = f'embeddings/videos/{video_id}'
    
    try:
        response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=s3_output_prefix)
        if 'Contents' in response and any(obj['Key'].endswith('output.json') for obj in response.get('Contents', [])):
            # Load existing embeddings from S3
            for obj in response.get('Contents', []):
                if obj['Key'].endswith('output.json'):
                    output_key = obj['Key']
                    obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=output_key)
                    content = obj['Body'].read().decode('utf-8')
                    return json.loads(content).get("data", [])
        return None
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            return None
        raise e

# Generate embeddings with caching
for video_object in tqdm.tqdm(video_objects, desc="Processing videos"):
    video_id = video_object.get_video_id()
    video_uri = f"s3://{S3_BUCKET_NAME}/{video_object.get_s3_key()}"
    
    # Check for existing embeddings first
    retrieved_embeddings = check_existing_embedding(video_id)
    if retrieved_embeddings:
        video_object.set_embeddings_list(retrieved_embeddings)
    else:
        embedding_data = create_video_embedding(video_uri, video_id)
        video_object.set_embeddings_list(embedding_data)

このコードはコアとなるビデオ埋め込み生成機能を実装しており、スケーラブルな処理のためにAmazon Bedrockの非同期APIを効果的に活用しています。この実装には3つの主要な関数が含まれています:

  1. wait_for_embedding_output は、非同期ジョブのステータスを監視し、完了時にS3から結果を取得します

  2. create_video_embedding は、S3に保存されているビデオファイルに対する埋め込み生成ジョブを開始します、そして

  3. check_existing_embedding は、重複する処理を避けるためのキャッシュロジックを実装しています。

メインループは、組み込みの重複排除機能を使用して各ビデオを処理し、キャッシュされた埋め込みを取得するか、必要に応じて新しい埋め込みを生成します。このアプローチにより、リソースの使用が最適化され、検索アプリケーション向けの高スループットなビデオ処理が可能になります。


6 - Marengo出力構造の理解

各ビデオは、1,024次元の埋め込みを持つ約6秒のチャンクに分割されます:

# Preview embedding structure 
video_embedding_data = video_objects[0].get_embeddings_list()
for i, embedding in enumerate(video_embedding_data[:3]):
    print(f"{i}")
    for key in embedding:
        if "embedding" == key:
            print(f"\t{key}: len {len(embedding[key])}")
        else:
            print(f"\t{key}: {embedding[key]}")

# Output:
# 0
#   embedding: len 1024
#   embeddingOption: visual-text  
#   startSec: 0.0
#   endSec: 6.199999809265137
# 1 
#   embedding: len 1024
#   embeddingOption: visual-text
#   startSec: 6.199999809265137  
#   endSec: 10.399999618530273

このコードサンプルは、TwelveLabs Marengoモデルによって生成されたビデオ埋め込みの構造を検査する方法を示しています。

  • 最初のビデオオブジェクトから埋め込みデータを取得し、最初の3つの埋め込みチャンクの詳細を出力します。

  • 各埋め込みは、約6秒のビデオコンテンツセグメントを表す1024次元のベクトルであり、埋め込みタイプ(visual-text)や正確なタイムスタンプ(startSec および endSec)などのメタデータが含まれています。

  • この構造により、ビデオコンテンツ全体に対して細粒度のセマンティック検索が可能になり、ユーザーは自然言語のクエリを使って特定の瞬間を見つけることができます。


7 - ベクトル検索向けのElasticsearchの設定

埋め込みを取得したら、それらを効率的に保存して検索できるようにElasticsearchを設定する必要があります。Elasticsearchはベクトル検索用にいくつかのインデックスタイプを提供しており、それぞれ検索速度、精度、ストレージ要件の間で異なるトレードオフがあります。

TwelveLabsの1024次元埋め込みに最適化されたインデックスのセットアップ方法は以下の通りです:

# Connect to Elasticsearch
es = Elasticsearch(
    hosts=[ELASTICSEARCH_ENDPOINT],
    api_key=ELASTICSEARCH_API_KEY
)

# Prepare documents for indexing
docs = []
for video_object in video_objects:
    persist_object = video_object.get_video_object()
    embeddings = video_object.get_embeddings_list()
    
    for embedding in embeddings:
        if embedding["embeddingOption"] == "visual-image":  # Filter for visual embeddings
            doc = copy.deepcopy(persist_object) 
            doc["embedding"] = embedding["embedding"]
            doc["start_sec"] = embedding["startSec"] 
            doc["end_sec"] = embedding["endSec"]
            docs.append(doc)

# Create indices for different vector search methods
index_varieties = [
    "flat",       # Brute force, highest accuracy
    "hnsw",       # Hierarchical navigable small world graph
    "int8_hnsw",  # Quantized for efficiency
    "bbq_hnsw",   # Better Binary Quantization with HNSW
    "bbq_flat"    # BBQ with flat search
]

for index_variety in index_varieties:
    index_name = f"twelvelabs-movie-trailer-{index_variety}"
    mappings = {
        "properties": {
            "url": {"type": "keyword"},
            "platform": {"type": "keyword"},
            "video_id": {"type": "keyword"},
            "title": {"type": "text", "analyzer": "standard"},
            "embedding": {
                "type": "dense_vector", 
                "dims": 1024,
                "similarity": "cosine",
                "index_options": {
                    "type": index_variety
                }
            },
            "start_sec": {"type": "float"},
            "end_sec": {"type": "float"}
        }
    }
    
    # Recreate index if exists
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        time.sleep(2)
    
    es.indices.create(index=index_name, mappings=mappings)
    print(f"Index '{index_name}' created successfully")
    
    # Bulk insert documents
    actions = [{"_index": index_name, "_source": doc} for doc in docs]
    success, failed = bulk(es, actions, chunk_size=100, max_retries=3)
    print(f"Successfully indexed {success} documents into {index_name}")

このコードは、TwelveLabsの1024次元埋め込み用に最適化された5つの異なるインデックスタイプ(flat、HNSW、および量子化されたバイリアント)を作成することにより、ベクトル検索用のElasticsearchを設定します。Elasticsearchに接続し、埋め込みを持つビデオセグメントドキュメントを準備し、密なベクトルに適したマッピングを持つ特別なインデックスを作成し、ドキュメントを一括アップロードします。各インデックスタイプは検索精度、速度、およびストレージ要件の間で異なるトレードオフを提供し、デベロッパーは特定のユースケースの要件に基づいて最適なアプローチを選択できます。


8 - 検索用テキスト埋め込みの作成

TwelveLabsのテクノロジーの最も強力な側面の1つは、自然言語を使用してビデオコンテンツを検索できる点です。これを可能にするために、ビデオを処理したのと同じMarengo 2.7モデルを使用してテキスト埋め込みを生成し、それらが同じベクトル空間に存在するようにします:

# Generate text embedding for search
def create_text_embedding(text_query: str) -> list:
    """Generate text embedding using Marengo via Bedrock"""
    text_model_input = {"inputType": "text", "inputText": text_query}
    
    response = bedrock_client.invoke_model(
        modelId=TEXT_EMBEDDING_MODEL_ID,
        body=json.dumps(text_model_input)
    )
    
    response_body = json.loads(response['body'].read().decode('utf-8'))
    embedding_data = response_body.get("data", [])
    
    return embedding_data[0]["embedding"] if embedding_data else None

このコードは、Amazon Bedrockを介してTwelveLabs Marengoモデルを使用し、テキストクエリのベクトル表現(埋め込み)を生成する create_text_embedding という関数を定義しています。この関数は、テキストクエリ文字列を入力として受け取り、モデルに適した形式にフォーマットし、同期型の invoke_model APIを使用してBedrockにリクエストを送信し、応答を解析して1024次元の埋め込みベクトルを抽出します。これらのテキスト埋め込みはビデオ埋め込みと同じベクトル空間を共有しているため、自然言語の説明に基づいて動画内の瞬間を見つけ出せるセマンティック類似度検索が可能になります。


9 - ベクトル検索の実行

準備が整いましたので、自然言語クエリを使用してビデオライブラリを検索できます。以下は、最も関連性の高いビデオセグメントを見つける検索関数の実装方法です:

def vector_query(index_name: str, text_query: str) -> dict:
    """Execute k-NN vector search against Elasticsearch"""
    query_embedding = create_text_embedding(text_query)
    
    query = {
        "retriever": {
            "knn": {
                "field": "embedding",
                "query_vector": query_embedding,
                "k": 10,
                "num_candidates": "25"  
            }
        },
        "size": 10,
        "_source": False,
        "fields": ["title", "video_id", "start_sec"]
    }
    
    return es.search(index=index_name, body=query).body

# Test search
text_query = "Show me scenes with dinosaurs"  
results = vector_query("twelvelabs-movie-trailer-flat", text_query)
print(results)

このコードは、ビデオコンテンツに対する自然言語クエリを可能にするベクトル検索機能を実装しています。

  • まず、ビデオを処理したのと同じMarengoモデルを使用してクエリからテキスト埋め込みを生成し、ベクトル空間での互換性を確保します。

  • 次に、この関数はElasticsearchのk-NNクエリを構築し、精度の向上のために25個の候補を評価しながら、最も類似した上位10個のビデオセグメントを要求します。

  • 検索結果は、タイトル、動画ID、タイムスタンプなどの関連メタデータを返すため、アプリケーションはクエリのセマンティックの意味に合致する動画内の特定の瞬間に直接リンクできます。


10 - シンプルな検索インターフェースの構築

よりユーザーフレンドリーな体験のために、IPythonのウィジェットを使用してシンプルな検索インターフェースを作成できます。これにより、ユーザーは検索クエリを入力し、動画内の特定の瞬間への直接的なリンクを含む結果を表示できるようになります:

from ipywidgets import widgets, HTML as WHTML, HBox, Layout
from IPython.display import display

def display_search_results_html(query):
    """Format search results as clickable YouTube links"""
    results = vector_query("twelvelabs-movie-trailer-flat", query)
    hits = results.get('hits', {}).get('hits', [])
    
    if not hits:
        return "<p>No results found</p>"
    
    items = []
    for hit in hits:
        fields = hit.get('fields', {})
        title = fields.get('title', ['No Title'])[0]
        score = hit.get('_score', 0) 
        video_id = fields.get('video_id', [''])[0]
        start_sec = fields.get('start_sec', [0])[0]
        
        # Create YouTube deep-link to specific timestamp
        url = f"https://www.youtube.com/watch?v={video_id}&t={int(start_sec)}s"
        items.append(f'<li><a href="{url}" target="_blank">{title} (Start: {float(start_sec):.1f}s)</a> <span>Score: {score}</span></li>')
    
    return "<h3>Search Results:</h3><ul>" + "\n".join(items) + "</ul>"

def search_videos():
    """Create interactive search widget"""
    search_input = widgets.Text(
        value='', 
        placeholder='Enter your search query…',
        description='Search:',
        layout=Layout(width='70%')
    )
    
    search_button = widgets.Button(
        description='Search Videos',
        button_style='primary',
        layout=Layout(width='20%')
    )
    
    results_box = WHTML(value="")
    
    def on_button_click(_):
        q = search_input.value.strip()
        if not q:
            results_box.value = "<p>Please enter a search query</p>"
            return
        results_box.value = "<p>Searching…</p>"
        results_box.value = display_search_results_html(q)
    
    search_button.on_click(on_button_click)
    display(HBox([search_input, search_button]))
    display(results_box)

search_videos()

このコードは、IPythonのウィジェットを使用して、ユーザーが自然言語のクエリでビデオコンテンツを検索できるインタラクティブな検索インターフェースを作成します。

  • 関連コンテンツが表示される正確なタイムスタンプで動画が開く、クリック可能なYouTubeリンクとして検索結果を表示する関数を定義しています。

  • インターフェースには、クエリ用のテキスト入力フィールド、検索ボタン、および結果表示エリアが含まれます。

  • ユーザーがクエリを入力して検索をクリックすると、システムはクエリからテキスト埋め込みを生成し、Elasticsearchでベクトル類似度検索を実行し、適合クオリティを示すスコアとともに、最もセマンティックに関連性の高い動画セグメントへのリンクを返します。


パフォーマンス分析

テストでは、検索結果に興味深いパターンが示されています。「Show me scenes with dinosaurs(恐竜のシーンを見せて)」というクエリに対して:

  • 最上位結果:『ジュラシック・ワールド/リバース』予告編の134.5秒時点(スコア: 0.6405)

  • 第2適合結果:『ヒックとドラゴン』クリップの121.2秒時点(スコア: 0.6228)

  • 適合度:Marengoは文字通りの恐竜とドラゴンのような生物の両方を正しく特定しています

TwelveLabsでは、高次元の埋め込みを処理する際に、精度とパフォーマンスのバランスを取ることの重要性を理解しています。Elasticsearchは、精度を維持しながら検索のパフォーマンスを劇的に向上させることができる、いくつかの量子化方法を提供しています。

特に、ElasticsearchのBetter Binary Quantization (BBQ)は、TwelveLabsの1024次元埋め込みにとって優れたオプションとして際立っています。BBQは最終的なスコアリング用に元のベクトルを保持しつつ、内部的にバイナリ表現を使用することで、メモリ使用量を削減し、検索処理を高速化します。

TwelveLabsの埋め込みを使用したほとんどの本番環境向けデプロイメントにおいて、以下を推奨します:

  • bbq_hnsw: 速度と精度の両方が重要となる大規模なコレクション(数百万のベクトル)に最適です。

  • flat: 最大限の精度が求められる小規模なコレクションに最適です。


本番環境における考慮事項


本番環境におけるビデオ処理のスケーリング

本番環境にElasticsearchとTwelveLabs Marengoをデプロイする場合、効率的なビデオ処理が非常に重要になります。ワークフローを最適化するために、新しいビデオのアップロードを自動的に処理するS3イベントトリガーを使用した、サーバーレスアーキテクチャの実装を検討してください。このアプローチにより、手動での介入が不要になり、シームレスな取り込みパイプラインが作成されます。さらに、Amazon Bedrockの非同期呼び出し機能を活用して複数の動画を並行して処理し、大規模なメディアライブラリにおける全体的な処理時間を劇的に削減します。

ビデオAIを大規模に運用する際には、コスト管理も同様に重要です。初回の生成後に埋め込みをS3に保存するキャッシュ戦略を導入し、同じコンテンツの重複処理を防ぎます。この手法により、特に頻繁にアクセスされるビデオやElasticsearchデプロイメントの再インデックスを行う際に、APIコストと処理時間を大幅に削減できます。


効果的なインデックス管理

最適な検索パフォーマンスを維持するためには、適切なインデックス管理が不可欠です。次のコードは、ベクトルインデックスに対してライフサイクルポリシーを設定する方法を示しています:

# Clean up indexes when done
for index_variety in index_varieties:
    es.indices.delete(index=f"twelvelabs-movie-trailer-{index_variety}")

# Clean up S3 objects
s3_objects = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME)
if 'Contents' in s3_objects:
    delete_keys = [{'Key': obj['Key']} for obj in s3_objects['Contents']]
    s3_client.delete_objects(Bucket=S3_BUCKET_NAME, Delete={'Objects': delete_keys})

本番システムでは、削除ではなくインデックスのローテーション戦略の導入を検討してください。時間ベースのインデックス(例:月次)を作成し、エイリアスを使用して一貫した検索エンドポイントを維持しながら、バックグラウンドでの効率的な再インデックスと最適化を可能にします。


高度な検索アプリケーション

TwelveLabs MarengoとElasticsearchの統合の真の力は、高度な検索機能を実装する際に発揮されます。Marengoの特徴であるセマンティックな理解と、Elasticsearchがビデオの文字起こしに対して行うBM25スコアリングを組み合わせた、ハイブリッド検索システムを作成します。このアプローチにより、両方のテクノロジーの強み—Marengoの深い視覚的理解とテキスト検索の精度—のバランスが取れ、より包括的で正確な結果が得られます。

より高度なアプリケーションの場合、MarengoとTwelveLabsのPegasus 1.2モデルを組み合わせることで、マルチモーダルRAG(Retrieval Augmented Generation)ワークフローを構築します。この組み合わせにより、関連するビデオコンテンツを単純に見つけるだけでなく、全体を視聴しなくてもユーザーがより深い洞察を得られる、文脈的な要約を生成できます。

最後に、ベクトル検索に加えてリアルタイムのフィルタリング機能を実装することで、ユーザー体験を向上させます。Elasticsearchはベクトルの類似度とブーリアンフィルターの組み合わせに優れており、地理空間的な制約(例: "ニューヨークの屋外のシーンを見せて")、時間的なフィルター(例: "過去30日間の動画")、およびアクセス制御制限を、Marengoの強力なセマンティック理解を犠牲にすることなく統合できます。\


はじめに

Marengoのビデオ理解機能とElasticsearchのベクトル検索により、テキストを検索するのと同じくらい自然にビデオコンテンツを検索できるようになり、これまでメディアライブラリに隠されていたインサイトを解き放つことができます。

今すぐAmazon BedrockでTwelveLabsモデルへのアクセスをリクエストし、Elasticsearchデプロイメントをセットアップし、コンテンツを真に理解するインテリジェントなビデオアプリケーションの構築を始めましょう。完全なノートブックは Elasticsearch Labs リポジトリで公開されています。


基本リソース

今すぐ始める:

実装ガイド:

デベロッパー向けリソース:

このチュートリアルの基礎となるアプリケーションを構築してくださったElastic社のDave Erickson氏に大いに感謝いたします。Elastic Search Labsブログの彼の投稿をご覧ください: https://www.elastic.co/search-labs/blog/twelvelabs-marengo-video-embedding-amazon-bedrock


TwelveLabsのモデルがAmazon Bedrockで利用可能になり、デベロッパーはデータの完全な制御を維持しながら、高度なビデオAIアプリケーションを構築できるようになりました。このチュートリアルでは、当社のMarengo 2.7モデルとElasticsearchのベクトル検索機能を組み合わせることで、セマンティックビデオ検索システムを作成する方法を示します。


マルチモーダルビデオの課題

従来のビデオ検索は、メタデータ、文字起こし、または手動でタグ付けされたコンテンツに依存していました。しかし、音声による会話や説明的なテキストがまったくない状態で、「恐竜の登場するシーン」や「チームワークを示す瞬間」を見つけたい場合はどうなるでしょうか?個別の画像モデルと音声テキスト化機能を使用する標準的なアプローチでは、ビデオをユニークなものにしている時間的ダイナミクスを見落としてしまいます。

TwelveLabs Marengo 2.7は、ビデオを連続的なマルチモーダルストリームとして理解し、個々のフレームに表示されるものだけでなく、視覚要素、音声の手がかり、そして動きのパターンが時間の経過とともにどのように連携して機能するかを捉えることで、これを解決します。


TwelveLabs Marengoモデルの理解

具体的には、Marengoは異なるメディアタイプ間で一貫した1024次元のベクトル表現を生成する、当社の最先端のマルチモーダル埋め込み(Embedding)モデルです。これらの埋め込みはコンテンツのセマンティックな本質を捉え、直感的なクロスモーダル検索機能を可能にします。Marengoを使用してビデオを埋め込むと、各セグメントがベクトルに変換され、テキストクエリ、画像、または他のビデオから生成された埋め込みと比較できるようになります。


ビデオ検索になぜElasticsearchなのか?

Elasticsearchは、これらのベクトル埋め込みを大規模に保存して検索するための理想的なプラットフォームを提供します。そのベクトル検索機能により、数千から数百万ものビデオセグメントにわたって、効率的に類似度検索を実行できます。TwelveLabsの埋め込みとElasticsearchの検索インフラの組み合わせは、ビデオコンテンツの発見性やアクセシビリティを高めたいと考えている企業にとって強力なソリューションとなります。


アーキテクチャの概要

私たちのソリューションは、主に4つのステージでビデオコンテンツを処理します:

  1. ビデオの取り込み:トレーラー動画をダウンロードし、S3にアップロード

  2. 非同期埋め込み生成:スケーラブルなベクトル作成のために Bedrock の start_async_invoke を使用

  3. ベクトルインデックス処理:複数の量子化オプションを使用して、埋め込みをElasticsearchに保存

  4. リアルタイム検索:k-NNリトリーバルのためにテキストクエリをベクトルに変換


完全なワークフロー


1 - 環境のセットアップ

開始する前に、以下が準備されていることを確認してください:

import os, json, time, copy
import boto3, botocore
import yt_dlp
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from pathlib import Path
from dotenv import load_dotenv
import tqdm

# Configuration
AWS_REGION = "us-east-1"
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
MARENGO_MODEL_ID = "twelvelabs.marengo-embed-2-7-v1:0"  
TEXT_EMBEDDING_MODEL_ID = "us.twelvelabs.marengo-embed-2-7-v1:0"
ELASTICSEARCH_ENDPOINT = os.getenv("ELASTICSEARCH_ENDPOINT")
ELASTICSEARCH_API_KEY = os.getenv("ELASTICSEARCH_API_KEY")

# Test dataset: 2025 summer blockbuster trailers
videos = [
    "https://www.youtube.com/watch?v=VWqJifMMgZE",  # Lilo and Stitch 2025
    "https://www.youtube.com/watch?v=Ox8ZLF6cGM0",  # Superman 2025
    "https://www.youtube.com/watch?v=jan5CFWs9ic",  # Jurassic World Rebirth
    "https://www.youtube.com/watch?v=qpoBjOg5RHU",  # Fantastic Four: First Steps
    "https://www.youtube.com/watch?v=22w7z_lT6YM",  # How to Train Your Dragon
]

上記のコードは、Amazon Bedrock上のTwelveLabs MarengoモデルとElasticsearchを使用して、ビデオ検索ソリューションを構築するための基礎を準備します。

  • これは必要なインポート、設定変数、および2025年公開予定の映画予告編のテストデータセットを設定します。

  • 主要なコンポーネントには、AWS認証情報の設定、Elasticsearchエンドポイント情報、ビデオとテキスト両方の埋め込み生成用モデルID、サンプル動画のYouTubeリンクが含まれます。

  • この初期設定により、チュートリアルの残りの部分で詳しく説明されるビデオ処理、埋め込み生成、およびベクトル検索の実装に必要な環境が整います。


2 - データ管理クラス

 VideoIntelligence クラスは、動画のすべてのメタデータ、ファイルパス、および埋め込みデータをカプセル化します:

class VideoIntelligence:
    def __init__(self, url, platform, video_id):
        self.url = url
        self.platform = platform  
        self.video_id = video_id
        self.video_string = f"{self.platform}_{self.video_id}"
        self.base_path = f"{DATA_PATH}/videos/{self.video_string}"
        
        self.video_path = None
        self.s3_key = None
        self.metadata = None
        self.title = None
        self.embeddings_list = None
    
    def get_video_object(self):
        """Return indexable document structure"""
        return {
            "url": self.url,
            "platform": self.platform, 
            "video_id": self.video_id,
            "title": self.title
        }
    
    # Additional getters/setters omitted for brevity

VideoIntelligenceクラスは、検索システムにおけるビデオ処理のすべての側面を管理するための、包括的なデータコンテナとして機能します。

  • 動画のソースURL、プラットフォーム情報、一意の識別子に加え、ローカルおよびS3ストレージの関連ファイルパスを保存します。

  • また、動画のメタデータ、タイトル情報、およびTwelveLabs Marengoモデルによって生成される重要な埋め込みリストへの参照も保持します。

  • get_video_object()メソッドを通じて、Elasticsearchにビデオ情報をインデックス登録するための構造化されたフォーマットを提供し、取り込みから埋め込み生成、検索可能なインデックス登録までの動画のライフサイクル全体の管理を容易にします。


3 - ビデオのダウンロードと処理

次のステップでは、YouTubeビデオをダウンロードし、いくつかのビデオ処理を行います:

def get_video(video: VideoIntelligence):
    """Download video using yt-dlp and extract metadata"""
    base_directory = Path(video.get_base_path())
    base_directory.mkdir(parents=True, exist_ok=True)
    
    video_path = f"{video.get_base_path()}/{video.get_video_string()}.mp4"
    metadata_path = f"{video.get_base_path()}/metadata.json"
    
    ydl_opts = {
        "format": "bestvideo+bestaudio/best",
        "outtmpl": video_path,
        "merge_output_format": "mp4"
    }
    
    if not os.path.exists(video_path):
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            metadata = ydl.extract_info(video.url, download=False)
            ydl.download([video.url])
        with open(metadata_path, "w") as f:
            json.dump(metadata, f)
    else:
        metadata = json.load(open(metadata_path, "r"))
    
    video.set_metadata(metadata) 
    video.set_video_path(video_path)

# Process all videos
video_objects = []
for video_str in videos:
    if "youtube.com" in video_str:
        platform = "youtube"
        video_id = video_str.split("v=")[1] 
        video_objects.append(VideoIntelligence(video_str, platform, video_id))

for video_object in video_objects:
    get_video(video_object)

こちらのコードは、以下の主要な機能を用いてビデオ処理のワークフローを処理します:

  • get_video関数は、yt-dlpを使用してYouTubeから動画をダウンロードし、ローカルに保存して、メタデータを抽出します。

  • 必要なディレクトリを作成し、適切なファイルパスを設定し、既存の動画を再度ダウンロードするのを防ぐための条件付きダウンロードを処理します。

  • 次に、コードはリスト内の各YouTube動画に対してVideoIntelligenceオブジェクトを初期化し、プラットフォーム情報と動画IDを記録します。

  • 最後に、get_videoを呼び出すことで各動画を処理し、オブジェクトにダウンロードされた動画パスとメタデータを格納します。


4 - 重複除去機能付きS3アップロード

次のステップでは、これらの動画をS3バケットにアップロードします:

# Initialize AWS clients
session = boto3.session.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY, 
    region_name=AWS_REGION
)

aws_account_id = session.client('sts').get_caller_identity()["Account"]
s3_client = session.client('s3')
bedrock_client = session.client("bedrock-runtime", region_name=AWS_REGION)

# Upload videos to S3 with existence checking
for video_object in video_objects:
    video_path = video_object.get_video_path()
    s3_key = f"videos/{video_object.get_platform()}/{video_object.get_video_id()}/{os.path.basename(video_path)}"
    video_object.set_s3_key(s3_key)
    
    try:
        s3_client.head_object(Bucket=S3_BUCKET_NAME, Key=s3_key)
        print(f"Video {video_object.get_video_string()} already exists in S3")
        continue
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            print(f"Uploading {video_object.get_video_string()} to S3...")
            s3_client.upload_file(video_path, S3_BUCKET_NAME, s3_key)
            print(f"Successfully uploaded {video_object.get_video_string()}")

このコードブロックは、AWSセッションを初期化し、S3に接続し、Bedrockサービスとの認証を行うことで、ビデオ処理のためのAWS環境を構築します。これは、組み込みの重複除去機能を用いて、インテリジェントに動画をS3にアップロードします。アップロードする前に、各動画がバケット内にすでに存在するかどうかを確認し、無駄なストレージ消費と処理を防ぎます。

各ビデオオブジェクトに対して、プラットフォームと動画IDに基づいて標準化されたS3キーを構築し、既存の電子ビデオをスキップするか、新しいものをアップロードします。この土台により、その後に続く、生の動画を検索可能なコンテンツに変換する埋め込み生成およびベクトル検索処理が可能になります。


5 - 非同期での動画からの埋め込み生成

TwelveLabsでは、さまざまなコンテンツタイプに対応できるように埋め込みモデルを設計しています。ビデオを処理する際、Marengoはコンテンツを有意なセグメントに自動的に分割し、視覚的な情報と記述的な情報の両方を捉えた埋め込みを生成します。これらの埋め込みにより、自然言語のクエリを使用して特定の瞬間を検索することが可能になります。

この例における主要な革新は、スケーラブルな埋め込み生成のために、Bedrockの非同期呼び出しパターンを使用している点です:

def wait_for_embedding_output(s3_bucket: str, s3_prefix: str, 
                             invocation_arn: str, verbose: bool = False) -> list:
    """Poll Bedrock async job until completion and retrieve results"""
    status = None
    while status not in ["Completed", "Failed", "Expired"]:
        response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)
        status = response['status']
        if verbose:
            tqdm.tqdm.write(f"Embedding task status: {status}")
        time.sleep(5)
    
    if status != "Completed":
        raise Exception(f"Embedding task failed with status: {status}")
    
    # Retrieve output from S3
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
    for obj in response.get('Contents', []):
        if obj['Key'].endswith('output.json'):
            output_key = obj['Key']
            obj = s3_client.get_object(Bucket=s3_bucket, Key=output_key)
            content = obj['Body'].read().decode('utf-8')
            return json.loads(content).get("data", [])
    
    raise Exception("No output.json found in S3 prefix")

def create_video_embedding(video_s3_uri: str, video_id: str) -> list:
    """Start async Marengo embedding job for video in S3"""
    s3_output_prefix = f'embeddings/videos/{video_id}'
    
    response = bedrock_client.start_async_invoke(
        modelId=MARENGO_MODEL_ID,
        modelInput={
            "inputType": "video",
            "mediaSource": {
                "s3Location": {"uri": video_s3_uri, "bucketOwner": aws_account_id}
            }
        },
        outputDataConfig={
            "s3OutputDataConfig": {"s3Uri": f's3://{S3_BUCKET_NAME}/{s3_output_prefix}'}
        }
    )
    
    invocation_arn = response["invocationArn"]
    print(f"Video embedding task started: {invocation_arn}")
    
    return wait_for_embedding_output(S3_BUCKET_NAME, s3_output_prefix, invocation_arn)

def check_existing_embedding(video_id: str):
    """Check S3 for cached embeddings to avoid re-processing"""
    s3_output_prefix = f'embeddings/videos/{video_id}'
    
    try:
        response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=s3_output_prefix)
        if 'Contents' in response and any(obj['Key'].endswith('output.json') for obj in response.get('Contents', [])):
            # Load existing embeddings from S3
            for obj in response.get('Contents', []):
                if obj['Key'].endswith('output.json'):
                    output_key = obj['Key']
                    obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=output_key)
                    content = obj['Body'].read().decode('utf-8')
                    return json.loads(content).get("data", [])
        return None
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            return None
        raise e

# Generate embeddings with caching
for video_object in tqdm.tqdm(video_objects, desc="Processing videos"):
    video_id = video_object.get_video_id()
    video_uri = f"s3://{S3_BUCKET_NAME}/{video_object.get_s3_key()}"
    
    # Check for existing embeddings first
    retrieved_embeddings = check_existing_embedding(video_id)
    if retrieved_embeddings:
        video_object.set_embeddings_list(retrieved_embeddings)
    else:
        embedding_data = create_video_embedding(video_uri, video_id)
        video_object.set_embeddings_list(embedding_data)

このコードはコアとなるビデオ埋め込み生成機能を実装しており、スケーラブルな処理のためにAmazon Bedrockの非同期APIを効果的に活用しています。この実装には3つの主要な関数が含まれています:

  1. wait_for_embedding_output は、非同期ジョブのステータスを監視し、完了時にS3から結果を取得します

  2. create_video_embedding は、S3に保存されているビデオファイルに対する埋め込み生成ジョブを開始します、そして

  3. check_existing_embedding は、重複する処理を避けるためのキャッシュロジックを実装しています。

メインループは、組み込みの重複排除機能を使用して各ビデオを処理し、キャッシュされた埋め込みを取得するか、必要に応じて新しい埋め込みを生成します。このアプローチにより、リソースの使用が最適化され、検索アプリケーション向けの高スループットなビデオ処理が可能になります。


6 - Marengo出力構造の理解

各ビデオは、1,024次元の埋め込みを持つ約6秒のチャンクに分割されます:

# Preview embedding structure 
video_embedding_data = video_objects[0].get_embeddings_list()
for i, embedding in enumerate(video_embedding_data[:3]):
    print(f"{i}")
    for key in embedding:
        if "embedding" == key:
            print(f"\t{key}: len {len(embedding[key])}")
        else:
            print(f"\t{key}: {embedding[key]}")

# Output:
# 0
#   embedding: len 1024
#   embeddingOption: visual-text  
#   startSec: 0.0
#   endSec: 6.199999809265137
# 1 
#   embedding: len 1024
#   embeddingOption: visual-text
#   startSec: 6.199999809265137  
#   endSec: 10.399999618530273

このコードサンプルは、TwelveLabs Marengoモデルによって生成されたビデオ埋め込みの構造を検査する方法を示しています。

  • 最初のビデオオブジェクトから埋め込みデータを取得し、最初の3つの埋め込みチャンクの詳細を出力します。

  • 各埋め込みは、約6秒のビデオコンテンツセグメントを表す1024次元のベクトルであり、埋め込みタイプ(visual-text)や正確なタイムスタンプ(startSec および endSec)などのメタデータが含まれています。

  • この構造により、ビデオコンテンツ全体に対して細粒度のセマンティック検索が可能になり、ユーザーは自然言語のクエリを使って特定の瞬間を見つけることができます。


7 - ベクトル検索向けのElasticsearchの設定

埋め込みを取得したら、それらを効率的に保存して検索できるようにElasticsearchを設定する必要があります。Elasticsearchはベクトル検索用にいくつかのインデックスタイプを提供しており、それぞれ検索速度、精度、ストレージ要件の間で異なるトレードオフがあります。

TwelveLabsの1024次元埋め込みに最適化されたインデックスのセットアップ方法は以下の通りです:

# Connect to Elasticsearch
es = Elasticsearch(
    hosts=[ELASTICSEARCH_ENDPOINT],
    api_key=ELASTICSEARCH_API_KEY
)

# Prepare documents for indexing
docs = []
for video_object in video_objects:
    persist_object = video_object.get_video_object()
    embeddings = video_object.get_embeddings_list()
    
    for embedding in embeddings:
        if embedding["embeddingOption"] == "visual-image":  # Filter for visual embeddings
            doc = copy.deepcopy(persist_object) 
            doc["embedding"] = embedding["embedding"]
            doc["start_sec"] = embedding["startSec"] 
            doc["end_sec"] = embedding["endSec"]
            docs.append(doc)

# Create indices for different vector search methods
index_varieties = [
    "flat",       # Brute force, highest accuracy
    "hnsw",       # Hierarchical navigable small world graph
    "int8_hnsw",  # Quantized for efficiency
    "bbq_hnsw",   # Better Binary Quantization with HNSW
    "bbq_flat"    # BBQ with flat search
]

for index_variety in index_varieties:
    index_name = f"twelvelabs-movie-trailer-{index_variety}"
    mappings = {
        "properties": {
            "url": {"type": "keyword"},
            "platform": {"type": "keyword"},
            "video_id": {"type": "keyword"},
            "title": {"type": "text", "analyzer": "standard"},
            "embedding": {
                "type": "dense_vector", 
                "dims": 1024,
                "similarity": "cosine",
                "index_options": {
                    "type": index_variety
                }
            },
            "start_sec": {"type": "float"},
            "end_sec": {"type": "float"}
        }
    }
    
    # Recreate index if exists
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        time.sleep(2)
    
    es.indices.create(index=index_name, mappings=mappings)
    print(f"Index '{index_name}' created successfully")
    
    # Bulk insert documents
    actions = [{"_index": index_name, "_source": doc} for doc in docs]
    success, failed = bulk(es, actions, chunk_size=100, max_retries=3)
    print(f"Successfully indexed {success} documents into {index_name}")

このコードは、TwelveLabsの1024次元埋め込み用に最適化された5つの異なるインデックスタイプ(flat、HNSW、および量子化されたバイリアント)を作成することにより、ベクトル検索用のElasticsearchを設定します。Elasticsearchに接続し、埋め込みを持つビデオセグメントドキュメントを準備し、密なベクトルに適したマッピングを持つ特別なインデックスを作成し、ドキュメントを一括アップロードします。各インデックスタイプは検索精度、速度、およびストレージ要件の間で異なるトレードオフを提供し、デベロッパーは特定のユースケースの要件に基づいて最適なアプローチを選択できます。


8 - 検索用テキスト埋め込みの作成

TwelveLabsのテクノロジーの最も強力な側面の1つは、自然言語を使用してビデオコンテンツを検索できる点です。これを可能にするために、ビデオを処理したのと同じMarengo 2.7モデルを使用してテキスト埋め込みを生成し、それらが同じベクトル空間に存在するようにします:

# Generate text embedding for search
def create_text_embedding(text_query: str) -> list:
    """Generate text embedding using Marengo via Bedrock"""
    text_model_input = {"inputType": "text", "inputText": text_query}
    
    response = bedrock_client.invoke_model(
        modelId=TEXT_EMBEDDING_MODEL_ID,
        body=json.dumps(text_model_input)
    )
    
    response_body = json.loads(response['body'].read().decode('utf-8'))
    embedding_data = response_body.get("data", [])
    
    return embedding_data[0]["embedding"] if embedding_data else None

このコードは、Amazon Bedrockを介してTwelveLabs Marengoモデルを使用し、テキストクエリのベクトル表現(埋め込み)を生成する create_text_embedding という関数を定義しています。この関数は、テキストクエリ文字列を入力として受け取り、モデルに適した形式にフォーマットし、同期型の invoke_model APIを使用してBedrockにリクエストを送信し、応答を解析して1024次元の埋め込みベクトルを抽出します。これらのテキスト埋め込みはビデオ埋め込みと同じベクトル空間を共有しているため、自然言語の説明に基づいて動画内の瞬間を見つけ出せるセマンティック類似度検索が可能になります。


9 - ベクトル検索の実行

準備が整いましたので、自然言語クエリを使用してビデオライブラリを検索できます。以下は、最も関連性の高いビデオセグメントを見つける検索関数の実装方法です:

def vector_query(index_name: str, text_query: str) -> dict:
    """Execute k-NN vector search against Elasticsearch"""
    query_embedding = create_text_embedding(text_query)
    
    query = {
        "retriever": {
            "knn": {
                "field": "embedding",
                "query_vector": query_embedding,
                "k": 10,
                "num_candidates": "25"  
            }
        },
        "size": 10,
        "_source": False,
        "fields": ["title", "video_id", "start_sec"]
    }
    
    return es.search(index=index_name, body=query).body

# Test search
text_query = "Show me scenes with dinosaurs"  
results = vector_query("twelvelabs-movie-trailer-flat", text_query)
print(results)

このコードは、ビデオコンテンツに対する自然言語クエリを可能にするベクトル検索機能を実装しています。

  • まず、ビデオを処理したのと同じMarengoモデルを使用してクエリからテキスト埋め込みを生成し、ベクトル空間での互換性を確保します。

  • 次に、この関数はElasticsearchのk-NNクエリを構築し、精度の向上のために25個の候補を評価しながら、最も類似した上位10個のビデオセグメントを要求します。

  • 検索結果は、タイトル、動画ID、タイムスタンプなどの関連メタデータを返すため、アプリケーションはクエリのセマンティックの意味に合致する動画内の特定の瞬間に直接リンクできます。


10 - シンプルな検索インターフェースの構築

よりユーザーフレンドリーな体験のために、IPythonのウィジェットを使用してシンプルな検索インターフェースを作成できます。これにより、ユーザーは検索クエリを入力し、動画内の特定の瞬間への直接的なリンクを含む結果を表示できるようになります:

from ipywidgets import widgets, HTML as WHTML, HBox, Layout
from IPython.display import display

def display_search_results_html(query):
    """Format search results as clickable YouTube links"""
    results = vector_query("twelvelabs-movie-trailer-flat", query)
    hits = results.get('hits', {}).get('hits', [])
    
    if not hits:
        return "<p>No results found</p>"
    
    items = []
    for hit in hits:
        fields = hit.get('fields', {})
        title = fields.get('title', ['No Title'])[0]
        score = hit.get('_score', 0) 
        video_id = fields.get('video_id', [''])[0]
        start_sec = fields.get('start_sec', [0])[0]
        
        # Create YouTube deep-link to specific timestamp
        url = f"https://www.youtube.com/watch?v={video_id}&t={int(start_sec)}s"
        items.append(f'<li><a href="{url}" target="_blank">{title} (Start: {float(start_sec):.1f}s)</a> <span>Score: {score}</span></li>')
    
    return "<h3>Search Results:</h3><ul>" + "\n".join(items) + "</ul>"

def search_videos():
    """Create interactive search widget"""
    search_input = widgets.Text(
        value='', 
        placeholder='Enter your search query…',
        description='Search:',
        layout=Layout(width='70%')
    )
    
    search_button = widgets.Button(
        description='Search Videos',
        button_style='primary',
        layout=Layout(width='20%')
    )
    
    results_box = WHTML(value="")
    
    def on_button_click(_):
        q = search_input.value.strip()
        if not q:
            results_box.value = "<p>Please enter a search query</p>"
            return
        results_box.value = "<p>Searching…</p>"
        results_box.value = display_search_results_html(q)
    
    search_button.on_click(on_button_click)
    display(HBox([search_input, search_button]))
    display(results_box)

search_videos()

このコードは、IPythonのウィジェットを使用して、ユーザーが自然言語のクエリでビデオコンテンツを検索できるインタラクティブな検索インターフェースを作成します。

  • 関連コンテンツが表示される正確なタイムスタンプで動画が開く、クリック可能なYouTubeリンクとして検索結果を表示する関数を定義しています。

  • インターフェースには、クエリ用のテキスト入力フィールド、検索ボタン、および結果表示エリアが含まれます。

  • ユーザーがクエリを入力して検索をクリックすると、システムはクエリからテキスト埋め込みを生成し、Elasticsearchでベクトル類似度検索を実行し、適合クオリティを示すスコアとともに、最もセマンティックに関連性の高い動画セグメントへのリンクを返します。


パフォーマンス分析

テストでは、検索結果に興味深いパターンが示されています。「Show me scenes with dinosaurs(恐竜のシーンを見せて)」というクエリに対して:

  • 最上位結果:『ジュラシック・ワールド/リバース』予告編の134.5秒時点(スコア: 0.6405)

  • 第2適合結果:『ヒックとドラゴン』クリップの121.2秒時点(スコア: 0.6228)

  • 適合度:Marengoは文字通りの恐竜とドラゴンのような生物の両方を正しく特定しています

TwelveLabsでは、高次元の埋め込みを処理する際に、精度とパフォーマンスのバランスを取ることの重要性を理解しています。Elasticsearchは、精度を維持しながら検索のパフォーマンスを劇的に向上させることができる、いくつかの量子化方法を提供しています。

特に、ElasticsearchのBetter Binary Quantization (BBQ)は、TwelveLabsの1024次元埋め込みにとって優れたオプションとして際立っています。BBQは最終的なスコアリング用に元のベクトルを保持しつつ、内部的にバイナリ表現を使用することで、メモリ使用量を削減し、検索処理を高速化します。

TwelveLabsの埋め込みを使用したほとんどの本番環境向けデプロイメントにおいて、以下を推奨します:

  • bbq_hnsw: 速度と精度の両方が重要となる大規模なコレクション(数百万のベクトル)に最適です。

  • flat: 最大限の精度が求められる小規模なコレクションに最適です。


本番環境における考慮事項


本番環境におけるビデオ処理のスケーリング

本番環境にElasticsearchとTwelveLabs Marengoをデプロイする場合、効率的なビデオ処理が非常に重要になります。ワークフローを最適化するために、新しいビデオのアップロードを自動的に処理するS3イベントトリガーを使用した、サーバーレスアーキテクチャの実装を検討してください。このアプローチにより、手動での介入が不要になり、シームレスな取り込みパイプラインが作成されます。さらに、Amazon Bedrockの非同期呼び出し機能を活用して複数の動画を並行して処理し、大規模なメディアライブラリにおける全体的な処理時間を劇的に削減します。

ビデオAIを大規模に運用する際には、コスト管理も同様に重要です。初回の生成後に埋め込みをS3に保存するキャッシュ戦略を導入し、同じコンテンツの重複処理を防ぎます。この手法により、特に頻繁にアクセスされるビデオやElasticsearchデプロイメントの再インデックスを行う際に、APIコストと処理時間を大幅に削減できます。


効果的なインデックス管理

最適な検索パフォーマンスを維持するためには、適切なインデックス管理が不可欠です。次のコードは、ベクトルインデックスに対してライフサイクルポリシーを設定する方法を示しています:

# Clean up indexes when done
for index_variety in index_varieties:
    es.indices.delete(index=f"twelvelabs-movie-trailer-{index_variety}")

# Clean up S3 objects
s3_objects = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME)
if 'Contents' in s3_objects:
    delete_keys = [{'Key': obj['Key']} for obj in s3_objects['Contents']]
    s3_client.delete_objects(Bucket=S3_BUCKET_NAME, Delete={'Objects': delete_keys})

本番システムでは、削除ではなくインデックスのローテーション戦略の導入を検討してください。時間ベースのインデックス(例:月次)を作成し、エイリアスを使用して一貫した検索エンドポイントを維持しながら、バックグラウンドでの効率的な再インデックスと最適化を可能にします。


高度な検索アプリケーション

TwelveLabs MarengoとElasticsearchの統合の真の力は、高度な検索機能を実装する際に発揮されます。Marengoの特徴であるセマンティックな理解と、Elasticsearchがビデオの文字起こしに対して行うBM25スコアリングを組み合わせた、ハイブリッド検索システムを作成します。このアプローチにより、両方のテクノロジーの強み—Marengoの深い視覚的理解とテキスト検索の精度—のバランスが取れ、より包括的で正確な結果が得られます。

より高度なアプリケーションの場合、MarengoとTwelveLabsのPegasus 1.2モデルを組み合わせることで、マルチモーダルRAG(Retrieval Augmented Generation)ワークフローを構築します。この組み合わせにより、関連するビデオコンテンツを単純に見つけるだけでなく、全体を視聴しなくてもユーザーがより深い洞察を得られる、文脈的な要約を生成できます。

最後に、ベクトル検索に加えてリアルタイムのフィルタリング機能を実装することで、ユーザー体験を向上させます。Elasticsearchはベクトルの類似度とブーリアンフィルターの組み合わせに優れており、地理空間的な制約(例: "ニューヨークの屋外のシーンを見せて")、時間的なフィルター(例: "過去30日間の動画")、およびアクセス制御制限を、Marengoの強力なセマンティック理解を犠牲にすることなく統合できます。\


はじめに

Marengoのビデオ理解機能とElasticsearchのベクトル検索により、テキストを検索するのと同じくらい自然にビデオコンテンツを検索できるようになり、これまでメディアライブラリに隠されていたインサイトを解き放つことができます。

今すぐAmazon BedrockでTwelveLabsモデルへのアクセスをリクエストし、Elasticsearchデプロイメントをセットアップし、コンテンツを真に理解するインテリジェントなビデオアプリケーションの構築を始めましょう。完全なノートブックは Elasticsearch Labs リポジトリで公開されています。


基本リソース

今すぐ始める:

実装ガイド:

デベロッパー向けリソース:

このチュートリアルの基礎となるアプリケーションを構築してくださったElastic社のDave Erickson氏に大いに感謝いたします。Elastic Search Labsブログの彼の投稿をご覧ください: https://www.elastic.co/search-labs/blog/twelvelabs-marengo-video-embedding-amazon-bedrock


TwelveLabsのモデルがAmazon Bedrockで利用可能になり、デベロッパーはデータの完全な制御を維持しながら、高度なビデオAIアプリケーションを構築できるようになりました。このチュートリアルでは、当社のMarengo 2.7モデルとElasticsearchのベクトル検索機能を組み合わせることで、セマンティックビデオ検索システムを作成する方法を示します。


マルチモーダルビデオの課題

従来のビデオ検索は、メタデータ、文字起こし、または手動でタグ付けされたコンテンツに依存していました。しかし、音声による会話や説明的なテキストがまったくない状態で、「恐竜の登場するシーン」や「チームワークを示す瞬間」を見つけたい場合はどうなるでしょうか?個別の画像モデルと音声テキスト化機能を使用する標準的なアプローチでは、ビデオをユニークなものにしている時間的ダイナミクスを見落としてしまいます。

TwelveLabs Marengo 2.7は、ビデオを連続的なマルチモーダルストリームとして理解し、個々のフレームに表示されるものだけでなく、視覚要素、音声の手がかり、そして動きのパターンが時間の経過とともにどのように連携して機能するかを捉えることで、これを解決します。


TwelveLabs Marengoモデルの理解

具体的には、Marengoは異なるメディアタイプ間で一貫した1024次元のベクトル表現を生成する、当社の最先端のマルチモーダル埋め込み(Embedding)モデルです。これらの埋め込みはコンテンツのセマンティックな本質を捉え、直感的なクロスモーダル検索機能を可能にします。Marengoを使用してビデオを埋め込むと、各セグメントがベクトルに変換され、テキストクエリ、画像、または他のビデオから生成された埋め込みと比較できるようになります。


ビデオ検索になぜElasticsearchなのか?

Elasticsearchは、これらのベクトル埋め込みを大規模に保存して検索するための理想的なプラットフォームを提供します。そのベクトル検索機能により、数千から数百万ものビデオセグメントにわたって、効率的に類似度検索を実行できます。TwelveLabsの埋め込みとElasticsearchの検索インフラの組み合わせは、ビデオコンテンツの発見性やアクセシビリティを高めたいと考えている企業にとって強力なソリューションとなります。


アーキテクチャの概要

私たちのソリューションは、主に4つのステージでビデオコンテンツを処理します:

  1. ビデオの取り込み:トレーラー動画をダウンロードし、S3にアップロード

  2. 非同期埋め込み生成:スケーラブルなベクトル作成のために Bedrock の start_async_invoke を使用

  3. ベクトルインデックス処理:複数の量子化オプションを使用して、埋め込みをElasticsearchに保存

  4. リアルタイム検索:k-NNリトリーバルのためにテキストクエリをベクトルに変換


完全なワークフロー


1 - 環境のセットアップ

開始する前に、以下が準備されていることを確認してください:

import os, json, time, copy
import boto3, botocore
import yt_dlp
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from pathlib import Path
from dotenv import load_dotenv
import tqdm

# Configuration
AWS_REGION = "us-east-1"
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
MARENGO_MODEL_ID = "twelvelabs.marengo-embed-2-7-v1:0"  
TEXT_EMBEDDING_MODEL_ID = "us.twelvelabs.marengo-embed-2-7-v1:0"
ELASTICSEARCH_ENDPOINT = os.getenv("ELASTICSEARCH_ENDPOINT")
ELASTICSEARCH_API_KEY = os.getenv("ELASTICSEARCH_API_KEY")

# Test dataset: 2025 summer blockbuster trailers
videos = [
    "https://www.youtube.com/watch?v=VWqJifMMgZE",  # Lilo and Stitch 2025
    "https://www.youtube.com/watch?v=Ox8ZLF6cGM0",  # Superman 2025
    "https://www.youtube.com/watch?v=jan5CFWs9ic",  # Jurassic World Rebirth
    "https://www.youtube.com/watch?v=qpoBjOg5RHU",  # Fantastic Four: First Steps
    "https://www.youtube.com/watch?v=22w7z_lT6YM",  # How to Train Your Dragon
]

上記のコードは、Amazon Bedrock上のTwelveLabs MarengoモデルとElasticsearchを使用して、ビデオ検索ソリューションを構築するための基礎を準備します。

  • これは必要なインポート、設定変数、および2025年公開予定の映画予告編のテストデータセットを設定します。

  • 主要なコンポーネントには、AWS認証情報の設定、Elasticsearchエンドポイント情報、ビデオとテキスト両方の埋め込み生成用モデルID、サンプル動画のYouTubeリンクが含まれます。

  • この初期設定により、チュートリアルの残りの部分で詳しく説明されるビデオ処理、埋め込み生成、およびベクトル検索の実装に必要な環境が整います。


2 - データ管理クラス

 VideoIntelligence クラスは、動画のすべてのメタデータ、ファイルパス、および埋め込みデータをカプセル化します:

class VideoIntelligence:
    def __init__(self, url, platform, video_id):
        self.url = url
        self.platform = platform  
        self.video_id = video_id
        self.video_string = f"{self.platform}_{self.video_id}"
        self.base_path = f"{DATA_PATH}/videos/{self.video_string}"
        
        self.video_path = None
        self.s3_key = None
        self.metadata = None
        self.title = None
        self.embeddings_list = None
    
    def get_video_object(self):
        """Return indexable document structure"""
        return {
            "url": self.url,
            "platform": self.platform, 
            "video_id": self.video_id,
            "title": self.title
        }
    
    # Additional getters/setters omitted for brevity

VideoIntelligenceクラスは、検索システムにおけるビデオ処理のすべての側面を管理するための、包括的なデータコンテナとして機能します。

  • 動画のソースURL、プラットフォーム情報、一意の識別子に加え、ローカルおよびS3ストレージの関連ファイルパスを保存します。

  • また、動画のメタデータ、タイトル情報、およびTwelveLabs Marengoモデルによって生成される重要な埋め込みリストへの参照も保持します。

  • get_video_object()メソッドを通じて、Elasticsearchにビデオ情報をインデックス登録するための構造化されたフォーマットを提供し、取り込みから埋め込み生成、検索可能なインデックス登録までの動画のライフサイクル全体の管理を容易にします。


3 - ビデオのダウンロードと処理

次のステップでは、YouTubeビデオをダウンロードし、いくつかのビデオ処理を行います:

def get_video(video: VideoIntelligence):
    """Download video using yt-dlp and extract metadata"""
    base_directory = Path(video.get_base_path())
    base_directory.mkdir(parents=True, exist_ok=True)
    
    video_path = f"{video.get_base_path()}/{video.get_video_string()}.mp4"
    metadata_path = f"{video.get_base_path()}/metadata.json"
    
    ydl_opts = {
        "format": "bestvideo+bestaudio/best",
        "outtmpl": video_path,
        "merge_output_format": "mp4"
    }
    
    if not os.path.exists(video_path):
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            metadata = ydl.extract_info(video.url, download=False)
            ydl.download([video.url])
        with open(metadata_path, "w") as f:
            json.dump(metadata, f)
    else:
        metadata = json.load(open(metadata_path, "r"))
    
    video.set_metadata(metadata) 
    video.set_video_path(video_path)

# Process all videos
video_objects = []
for video_str in videos:
    if "youtube.com" in video_str:
        platform = "youtube"
        video_id = video_str.split("v=")[1] 
        video_objects.append(VideoIntelligence(video_str, platform, video_id))

for video_object in video_objects:
    get_video(video_object)

こちらのコードは、以下の主要な機能を用いてビデオ処理のワークフローを処理します:

  • get_video関数は、yt-dlpを使用してYouTubeから動画をダウンロードし、ローカルに保存して、メタデータを抽出します。

  • 必要なディレクトリを作成し、適切なファイルパスを設定し、既存の動画を再度ダウンロードするのを防ぐための条件付きダウンロードを処理します。

  • 次に、コードはリスト内の各YouTube動画に対してVideoIntelligenceオブジェクトを初期化し、プラットフォーム情報と動画IDを記録します。

  • 最後に、get_videoを呼び出すことで各動画を処理し、オブジェクトにダウンロードされた動画パスとメタデータを格納します。


4 - 重複除去機能付きS3アップロード

次のステップでは、これらの動画をS3バケットにアップロードします:

# Initialize AWS clients
session = boto3.session.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY, 
    region_name=AWS_REGION
)

aws_account_id = session.client('sts').get_caller_identity()["Account"]
s3_client = session.client('s3')
bedrock_client = session.client("bedrock-runtime", region_name=AWS_REGION)

# Upload videos to S3 with existence checking
for video_object in video_objects:
    video_path = video_object.get_video_path()
    s3_key = f"videos/{video_object.get_platform()}/{video_object.get_video_id()}/{os.path.basename(video_path)}"
    video_object.set_s3_key(s3_key)
    
    try:
        s3_client.head_object(Bucket=S3_BUCKET_NAME, Key=s3_key)
        print(f"Video {video_object.get_video_string()} already exists in S3")
        continue
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            print(f"Uploading {video_object.get_video_string()} to S3...")
            s3_client.upload_file(video_path, S3_BUCKET_NAME, s3_key)
            print(f"Successfully uploaded {video_object.get_video_string()}")

このコードブロックは、AWSセッションを初期化し、S3に接続し、Bedrockサービスとの認証を行うことで、ビデオ処理のためのAWS環境を構築します。これは、組み込みの重複除去機能を用いて、インテリジェントに動画をS3にアップロードします。アップロードする前に、各動画がバケット内にすでに存在するかどうかを確認し、無駄なストレージ消費と処理を防ぎます。

各ビデオオブジェクトに対して、プラットフォームと動画IDに基づいて標準化されたS3キーを構築し、既存の電子ビデオをスキップするか、新しいものをアップロードします。この土台により、その後に続く、生の動画を検索可能なコンテンツに変換する埋め込み生成およびベクトル検索処理が可能になります。


5 - 非同期での動画からの埋め込み生成

TwelveLabsでは、さまざまなコンテンツタイプに対応できるように埋め込みモデルを設計しています。ビデオを処理する際、Marengoはコンテンツを有意なセグメントに自動的に分割し、視覚的な情報と記述的な情報の両方を捉えた埋め込みを生成します。これらの埋め込みにより、自然言語のクエリを使用して特定の瞬間を検索することが可能になります。

この例における主要な革新は、スケーラブルな埋め込み生成のために、Bedrockの非同期呼び出しパターンを使用している点です:

def wait_for_embedding_output(s3_bucket: str, s3_prefix: str, 
                             invocation_arn: str, verbose: bool = False) -> list:
    """Poll Bedrock async job until completion and retrieve results"""
    status = None
    while status not in ["Completed", "Failed", "Expired"]:
        response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)
        status = response['status']
        if verbose:
            tqdm.tqdm.write(f"Embedding task status: {status}")
        time.sleep(5)
    
    if status != "Completed":
        raise Exception(f"Embedding task failed with status: {status}")
    
    # Retrieve output from S3
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
    for obj in response.get('Contents', []):
        if obj['Key'].endswith('output.json'):
            output_key = obj['Key']
            obj = s3_client.get_object(Bucket=s3_bucket, Key=output_key)
            content = obj['Body'].read().decode('utf-8')
            return json.loads(content).get("data", [])
    
    raise Exception("No output.json found in S3 prefix")

def create_video_embedding(video_s3_uri: str, video_id: str) -> list:
    """Start async Marengo embedding job for video in S3"""
    s3_output_prefix = f'embeddings/videos/{video_id}'
    
    response = bedrock_client.start_async_invoke(
        modelId=MARENGO_MODEL_ID,
        modelInput={
            "inputType": "video",
            "mediaSource": {
                "s3Location": {"uri": video_s3_uri, "bucketOwner": aws_account_id}
            }
        },
        outputDataConfig={
            "s3OutputDataConfig": {"s3Uri": f's3://{S3_BUCKET_NAME}/{s3_output_prefix}'}
        }
    )
    
    invocation_arn = response["invocationArn"]
    print(f"Video embedding task started: {invocation_arn}")
    
    return wait_for_embedding_output(S3_BUCKET_NAME, s3_output_prefix, invocation_arn)

def check_existing_embedding(video_id: str):
    """Check S3 for cached embeddings to avoid re-processing"""
    s3_output_prefix = f'embeddings/videos/{video_id}'
    
    try:
        response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=s3_output_prefix)
        if 'Contents' in response and any(obj['Key'].endswith('output.json') for obj in response.get('Contents', [])):
            # Load existing embeddings from S3
            for obj in response.get('Contents', []):
                if obj['Key'].endswith('output.json'):
                    output_key = obj['Key']
                    obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=output_key)
                    content = obj['Body'].read().decode('utf-8')
                    return json.loads(content).get("data", [])
        return None
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            return None
        raise e

# Generate embeddings with caching
for video_object in tqdm.tqdm(video_objects, desc="Processing videos"):
    video_id = video_object.get_video_id()
    video_uri = f"s3://{S3_BUCKET_NAME}/{video_object.get_s3_key()}"
    
    # Check for existing embeddings first
    retrieved_embeddings = check_existing_embedding(video_id)
    if retrieved_embeddings:
        video_object.set_embeddings_list(retrieved_embeddings)
    else:
        embedding_data = create_video_embedding(video_uri, video_id)
        video_object.set_embeddings_list(embedding_data)

このコードはコアとなるビデオ埋め込み生成機能を実装しており、スケーラブルな処理のためにAmazon Bedrockの非同期APIを効果的に活用しています。この実装には3つの主要な関数が含まれています:

  1. wait_for_embedding_output は、非同期ジョブのステータスを監視し、完了時にS3から結果を取得します

  2. create_video_embedding は、S3に保存されているビデオファイルに対する埋め込み生成ジョブを開始します、そして

  3. check_existing_embedding は、重複する処理を避けるためのキャッシュロジックを実装しています。

メインループは、組み込みの重複排除機能を使用して各ビデオを処理し、キャッシュされた埋め込みを取得するか、必要に応じて新しい埋め込みを生成します。このアプローチにより、リソースの使用が最適化され、検索アプリケーション向けの高スループットなビデオ処理が可能になります。


6 - Marengo出力構造の理解

各ビデオは、1,024次元の埋め込みを持つ約6秒のチャンクに分割されます:

# Preview embedding structure 
video_embedding_data = video_objects[0].get_embeddings_list()
for i, embedding in enumerate(video_embedding_data[:3]):
    print(f"{i}")
    for key in embedding:
        if "embedding" == key:
            print(f"\t{key}: len {len(embedding[key])}")
        else:
            print(f"\t{key}: {embedding[key]}")

# Output:
# 0
#   embedding: len 1024
#   embeddingOption: visual-text  
#   startSec: 0.0
#   endSec: 6.199999809265137
# 1 
#   embedding: len 1024
#   embeddingOption: visual-text
#   startSec: 6.199999809265137  
#   endSec: 10.399999618530273

このコードサンプルは、TwelveLabs Marengoモデルによって生成されたビデオ埋め込みの構造を検査する方法を示しています。

  • 最初のビデオオブジェクトから埋め込みデータを取得し、最初の3つの埋め込みチャンクの詳細を出力します。

  • 各埋め込みは、約6秒のビデオコンテンツセグメントを表す1024次元のベクトルであり、埋め込みタイプ(visual-text)や正確なタイムスタンプ(startSec および endSec)などのメタデータが含まれています。

  • この構造により、ビデオコンテンツ全体に対して細粒度のセマンティック検索が可能になり、ユーザーは自然言語のクエリを使って特定の瞬間を見つけることができます。


7 - ベクトル検索向けのElasticsearchの設定

埋め込みを取得したら、それらを効率的に保存して検索できるようにElasticsearchを設定する必要があります。Elasticsearchはベクトル検索用にいくつかのインデックスタイプを提供しており、それぞれ検索速度、精度、ストレージ要件の間で異なるトレードオフがあります。

TwelveLabsの1024次元埋め込みに最適化されたインデックスのセットアップ方法は以下の通りです:

# Connect to Elasticsearch
es = Elasticsearch(
    hosts=[ELASTICSEARCH_ENDPOINT],
    api_key=ELASTICSEARCH_API_KEY
)

# Prepare documents for indexing
docs = []
for video_object in video_objects:
    persist_object = video_object.get_video_object()
    embeddings = video_object.get_embeddings_list()
    
    for embedding in embeddings:
        if embedding["embeddingOption"] == "visual-image":  # Filter for visual embeddings
            doc = copy.deepcopy(persist_object) 
            doc["embedding"] = embedding["embedding"]
            doc["start_sec"] = embedding["startSec"] 
            doc["end_sec"] = embedding["endSec"]
            docs.append(doc)

# Create indices for different vector search methods
index_varieties = [
    "flat",       # Brute force, highest accuracy
    "hnsw",       # Hierarchical navigable small world graph
    "int8_hnsw",  # Quantized for efficiency
    "bbq_hnsw",   # Better Binary Quantization with HNSW
    "bbq_flat"    # BBQ with flat search
]

for index_variety in index_varieties:
    index_name = f"twelvelabs-movie-trailer-{index_variety}"
    mappings = {
        "properties": {
            "url": {"type": "keyword"},
            "platform": {"type": "keyword"},
            "video_id": {"type": "keyword"},
            "title": {"type": "text", "analyzer": "standard"},
            "embedding": {
                "type": "dense_vector", 
                "dims": 1024,
                "similarity": "cosine",
                "index_options": {
                    "type": index_variety
                }
            },
            "start_sec": {"type": "float"},
            "end_sec": {"type": "float"}
        }
    }
    
    # Recreate index if exists
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        time.sleep(2)
    
    es.indices.create(index=index_name, mappings=mappings)
    print(f"Index '{index_name}' created successfully")
    
    # Bulk insert documents
    actions = [{"_index": index_name, "_source": doc} for doc in docs]
    success, failed = bulk(es, actions, chunk_size=100, max_retries=3)
    print(f"Successfully indexed {success} documents into {index_name}")

このコードは、TwelveLabsの1024次元埋め込み用に最適化された5つの異なるインデックスタイプ(flat、HNSW、および量子化されたバイリアント)を作成することにより、ベクトル検索用のElasticsearchを設定します。Elasticsearchに接続し、埋め込みを持つビデオセグメントドキュメントを準備し、密なベクトルに適したマッピングを持つ特別なインデックスを作成し、ドキュメントを一括アップロードします。各インデックスタイプは検索精度、速度、およびストレージ要件の間で異なるトレードオフを提供し、デベロッパーは特定のユースケースの要件に基づいて最適なアプローチを選択できます。


8 - 検索用テキスト埋め込みの作成

TwelveLabsのテクノロジーの最も強力な側面の1つは、自然言語を使用してビデオコンテンツを検索できる点です。これを可能にするために、ビデオを処理したのと同じMarengo 2.7モデルを使用してテキスト埋め込みを生成し、それらが同じベクトル空間に存在するようにします:

# Generate text embedding for search
def create_text_embedding(text_query: str) -> list:
    """Generate text embedding using Marengo via Bedrock"""
    text_model_input = {"inputType": "text", "inputText": text_query}
    
    response = bedrock_client.invoke_model(
        modelId=TEXT_EMBEDDING_MODEL_ID,
        body=json.dumps(text_model_input)
    )
    
    response_body = json.loads(response['body'].read().decode('utf-8'))
    embedding_data = response_body.get("data", [])
    
    return embedding_data[0]["embedding"] if embedding_data else None

このコードは、Amazon Bedrockを介してTwelveLabs Marengoモデルを使用し、テキストクエリのベクトル表現(埋め込み)を生成する create_text_embedding という関数を定義しています。この関数は、テキストクエリ文字列を入力として受け取り、モデルに適した形式にフォーマットし、同期型の invoke_model APIを使用してBedrockにリクエストを送信し、応答を解析して1024次元の埋め込みベクトルを抽出します。これらのテキスト埋め込みはビデオ埋め込みと同じベクトル空間を共有しているため、自然言語の説明に基づいて動画内の瞬間を見つけ出せるセマンティック類似度検索が可能になります。


9 - ベクトル検索の実行

準備が整いましたので、自然言語クエリを使用してビデオライブラリを検索できます。以下は、最も関連性の高いビデオセグメントを見つける検索関数の実装方法です:

def vector_query(index_name: str, text_query: str) -> dict:
    """Execute k-NN vector search against Elasticsearch"""
    query_embedding = create_text_embedding(text_query)
    
    query = {
        "retriever": {
            "knn": {
                "field": "embedding",
                "query_vector": query_embedding,
                "k": 10,
                "num_candidates": "25"  
            }
        },
        "size": 10,
        "_source": False,
        "fields": ["title", "video_id", "start_sec"]
    }
    
    return es.search(index=index_name, body=query).body

# Test search
text_query = "Show me scenes with dinosaurs"  
results = vector_query("twelvelabs-movie-trailer-flat", text_query)
print(results)

このコードは、ビデオコンテンツに対する自然言語クエリを可能にするベクトル検索機能を実装しています。

  • まず、ビデオを処理したのと同じMarengoモデルを使用してクエリからテキスト埋め込みを生成し、ベクトル空間での互換性を確保します。

  • 次に、この関数はElasticsearchのk-NNクエリを構築し、精度の向上のために25個の候補を評価しながら、最も類似した上位10個のビデオセグメントを要求します。

  • 検索結果は、タイトル、動画ID、タイムスタンプなどの関連メタデータを返すため、アプリケーションはクエリのセマンティックの意味に合致する動画内の特定の瞬間に直接リンクできます。


10 - シンプルな検索インターフェースの構築

よりユーザーフレンドリーな体験のために、IPythonのウィジェットを使用してシンプルな検索インターフェースを作成できます。これにより、ユーザーは検索クエリを入力し、動画内の特定の瞬間への直接的なリンクを含む結果を表示できるようになります:

from ipywidgets import widgets, HTML as WHTML, HBox, Layout
from IPython.display import display

def display_search_results_html(query):
    """Format search results as clickable YouTube links"""
    results = vector_query("twelvelabs-movie-trailer-flat", query)
    hits = results.get('hits', {}).get('hits', [])
    
    if not hits:
        return "<p>No results found</p>"
    
    items = []
    for hit in hits:
        fields = hit.get('fields', {})
        title = fields.get('title', ['No Title'])[0]
        score = hit.get('_score', 0) 
        video_id = fields.get('video_id', [''])[0]
        start_sec = fields.get('start_sec', [0])[0]
        
        # Create YouTube deep-link to specific timestamp
        url = f"https://www.youtube.com/watch?v={video_id}&t={int(start_sec)}s"
        items.append(f'<li><a href="{url}" target="_blank">{title} (Start: {float(start_sec):.1f}s)</a> <span>Score: {score}</span></li>')
    
    return "<h3>Search Results:</h3><ul>" + "\n".join(items) + "</ul>"

def search_videos():
    """Create interactive search widget"""
    search_input = widgets.Text(
        value='', 
        placeholder='Enter your search query…',
        description='Search:',
        layout=Layout(width='70%')
    )
    
    search_button = widgets.Button(
        description='Search Videos',
        button_style='primary',
        layout=Layout(width='20%')
    )
    
    results_box = WHTML(value="")
    
    def on_button_click(_):
        q = search_input.value.strip()
        if not q:
            results_box.value = "<p>Please enter a search query</p>"
            return
        results_box.value = "<p>Searching…</p>"
        results_box.value = display_search_results_html(q)
    
    search_button.on_click(on_button_click)
    display(HBox([search_input, search_button]))
    display(results_box)

search_videos()

このコードは、IPythonのウィジェットを使用して、ユーザーが自然言語のクエリでビデオコンテンツを検索できるインタラクティブな検索インターフェースを作成します。

  • 関連コンテンツが表示される正確なタイムスタンプで動画が開く、クリック可能なYouTubeリンクとして検索結果を表示する関数を定義しています。

  • インターフェースには、クエリ用のテキスト入力フィールド、検索ボタン、および結果表示エリアが含まれます。

  • ユーザーがクエリを入力して検索をクリックすると、システムはクエリからテキスト埋め込みを生成し、Elasticsearchでベクトル類似度検索を実行し、適合クオリティを示すスコアとともに、最もセマンティックに関連性の高い動画セグメントへのリンクを返します。


パフォーマンス分析

テストでは、検索結果に興味深いパターンが示されています。「Show me scenes with dinosaurs(恐竜のシーンを見せて)」というクエリに対して:

  • 最上位結果:『ジュラシック・ワールド/リバース』予告編の134.5秒時点(スコア: 0.6405)

  • 第2適合結果:『ヒックとドラゴン』クリップの121.2秒時点(スコア: 0.6228)

  • 適合度:Marengoは文字通りの恐竜とドラゴンのような生物の両方を正しく特定しています

TwelveLabsでは、高次元の埋め込みを処理する際に、精度とパフォーマンスのバランスを取ることの重要性を理解しています。Elasticsearchは、精度を維持しながら検索のパフォーマンスを劇的に向上させることができる、いくつかの量子化方法を提供しています。

特に、ElasticsearchのBetter Binary Quantization (BBQ)は、TwelveLabsの1024次元埋め込みにとって優れたオプションとして際立っています。BBQは最終的なスコアリング用に元のベクトルを保持しつつ、内部的にバイナリ表現を使用することで、メモリ使用量を削減し、検索処理を高速化します。

TwelveLabsの埋め込みを使用したほとんどの本番環境向けデプロイメントにおいて、以下を推奨します:

  • bbq_hnsw: 速度と精度の両方が重要となる大規模なコレクション(数百万のベクトル)に最適です。

  • flat: 最大限の精度が求められる小規模なコレクションに最適です。


本番環境における考慮事項


本番環境におけるビデオ処理のスケーリング

本番環境にElasticsearchとTwelveLabs Marengoをデプロイする場合、効率的なビデオ処理が非常に重要になります。ワークフローを最適化するために、新しいビデオのアップロードを自動的に処理するS3イベントトリガーを使用した、サーバーレスアーキテクチャの実装を検討してください。このアプローチにより、手動での介入が不要になり、シームレスな取り込みパイプラインが作成されます。さらに、Amazon Bedrockの非同期呼び出し機能を活用して複数の動画を並行して処理し、大規模なメディアライブラリにおける全体的な処理時間を劇的に削減します。

ビデオAIを大規模に運用する際には、コスト管理も同様に重要です。初回の生成後に埋め込みをS3に保存するキャッシュ戦略を導入し、同じコンテンツの重複処理を防ぎます。この手法により、特に頻繁にアクセスされるビデオやElasticsearchデプロイメントの再インデックスを行う際に、APIコストと処理時間を大幅に削減できます。


効果的なインデックス管理

最適な検索パフォーマンスを維持するためには、適切なインデックス管理が不可欠です。次のコードは、ベクトルインデックスに対してライフサイクルポリシーを設定する方法を示しています:

# Clean up indexes when done
for index_variety in index_varieties:
    es.indices.delete(index=f"twelvelabs-movie-trailer-{index_variety}")

# Clean up S3 objects
s3_objects = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME)
if 'Contents' in s3_objects:
    delete_keys = [{'Key': obj['Key']} for obj in s3_objects['Contents']]
    s3_client.delete_objects(Bucket=S3_BUCKET_NAME, Delete={'Objects': delete_keys})

本番システムでは、削除ではなくインデックスのローテーション戦略の導入を検討してください。時間ベースのインデックス(例:月次)を作成し、エイリアスを使用して一貫した検索エンドポイントを維持しながら、バックグラウンドでの効率的な再インデックスと最適化を可能にします。


高度な検索アプリケーション

TwelveLabs MarengoとElasticsearchの統合の真の力は、高度な検索機能を実装する際に発揮されます。Marengoの特徴であるセマンティックな理解と、Elasticsearchがビデオの文字起こしに対して行うBM25スコアリングを組み合わせた、ハイブリッド検索システムを作成します。このアプローチにより、両方のテクノロジーの強み—Marengoの深い視覚的理解とテキスト検索の精度—のバランスが取れ、より包括的で正確な結果が得られます。

より高度なアプリケーションの場合、MarengoとTwelveLabsのPegasus 1.2モデルを組み合わせることで、マルチモーダルRAG(Retrieval Augmented Generation)ワークフローを構築します。この組み合わせにより、関連するビデオコンテンツを単純に見つけるだけでなく、全体を視聴しなくてもユーザーがより深い洞察を得られる、文脈的な要約を生成できます。

最後に、ベクトル検索に加えてリアルタイムのフィルタリング機能を実装することで、ユーザー体験を向上させます。Elasticsearchはベクトルの類似度とブーリアンフィルターの組み合わせに優れており、地理空間的な制約(例: "ニューヨークの屋外のシーンを見せて")、時間的なフィルター(例: "過去30日間の動画")、およびアクセス制御制限を、Marengoの強力なセマンティック理解を犠牲にすることなく統合できます。\


はじめに

Marengoのビデオ理解機能とElasticsearchのベクトル検索により、テキストを検索するのと同じくらい自然にビデオコンテンツを検索できるようになり、これまでメディアライブラリに隠されていたインサイトを解き放つことができます。

今すぐAmazon BedrockでTwelveLabsモデルへのアクセスをリクエストし、Elasticsearchデプロイメントをセットアップし、コンテンツを真に理解するインテリジェントなビデオアプリケーションの構築を始めましょう。完全なノートブックは Elasticsearch Labs リポジトリで公開されています。


基本リソース

今すぐ始める:

実装ガイド:

デベロッパー向けリソース: