パートナーシップ

埋め込みからインサイトへ:TwelveLabs MarengoとS3ベクトルを活用した実践クロスモーダル検索

阮 景談、ジェームズ・ル

開発者は、Amazon Bedrock上のTwelve Labs Marengo埋め込みモデルをAmazon S3 Vectorsと統合することで、ビデオ、オーディオ、画像、テキストから1,024次元の埋め込みを生成し、別のベクトルデータベースを管理することなくコサイン類似度を使用してクエリを実行できる、クロスモーダルなセマンティック検索パイプラインを構築できます。

開発者は、Amazon Bedrock上のTwelve Labs Marengo埋め込みモデルをAmazon S3 Vectorsと統合することで、ビデオ、オーディオ、画像、テキストから1,024次元の埋め込みを生成し、別のベクトルデータベースを管理することなくコサイン類似度を使用してクエリを実行できる、クロスモーダルなセマンティック検索パイプラインを構築できます。

この記事の内容

No headings found on page

ニュースレターに登録する

ニュースレターに登録する

ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします

ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします

AIを活用してビデオを検索、分析、探索します。

2025/08/29

17分

記事へのリンクをコピー

1 - はじめに

AWSテクノロジーを使用して構築している開発者であれば、Amazon BedrockやS3などのマネージドサービスを使用したスケーラブルなAIアプリケーションの構築には馴染みがあるでしょう。今回、BedrockでTwelveLabsの最先端モデルが利用可能になったことにより、高度なビデオ理解機能を使用してワークフローを大幅に強化できます。特にMarengoモデルは傑出しており、テキスト、ビデオ、画像、オーディオを含むマルチモーダルデータに対してリッチな512次元の埋め込みを生成し、手動でのラベル付けなしでアクション、オブジェクト、サウンドなどのニュアンス豊かなコンテキストを捉えます。

これを、ネイティブベクトルをサポートする初のクラウドストレージであるAmazon S3 Vectorsと組み合わせることで、S3バケット内でシームレスかつスケーラブルなストレージと類似性検索を直接利用できます。外部データベースとの格闘はもう不要です。使い慣れたBoto3クライアントを使用して、エンタープライズ規模のコサインベースのクエリを実行するだけです。

この統合は、TwelveLabsのBedrockでのローンチの一環として発表され、自然言語によるビデオ検索、コンテンツ推薦、検索拡張生成(RAG)システムなどの強力なユースケースへの扉を開きます。このチュートリアルでは、Pythonノートブックの実践を通じて、Marengoを使用した埋め込みの生成、S3 Vectorsへの保存、およびセマンティック検索の実行についてステップバイステップで説明し、これらのツールを迅速に導入してAWSスタックを拡張できるようにします。


2 - 前提条件とセットアップ

マルチモーダル埋め込みのワークフローに進む前に、環境が正しく構成されていることを確認しましょう。AWS環境に慣れている開発者であれば、標準のAWSツールキットに加え、新しくローンチされたBedrock上のTwelveLabsモデルへのアクセス権が必要です。


前提条件

AWS要件

以下のサービスへのアクセス権を持つAWSアカウントが必要です:

  • Amazon Bedrock、S3、およびS3 Vectorsに対する適切なIAMアクセス権限を持つAWSアカウント

  • お使いのリージョンでTwelveLabs Marengoモデルが有効化されたAmazon Bedrockへのアクセス権

  • Bedrockの一時的な出力用として使用する既存のS3バケット(非同期処理に必要)

  • 認証情報が設定されたAWS CLI(aws configure

開発環境:

  • Python 3.8以上

  • Jupyter Notebookまたはお好みのPython環境


インストールと依存関係

まず、必要なPythonパッケージをインストールします。この統合は、最新のBoto3 SDKとベクトル演算用のNumPyに依存しています:

pip install boto3==1.40.7 numpy matplotlib Pillow -q

次に、Python環境に必要なライブラリをインポートします:

import boto3
import json
import numpy as np
import uuid
import time
import os
from typing import List, Dict
from botocore.exceptions import ClientError


設定

AWSおよびモデルの設定を構成します。お使いの環境に合わせて以下の変数を更新してください:

# AWS Configuration
AWS_REGION = "us-east-1"  # Update to your region
AWS_PROFILE = "default"   # Update to your profile

# S3 Vectors Configuration
VECTOR_BUCKET_NAME = "marengo-vectors-" + str(uuid.uuid4())[:8]
VECTOR_INDEX_NAME = "embeddings-index"
VECTOR_DIMENSION = 1024  # Marengo embedding dimension

# Marengo Model
MODEL_ID = 'twelvelabs.marengo-embed-2-7-v1:0'

# Temporary S3 bucket for Bedrock output (required by Bedrock API)
TEMP_S3_BUCKET = "<YOUR_S3_BUCKET>"  # TODO: Replace with your S3 bucket name

print(f"Vector Bucket: {VECTOR_BUCKET_NAME}")
print(f"Vector Index: {VECTOR_INDEX_NAME}")
print(f"Model: {MODEL_ID}")

重要: <YOUR_S3_BUCKET> をお使いのアカウントに存在する既存のS3バケット名に置き換えてください。Bedrockの非同期呼び出し機能では、処理結果を保存するためにこの一時ストレージの場所が必要となります。


AWSクライアントの初期化

使用するAWSサービス用の認証セッションを作成します:

# Initialize AWS clients
session = boto3.Session(profile_name=AWS_PROFILE)
bedrock_client = session.client('bedrock-runtime', region_name=AWS_REGION)
s3_client = session.client('s3')
s3vectors_client = session.client('s3vectors', region_name=AWS_REGION)
print("AWS clients initialized")


S3 Vectorバケットとインデックスの作成

次に、ベクトルストレージインフラストラクチャをセットアップします。S3 Vectorsでは、埋め込みを整理および検索するために、ベクトルバケットとインデックスの両方が必要です:

# Create Vector Bucket
try:
    s3vectors_client.create_vector_bucket(
        vectorBucketName=VECTOR_BUCKET_NAME,
        encryptionConfiguration={'sseType': 'AES256'}
    )
    print(f"Vector bucket '{VECTOR_BUCKET_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Vector bucket already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

このセットアップにより、AES256暗号化を備えたベクトルバケットが作成され、コサイン距離測定基準を使用してMarengoの1024次元埋め込み用に構成されたインデックスが作成されます。これは、セマンティック類似性検索に最適です。

トラブルシューティングのヒント:

  • お使いのリージョンでS3 Vectorsが利用できない場合は、サポートされているリージョンについてAWSドキュメントを確認してください。

  • IAMユーザー/ロールに、bedrock:InvokeModels3:GetObjects3:PutObject、およびs3vectors:*アクションの権限があることを確認してください。

  • ConflictException処理により、リソースが既に存在する場合でも、エラーなしでセットアップを再実行できます。

この基盤が整えば、TwelveLabs Marengoを使用して埋め込みを生成し、スケーラブルなS3ベクトルインフラストラクチャに保存する準備は完了です。


3 - Marengoを使用した埋め込みの生成

TwelveLabsのMarengoモデルは、マルチモーダル埋め込みを生成するための柔軟な入力方法を提供します。アプリケーションをスケールさせるにつれて、パフォーマンスとコストを最適化するために適切なアプローチを選択することが極めて重要になります。このセクションでは、Base64エンコーディング(小さなファイルや迅速なプロトタイピングに最適)とS3 URIベースの処理(プロダクションワークロードや大容量のメディアファイルに最適)の両方を紹介します。


メディアファイルのS3へのアップロード

本番環境のシナリオでは、メディアファイルをS3に保存することで、優れたスケーラビリティ、耐久性、およびパフォーマンスが実現します。まず、ファイルをアップロードするためのヘルパー関数を作成します:

def upload_file_to_s3(local_path: str, bucket: str, key: str) -> str:
    """
    Upload a local file to S3
    
    Args:
        local_path: Path to local file
        bucket: S3 bucket name
        key: S3 object key (path in bucket)
    
    Returns:
        S3 URI of uploaded file
    """
    try:
        s3_client.upload_file(local_path, bucket, key)
        s3_uri = f"s3://{bucket}/{key}"
        print(f"✅ Uploaded {os.path.basename(local_path)} to {s3_uri}")
        return s3_uri
    except ClientError as e:
        print(f"❌ Error uploading file: {e}")
        raise


アプローチ1:Base64エンコーディング

このアプローチでは、ファイルをローカルに読み込み、Base64にエンコードしてからBedrockに送信します。サイズが小さなファイル、開発段階、またはメディアがすでにメモリ上にあるシナリオに最適です。

テキスト埋め込み:

def generate_text_embedding(text: str) -> List[float]:
    """
    Generate embedding for text using Marengo on Bedrock
    """
    # Create unique output path
    output_prefix = f'embeddings/{uuid.uuid4()}'
    
    # Start async embedding generation
    response = bedrock_client.start_async_invoke(
        modelId=MODEL_ID,
        modelInput={
            "inputType": "text",
            "inputText": text
        },
        outputDataConfig={
            "s3OutputDataConfig": {
                "s3Uri": f's3://{TEMP_S3_BUCKET}/{output_prefix}'
            }
        }
    )
    
    invocation_arn = response["invocationArn"]
    print(f"Generating text embedding for: '{text[:50]}...'")
    
    # Wait for completion
    status = None
    while status not in ["Completed", "Failed"]:
        response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)
        status = response['status']
        time.sleep(2)
    
    if status != "Completed":
        raise Exception(f"Embedding generation failed")
    
    # Retrieve embedding from S3
    response = s3_client.list_objects_v2(Bucket=TEMP_S3_BUCKET, Prefix=output_prefix)
    for obj in response.get('Contents', []):
        if obj['Key'].endswith('output.json'):
            result = s3_client.get_object(Bucket=TEMP_S3_BUCKET, Key=obj['Key'])
            data = json.loads(result['Body'].read())
            return data['data'][0]['embedding']
    
    raise Exception("No embedding output found")

この関数は、Bedrockの非同期ワークフローを示しています。ジョブを送信し、完了するまでポーリングし、その後S3から結果を取得します。このパターンは、数千個のアイテムを同時に処理する可能性がある本番環境のワークロードに合わせて見事に拡張できます。

ビデオ埋め込み:

def generate_video_embedding(video_path: str, start_sec: float = 0, length_sec: float = None) -> List[float]:
    """
    Generate embedding for video using Marengo on Bedrock
    """
    # Read video file and encode to base64
    with open(video_path, 'rb') as video_file:
        video_base64 = base64.b64encode(video_file.read()).decode('utf-8')
    
    # Build model input with optional time segments
    model_input = {
        "inputType": "video",
        "mediaSource": {"base64String": video_base64},
        "embeddingOption": ["visual-text", "audio"]  # Capture both visual and audio
    }
    
    if start_sec is not None:
        model_input["startSec"] = start_sec
    if length_sec is not None:
        model_input["lengthSec"] = length_sec
    
    # ... rest follows the same async pattern

オーディオおよび画像関数:

generate_audio_embedding() および generate_image_embedding() 関数も同様のパターンに従い、それぞれBase64エンコーディングと適切なモデル入力パラメーターを使用して、対応するメディアタイプ向けに細かくカスタマイズされています。


アプローチ2:S3 URI処理

この本番環境向けに最適化されたアプローチでは、まずファイルをS3にアップロードし、その後URIでファイルを参照します。大容量ファイルに適しており、メモリ使用量を削減し、より優れたエラーハンドリングと再試行ロジックを構成できます。

S3からのビデオ埋め込み:

def generate_video_embedding_from_s3(s3_uri: str, start_sec: float = 0, 
                                    length_sec: float = None) -> List[float]:
    """Generate embedding for video from S3 using Marengo on Bedrock"""
    output_prefix = f'{OUTPUT_PREFIX}video/{uuid.uuid4()}'
    
    model_input = {
        "inputType": "video",
        "mediaSource": {
            "s3Location": {
                "uri": s3_uri,
                "bucketOwner": ACCOUNT_ID
            }
        },
        "embeddingOption": ["visual-text", "audio"]
    }
    
    response = bedrock_client.start_async_invoke(
        modelId=MODEL_ID,
        modelInput=model_input,
        outputDataConfig={
            "s3OutputDataConfig": {
                "s3Uri": f's3://{TEMP_S3_BUCKET}/{output_prefix}',
                "bucketOwner": ACCOUNT_ID
            }
        }
    )
    
    print(f"🎬 Generating video embedding from S3: {s3_uri}")
    # Enhanced status monitoring with retry logic...

以下は、本番環境での使用例です:

# Upload video to S3
video_s3_key = f"{MEDIA_PREFIX}videos/sample_video.mp4"
video_s3_uri = upload_file_to_s3('video.mp4', TEMP_S3_BUCKET, video_s3_key)

# Generate embedding from S3
video_embedding_s3 = generate_video_embedding_from_s3(video_s3_uri, start_sec=0, length_sec=10)


検証:両方のアプローチから得られる同一の結果

重要な検証ステップとして、同じコンテンツに対して両方のアプローチが完全に同一の埋め込みを生成することを確認します:

# Compare VIDEO embeddings
print("🎬 VIDEO EMBEDDING VERIFICATION")
print("-" * 50)

# Generate using Base64
print("Generating via Base64...")
video_base64_emb = generate_video_embedding('video.mp4', start_sec=0, length_sec=10)

# Upload and generate using S3 URI
print("Generating via S3 URI...")
video_s3_key = f"{MEDIA_PREFIX}verification/video.mp4"
video_s3_uri = upload_file_to_s3('video.mp4', TEMP_S3_BUCKET, video_s3_key)
video_s3_emb = generate_video_embedding_from_s3(video_s3_uri, start_sec=0, length_sec=10)

# Calculate similarity
similarity = cosine_similarity(video_base64_emb, video_s3_emb)
print(f"📊 Results:")
print(f"  Cosine similarity: {similarity:.6f}")
print(f"  Are they identical? {'✅ YES' if similarity > 0.9999 else '❌ NO'}")

検証の結果、すべてのメディアタイプで一貫して完璧な類似性スコアが示され、両方のアプローチが同一の表現(埋め込み)を生成することが確認できました。これにより、開発者は特定のユースケースの要件に基づいて暗黙のうちに最適な方法を選択することができます。


埋め込みパターンの可視化

埋め込み構造を理解することは、モデルの動作の検証やアプリケーションのデバッグに役立ちます。以下は、異なるモダリティを可視化する方法です:

テキスト埋め込みの可視化:

# Visualize text embeddings
if text_embeddings:
    plt.figure(figsize=(12, 3))
    for i, (text, emb, _) in enumerate(text_embeddings[:3]):
        plt.subplot(1, 3, i+1)
        plt.imshow([emb[:100]], aspect='auto', cmap='coolwarm')
        plt.title(f'Text {i+1}', fontsize=10)
        plt.xlabel('Dimensions (first 100)')
        plt.colorbar(orientation='horizontal', pad=0.1)
    plt.suptitle('Text Embeddings Visualization', fontsize=12)
    plt.tight_layout()
    plt.show()

この可視化は、次元全体にわたる多様な活性化強度を通じて、テキスト埋め込みがどのようにセマンティックパターンをとらえるかを明らかにしています。関連する概念がどのように同様のパターンを示す一方で、異なるトピックが異なる活性化シグネチャを示すかに注目してください。

ビデオ、オーディオ、および画像の可視化:

それぞれのモダリティは、異なる埋め込みパターンを生成します:

ビデオ埋め込みは、統一された1,024次元空間で時間的視覚情報とオーディオ特徴の両方をとらえる複雑なパターンを表示します:

オーディオ埋め込みは、異なるスペクトル表現を持つ特徴的な周波数ドメインパターンを示します:

画像埋め込みは、豊かな色分けされた強度変化を伴う空間的特徴の活性化パターンを示します:

これらの可視化は、Marengoが異なるデータタイプをどのように処理するかを理解するのに役立ち、下流のアプリケーション向けのフィーチャエンジニアリングの決定に役立てることができます。


本番デプロイにおける重要な洞察

Base64を使用するタイミング: 最適なパフォーマンスを得るには、25MB未満の小さなファイルを処理する場合、開発やテスト段階、すでにメモリ上にあるデータをリアルタイムで処理する必要がある場合、または単一のファイルが関与する単純なワークフローの場合に、Base64エンコーディングを選択します。

S3 URIを使用するタイミング: 25MBを超える大きなビデオファイルを処理する場合、本番環境の一括バッチ処理システムを実装する場合、データの耐久性を必要とするマルチステージのパイプラインを構築する場合、すでにS3にあるコンテンツを処理する場合、または堅牢なエラー処理と再試行メカニズムが不可欠なシナリオでは、S3 URIが好ましいアプローチです。

パフォーマンスに関する考慮事項: S3 URIアプローチは、ローカルメモリ消費の大幅な削減、大きなファイルに対するタイムアウト問題処理の改善、包括的な再試行ロジックを備えた強化された状態監視機能、および大量のメディアを処理する際の優れたコスト効率など、複数のパフォーマンス上の利点を提供します。

どちらのアプローチでも、同一の統一されたベクトル空間に存在する完全に同じ1,024次元の埋め込みが生成されるため、生成のためにどの方法を選択したかに関わらず、シームレスなクロスメディア検索が可能になります。この柔軟性により、すべてのメディアタイプにわたって一貫したセマンティック解釈を維持しながら、ファイルサイズ、インフラストラクチャの制約、および運用要件に基づいてアーキテクチャを最適化できます。


4 - S3 Vectorsへの埋め込みの保存

マルチモーダル埋め込みを手に入れたら、次のステップはそれをスケーラブルで検索可能なストアに保存することです。Amazon S3 Vectorsはネイティブのベクトルストレージと類似性検索をS3に提供するため、余分なインフラストラクチャを立ち上げたり管理したりする必要はありません。ベクトルバケットを作成し、インデックスを定義し、メタデータとともにベクトルを一括挿入するだけです。このセクションでは、本番環境を意識した開発者向けに、更新されたノートブックとREADMEのワークフローおよびコードを再現して紹介します。


なぜMarengo埋め込みにS3 Vectorsを使用するのか

  • S3ネイティブ:既存データとともにベクトルを保存し、既存のS3コントロール、ライフサイクル、およびセキュリティ体制を再利用できます。

  • 組み込みの類似性検索:AWS SDKのクライアントを介してコサイン距離によるクエリをダイレクトに実行でき、個別のベクトルデータベースを開発・運営する必要がありません。

  • Marengoに完全適合したスキーマ:インデックスの次元数はMarengo-Embed-2.7の出力に一致するよう1,024に設定されており、効率的なストレージと検索が保証されます。


メタデータを使用したベクトルペイロードの構築

埋め込みを生成した後、一貫した構造に正規化します。それには、一意のキー、float32形式のベクトルペイロード、そして人間が読み取れるコンテキストとフィルタリング用のメタデータが含まれます。ノートブックでは、従来のタプル形式と、メディアタイプを含む拡張されたトリプレット形式の両方を処理します。

def store_embeddings(embeddings_data: List[tuple]) -> bool:
    """
    Store embeddings in S3 Vectors with metadata
    """
    vectors_to_insert = []
    
    for i, item in enumerate(embeddings_data):
        # Support (text, embedding) and (text, embedding, media_type)
        if len(item) == 3:
            text, embedding, media_type = item
        else:
            text, embedding = item
            media_type = "text"
        
        vector_entry = {
            'key': f'vector_{i:04d}',           # deterministic, idempotent upserts
            'data': {
                'float32': [float(v) for v in embedding]  # convert to float32-compatible list
            },
            'metadata': {
                'text': text,                   # human-readable description
                'media_type': media_type,       # useful for UI badges and filtering
                'id': i                         # numeric id for quick reference
            }
        }
        vectors_to_insert.append(vector_entry)

この構造フォーマットはS3 Vectorsのput_vectors APIに適合しており、合理化された検索結果と分析のために、埋め込みのすぐ近くにメタデータを保持できます。


S3 Vectorsへの一括挿入

1回のバッチ呼び出しを使用することで、ラウンドトリップを最小限に抑え、操作を繰り返し可能にします。ノートブックには、メディアタイプ別の役立つサマリーも印刷されます。これはサニティチェックやダッシュボードに役立ちます。

    try:
        s3vectors_client.put_vectors(
            vectorBucketName=VECTOR_BUCKET_NAME,
            indexName=VECTOR_INDEX_NAME,
            vectors=vectors_to_insert
        )
        print(f"Stored {len(vectors_to_insert)} vectors in S3 Vectors")
        
        # Optional: quick summary
        media_counts = {}
        for item in embeddings_data:
            mtype = item[2] if len(item) == 3 else "text"
            media_counts[mtype] = media_counts.get(mtype, 0) + 1
        print("Media types stored:")
        for m, c in media_counts.items():
            print(f"  - {m}: {c}")
        return True
    except ClientError as e:
        print(f"Error storing vectors: {e}")
        return False

典型的な実行では、テキスト、ビデオ、オーディオ、および画像に分散された6つのベクトルを保存したことが報告され、クロスモーダル検索のエンドツーエンドの準備が完了したことが確認できます。


インデックスの設計と距離の測定基準

インデックスを作成する際は、パフォーマンスを最適化するためにいくつか重要なパラメーターを構成する必要があります。まず、Marengoのベクトル値を適切に保存するためにdataTypeをfloat32に設定します。次に、Marengoの埋め込みサイズに完全に一致させるためにdimensionを1,024に指定します。distanceMetricについては、ベクトル間のセマンティック類似性を測定する標準的なアプローチであるコサインを選択します。最後に、metadataConfigurationに少なくとも1つのキーを含めるようにします(この例ではフィルター不可のプレースホルダーを使用し、挿入時にリッチなメタデータを動的に割り当てています)。

これらの慎重に選定されたデフォルト構成は、Marengoモデルの出力形式との互換性を確保し、このチュートリアルの後半で使用する下流の類似性検索メソッドとも完全に一致します。


べき等性、更新、およびバージョニング

構成にあたってのベストプラクティスをいくつか確認しておきましょう:

  • コンテンツのURIハッシュや固定UUIDに基づく確定的(デターミニスティック)キーを使用することでベクトル管理を最適化し、ベクトルデータベースを更新する際に安全で予測可能な再取り込み処理を確実に実行できるようにします。

  • まったく同一のキーで取り込みジョブを再実行した場合、ベクトルはアップサート(upserts)によって綺麗に上書きされる必要があります。過去のバージョン履歴を維持する場合は、キーにバージョンのサフィックス(接尾辞)を追加し、DynamoDBなどの個別のシステムでアクティブなバージョンの参照関係を管理します。

  • メタデータは個別のベクトルに紐付けられているため、既存と同じキーのまま(新しいラベルやタイムスタンプなどで)メタデータ属性だけを一括更新してベクトルを再保存(re-put)することで、スキーマを段階的に進化させることができます。

これによりベクトルがデータベースに格納されてインデックスが作成され、低レイテンシーのセマンティック検索を実行する準備が整いました。次に、新しいクエリの埋め込み、もしくは事前に計算されたベクトルを使用してインデックスを検索し、S3のセキュリティおよびガバナンスの境界から一歩も出ることなく、テキスト、ビデオ、オーディオ、画像にまたがるクロスモーダルな検索体験を提供しましょう。


5 - 類似性検索の実行

Amazon S3 Vectorsでベクトルインデックスが構築されれば、問い合わせは低レイテンシーのシンプルなAPI呼び出しを実行するだけです。同一のフローで、リアルタイムの自然言語クエリと任意のモダリティの事前計算済み埋め込みの両方をサポートできます。これは、AWS上における一連のクロスモーダル情報検索パターンを実装する上で非常に理想的です。


テキストまたは事前計算済みの埋め込みによる検索

配布しているデモのノートブックには、テキストプロンプト(Marengoを使って即座に埋め込みをオンザフライ生成)または事前に用意されたベクトル形式の埋め込みの両方を受け入れることができる単一のラッパー関数が含まれています。その後、関数はS3 Vectorsインデックスに対してコサイン類似度クエリを実行し、結果表示に便利なメタデータと共に、スコア順にソートされたランク順位を返します。

def search_similar(query_text: str = None, query_embedding: List[float] = None, top_k: int = 3) -> List[Dict]:
    """
    Search for similar vectors using either a text query or a pre-computed embedding
    """
    # Generate embedding for query if text is provided
    if query_text and not query_embedding:
        print(f"\nSearching for: '{query_text}'")
        query_embedding = generate_text_embedding(query_text)
    elif query_embedding:
        print(f"\nSearching with provided embedding...")
    else:
        raise ValueError("Either query_text or query_embedding must be provided")
    
    # Query S3 Vectors
    try:
        response = s3vectors_client.query_vectors(
            vectorBucketName=VECTOR_BUCKET_NAME,
            indexName=VECTOR_INDEX_NAME,
            topK=top_k,
            queryVector={'float32': [float(v) for v in query_embedding]},
            returnMetadata=True,
            returnDistance=True
        )
        
        results = []
        for vector in response.get('vectors', []):
            # Handle both old vectors (without media_type) and new vectors (with media_type)
            results.append({
                'text': vector['metadata'].get('text', 'No description'),
                'media_type': vector['metadata'].get('media_type', 'text'),  # Default to 'text' for old vectors
                'similarity': 1.0 - vector.get('distance', 0),  # Convert distance to similarity
                'key': vector.get('key')
            })
        
        return results
        
    except ClientError as e:
        print(f"Query failed: {e}")
        return []

仕組み:

  • もしquery_textが指定された場合、Bedrockの非同期呼び出しを使って1,024次元のMarengo埋め込みを生成し、S3の一時的な保存先から結果を取得してから、それを用いてS3 Vectorsを検索します。

  • もしquery_embedding(ベクトル形式)が渡された場合、上記の埋め込み生成処理ステップをスキップし、そのままベクトルをS3 Vectorsに入力して直接検索を行います。これは「すべての形式を対象としたメディア類似検索(ビデオ→すべて、画像→すべて、等)」を行うのに最適です。

主要なコード実装ポイント:

  • topKは、インデックスから返す結果(上位件数)を制御します。

  • returnMetadatareturnDistanceを指定することで、人間に読みやすいメタデータを取得し、similarity = 1.0 - distanceという式を用いて距離から直感的な類似度スコアへ変換してソートできるようにします。

  • この検索ラッパー関数は、デモ構成が過渡期にある古いデータ、すなわちmedia_typeメタデータが存在しない過去のベクトルに対しても、フォールバック値として「

1 - はじめに

AWSテクノロジーを使用して構築している開発者であれば、Amazon BedrockやS3などのマネージドサービスを使用したスケーラブルなAIアプリケーションの構築には馴染みがあるでしょう。今回、BedrockでTwelveLabsの最先端モデルが利用可能になったことにより、高度なビデオ理解機能を使用してワークフローを大幅に強化できます。特にMarengoモデルは傑出しており、テキスト、ビデオ、画像、オーディオを含むマルチモーダルデータに対してリッチな512次元の埋め込みを生成し、手動でのラベル付けなしでアクション、オブジェクト、サウンドなどのニュアンス豊かなコンテキストを捉えます。

これを、ネイティブベクトルをサポートする初のクラウドストレージであるAmazon S3 Vectorsと組み合わせることで、S3バケット内でシームレスかつスケーラブルなストレージと類似性検索を直接利用できます。外部データベースとの格闘はもう不要です。使い慣れたBoto3クライアントを使用して、エンタープライズ規模のコサインベースのクエリを実行するだけです。

この統合は、TwelveLabsのBedrockでのローンチの一環として発表され、自然言語によるビデオ検索、コンテンツ推薦、検索拡張生成(RAG)システムなどの強力なユースケースへの扉を開きます。このチュートリアルでは、Pythonノートブックの実践を通じて、Marengoを使用した埋め込みの生成、S3 Vectorsへの保存、およびセマンティック検索の実行についてステップバイステップで説明し、これらのツールを迅速に導入してAWSスタックを拡張できるようにします。


2 - 前提条件とセットアップ

マルチモーダル埋め込みのワークフローに進む前に、環境が正しく構成されていることを確認しましょう。AWS環境に慣れている開発者であれば、標準のAWSツールキットに加え、新しくローンチされたBedrock上のTwelveLabsモデルへのアクセス権が必要です。


前提条件

AWS要件

以下のサービスへのアクセス権を持つAWSアカウントが必要です:

  • Amazon Bedrock、S3、およびS3 Vectorsに対する適切なIAMアクセス権限を持つAWSアカウント

  • お使いのリージョンでTwelveLabs Marengoモデルが有効化されたAmazon Bedrockへのアクセス権

  • Bedrockの一時的な出力用として使用する既存のS3バケット(非同期処理に必要)

  • 認証情報が設定されたAWS CLI(aws configure

開発環境:

  • Python 3.8以上

  • Jupyter Notebookまたはお好みのPython環境


インストールと依存関係

まず、必要なPythonパッケージをインストールします。この統合は、最新のBoto3 SDKとベクトル演算用のNumPyに依存しています:

pip install boto3==1.40.7 numpy matplotlib Pillow -q

次に、Python環境に必要なライブラリをインポートします:

import boto3
import json
import numpy as np
import uuid
import time
import os
from typing import List, Dict
from botocore.exceptions import ClientError


設定

AWSおよびモデルの設定を構成します。お使いの環境に合わせて以下の変数を更新してください:

# AWS Configuration
AWS_REGION = "us-east-1"  # Update to your region
AWS_PROFILE = "default"   # Update to your profile

# S3 Vectors Configuration
VECTOR_BUCKET_NAME = "marengo-vectors-" + str(uuid.uuid4())[:8]
VECTOR_INDEX_NAME = "embeddings-index"
VECTOR_DIMENSION = 1024  # Marengo embedding dimension

# Marengo Model
MODEL_ID = 'twelvelabs.marengo-embed-2-7-v1:0'

# Temporary S3 bucket for Bedrock output (required by Bedrock API)
TEMP_S3_BUCKET = "<YOUR_S3_BUCKET>"  # TODO: Replace with your S3 bucket name

print(f"Vector Bucket: {VECTOR_BUCKET_NAME}")
print(f"Vector Index: {VECTOR_INDEX_NAME}")
print(f"Model: {MODEL_ID}")

重要: <YOUR_S3_BUCKET> をお使いのアカウントに存在する既存のS3バケット名に置き換えてください。Bedrockの非同期呼び出し機能では、処理結果を保存するためにこの一時ストレージの場所が必要となります。


AWSクライアントの初期化

使用するAWSサービス用の認証セッションを作成します:

# Initialize AWS clients
session = boto3.Session(profile_name=AWS_PROFILE)
bedrock_client = session.client('bedrock-runtime', region_name=AWS_REGION)
s3_client = session.client('s3')
s3vectors_client = session.client('s3vectors', region_name=AWS_REGION)
print("AWS clients initialized")


S3 Vectorバケットとインデックスの作成

次に、ベクトルストレージインフラストラクチャをセットアップします。S3 Vectorsでは、埋め込みを整理および検索するために、ベクトルバケットとインデックスの両方が必要です:

# Create Vector Bucket
try:
    s3vectors_client.create_vector_bucket(
        vectorBucketName=VECTOR_BUCKET_NAME,
        encryptionConfiguration={'sseType': 'AES256'}
    )
    print(f"Vector bucket '{VECTOR_BUCKET_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Vector bucket already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

このセットアップにより、AES256暗号化を備えたベクトルバケットが作成され、コサイン距離測定基準を使用してMarengoの1024次元埋め込み用に構成されたインデックスが作成されます。これは、セマンティック類似性検索に最適です。

トラブルシューティングのヒント:

  • お使いのリージョンでS3 Vectorsが利用できない場合は、サポートされているリージョンについてAWSドキュメントを確認してください。

  • IAMユーザー/ロールに、bedrock:InvokeModels3:GetObjects3:PutObject、およびs3vectors:*アクションの権限があることを確認してください。

  • ConflictException処理により、リソースが既に存在する場合でも、エラーなしでセットアップを再実行できます。

この基盤が整えば、TwelveLabs Marengoを使用して埋め込みを生成し、スケーラブルなS3ベクトルインフラストラクチャに保存する準備は完了です。


3 - Marengoを使用した埋め込みの生成

TwelveLabsのMarengoモデルは、マルチモーダル埋め込みを生成するための柔軟な入力方法を提供します。アプリケーションをスケールさせるにつれて、パフォーマンスとコストを最適化するために適切なアプローチを選択することが極めて重要になります。このセクションでは、Base64エンコーディング(小さなファイルや迅速なプロトタイピングに最適)とS3 URIベースの処理(プロダクションワークロードや大容量のメディアファイルに最適)の両方を紹介します。


メディアファイルのS3へのアップロード

本番環境のシナリオでは、メディアファイルをS3に保存することで、優れたスケーラビリティ、耐久性、およびパフォーマンスが実現します。まず、ファイルをアップロードするためのヘルパー関数を作成します:

def upload_file_to_s3(local_path: str, bucket: str, key: str) -> str:
    """
    Upload a local file to S3
    
    Args:
        local_path: Path to local file
        bucket: S3 bucket name
        key: S3 object key (path in bucket)
    
    Returns:
        S3 URI of uploaded file
    """
    try:
        s3_client.upload_file(local_path, bucket, key)
        s3_uri = f"s3://{bucket}/{key}"
        print(f"✅ Uploaded {os.path.basename(local_path)} to {s3_uri}")
        return s3_uri
    except ClientError as e:
        print(f"❌ Error uploading file: {e}")
        raise


アプローチ1:Base64エンコーディング

このアプローチでは、ファイルをローカルに読み込み、Base64にエンコードしてからBedrockに送信します。サイズが小さなファイル、開発段階、またはメディアがすでにメモリ上にあるシナリオに最適です。

テキスト埋め込み:

def generate_text_embedding(text: str) -> List[float]:
    """
    Generate embedding for text using Marengo on Bedrock
    """
    # Create unique output path
    output_prefix = f'embeddings/{uuid.uuid4()}'
    
    # Start async embedding generation
    response = bedrock_client.start_async_invoke(
        modelId=MODEL_ID,
        modelInput={
            "inputType": "text",
            "inputText": text
        },
        outputDataConfig={
            "s3OutputDataConfig": {
                "s3Uri": f's3://{TEMP_S3_BUCKET}/{output_prefix}'
            }
        }
    )
    
    invocation_arn = response["invocationArn"]
    print(f"Generating text embedding for: '{text[:50]}...'")
    
    # Wait for completion
    status = None
    while status not in ["Completed", "Failed"]:
        response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)
        status = response['status']
        time.sleep(2)
    
    if status != "Completed":
        raise Exception(f"Embedding generation failed")
    
    # Retrieve embedding from S3
    response = s3_client.list_objects_v2(Bucket=TEMP_S3_BUCKET, Prefix=output_prefix)
    for obj in response.get('Contents', []):
        if obj['Key'].endswith('output.json'):
            result = s3_client.get_object(Bucket=TEMP_S3_BUCKET, Key=obj['Key'])
            data = json.loads(result['Body'].read())
            return data['data'][0]['embedding']
    
    raise Exception("No embedding output found")

この関数は、Bedrockの非同期ワークフローを示しています。ジョブを送信し、完了するまでポーリングし、その後S3から結果を取得します。このパターンは、数千個のアイテムを同時に処理する可能性がある本番環境のワークロードに合わせて見事に拡張できます。

ビデオ埋め込み:

def generate_video_embedding(video_path: str, start_sec: float = 0, length_sec: float = None) -> List[float]:
    """
    Generate embedding for video using Marengo on Bedrock
    """
    # Read video file and encode to base64
    with open(video_path, 'rb') as video_file:
        video_base64 = base64.b64encode(video_file.read()).decode('utf-8')
    
    # Build model input with optional time segments
    model_input = {
        "inputType": "video",
        "mediaSource": {"base64String": video_base64},
        "embeddingOption": ["visual-text", "audio"]  # Capture both visual and audio
    }
    
    if start_sec is not None:
        model_input["startSec"] = start_sec
    if length_sec is not None:
        model_input["lengthSec"] = length_sec
    
    # ... rest follows the same async pattern

オーディオおよび画像関数:

generate_audio_embedding() および generate_image_embedding() 関数も同様のパターンに従い、それぞれBase64エンコーディングと適切なモデル入力パラメーターを使用して、対応するメディアタイプ向けに細かくカスタマイズされています。


アプローチ2:S3 URI処理

この本番環境向けに最適化されたアプローチでは、まずファイルをS3にアップロードし、その後URIでファイルを参照します。大容量ファイルに適しており、メモリ使用量を削減し、より優れたエラーハンドリングと再試行ロジックを構成できます。

S3からのビデオ埋め込み:

def generate_video_embedding_from_s3(s3_uri: str, start_sec: float = 0, 
                                    length_sec: float = None) -> List[float]:
    """Generate embedding for video from S3 using Marengo on Bedrock"""
    output_prefix = f'{OUTPUT_PREFIX}video/{uuid.uuid4()}'
    
    model_input = {
        "inputType": "video",
        "mediaSource": {
            "s3Location": {
                "uri": s3_uri,
                "bucketOwner": ACCOUNT_ID
            }
        },
        "embeddingOption": ["visual-text", "audio"]
    }
    
    response = bedrock_client.start_async_invoke(
        modelId=MODEL_ID,
        modelInput=model_input,
        outputDataConfig={
            "s3OutputDataConfig": {
                "s3Uri": f's3://{TEMP_S3_BUCKET}/{output_prefix}',
                "bucketOwner": ACCOUNT_ID
            }
        }
    )
    
    print(f"🎬 Generating video embedding from S3: {s3_uri}")
    # Enhanced status monitoring with retry logic...

以下は、本番環境での使用例です:

# Upload video to S3
video_s3_key = f"{MEDIA_PREFIX}videos/sample_video.mp4"
video_s3_uri = upload_file_to_s3('video.mp4', TEMP_S3_BUCKET, video_s3_key)

# Generate embedding from S3
video_embedding_s3 = generate_video_embedding_from_s3(video_s3_uri, start_sec=0, length_sec=10)


検証:両方のアプローチから得られる同一の結果

重要な検証ステップとして、同じコンテンツに対して両方のアプローチが完全に同一の埋め込みを生成することを確認します:

# Compare VIDEO embeddings
print("🎬 VIDEO EMBEDDING VERIFICATION")
print("-" * 50)

# Generate using Base64
print("Generating via Base64...")
video_base64_emb = generate_video_embedding('video.mp4', start_sec=0, length_sec=10)

# Upload and generate using S3 URI
print("Generating via S3 URI...")
video_s3_key = f"{MEDIA_PREFIX}verification/video.mp4"
video_s3_uri = upload_file_to_s3('video.mp4', TEMP_S3_BUCKET, video_s3_key)
video_s3_emb = generate_video_embedding_from_s3(video_s3_uri, start_sec=0, length_sec=10)

# Calculate similarity
similarity = cosine_similarity(video_base64_emb, video_s3_emb)
print(f"📊 Results:")
print(f"  Cosine similarity: {similarity:.6f}")
print(f"  Are they identical? {'✅ YES' if similarity > 0.9999 else '❌ NO'}")

検証の結果、すべてのメディアタイプで一貫して完璧な類似性スコアが示され、両方のアプローチが同一の表現(埋め込み)を生成することが確認できました。これにより、開発者は特定のユースケースの要件に基づいて暗黙のうちに最適な方法を選択することができます。


埋め込みパターンの可視化

埋め込み構造を理解することは、モデルの動作の検証やアプリケーションのデバッグに役立ちます。以下は、異なるモダリティを可視化する方法です:

テキスト埋め込みの可視化:

# Visualize text embeddings
if text_embeddings:
    plt.figure(figsize=(12, 3))
    for i, (text, emb, _) in enumerate(text_embeddings[:3]):
        plt.subplot(1, 3, i+1)
        plt.imshow([emb[:100]], aspect='auto', cmap='coolwarm')
        plt.title(f'Text {i+1}', fontsize=10)
        plt.xlabel('Dimensions (first 100)')
        plt.colorbar(orientation='horizontal', pad=0.1)
    plt.suptitle('Text Embeddings Visualization', fontsize=12)
    plt.tight_layout()
    plt.show()

この可視化は、次元全体にわたる多様な活性化強度を通じて、テキスト埋め込みがどのようにセマンティックパターンをとらえるかを明らかにしています。関連する概念がどのように同様のパターンを示す一方で、異なるトピックが異なる活性化シグネチャを示すかに注目してください。

ビデオ、オーディオ、および画像の可視化:

それぞれのモダリティは、異なる埋め込みパターンを生成します:

ビデオ埋め込みは、統一された1,024次元空間で時間的視覚情報とオーディオ特徴の両方をとらえる複雑なパターンを表示します:

オーディオ埋め込みは、異なるスペクトル表現を持つ特徴的な周波数ドメインパターンを示します:

画像埋め込みは、豊かな色分けされた強度変化を伴う空間的特徴の活性化パターンを示します:

これらの可視化は、Marengoが異なるデータタイプをどのように処理するかを理解するのに役立ち、下流のアプリケーション向けのフィーチャエンジニアリングの決定に役立てることができます。


本番デプロイにおける重要な洞察

Base64を使用するタイミング: 最適なパフォーマンスを得るには、25MB未満の小さなファイルを処理する場合、開発やテスト段階、すでにメモリ上にあるデータをリアルタイムで処理する必要がある場合、または単一のファイルが関与する単純なワークフローの場合に、Base64エンコーディングを選択します。

S3 URIを使用するタイミング: 25MBを超える大きなビデオファイルを処理する場合、本番環境の一括バッチ処理システムを実装する場合、データの耐久性を必要とするマルチステージのパイプラインを構築する場合、すでにS3にあるコンテンツを処理する場合、または堅牢なエラー処理と再試行メカニズムが不可欠なシナリオでは、S3 URIが好ましいアプローチです。

パフォーマンスに関する考慮事項: S3 URIアプローチは、ローカルメモリ消費の大幅な削減、大きなファイルに対するタイムアウト問題処理の改善、包括的な再試行ロジックを備えた強化された状態監視機能、および大量のメディアを処理する際の優れたコスト効率など、複数のパフォーマンス上の利点を提供します。

どちらのアプローチでも、同一の統一されたベクトル空間に存在する完全に同じ1,024次元の埋め込みが生成されるため、生成のためにどの方法を選択したかに関わらず、シームレスなクロスメディア検索が可能になります。この柔軟性により、すべてのメディアタイプにわたって一貫したセマンティック解釈を維持しながら、ファイルサイズ、インフラストラクチャの制約、および運用要件に基づいてアーキテクチャを最適化できます。


4 - S3 Vectorsへの埋め込みの保存

マルチモーダル埋め込みを手に入れたら、次のステップはそれをスケーラブルで検索可能なストアに保存することです。Amazon S3 Vectorsはネイティブのベクトルストレージと類似性検索をS3に提供するため、余分なインフラストラクチャを立ち上げたり管理したりする必要はありません。ベクトルバケットを作成し、インデックスを定義し、メタデータとともにベクトルを一括挿入するだけです。このセクションでは、本番環境を意識した開発者向けに、更新されたノートブックとREADMEのワークフローおよびコードを再現して紹介します。


なぜMarengo埋め込みにS3 Vectorsを使用するのか

  • S3ネイティブ:既存データとともにベクトルを保存し、既存のS3コントロール、ライフサイクル、およびセキュリティ体制を再利用できます。

  • 組み込みの類似性検索:AWS SDKのクライアントを介してコサイン距離によるクエリをダイレクトに実行でき、個別のベクトルデータベースを開発・運営する必要がありません。

  • Marengoに完全適合したスキーマ:インデックスの次元数はMarengo-Embed-2.7の出力に一致するよう1,024に設定されており、効率的なストレージと検索が保証されます。


メタデータを使用したベクトルペイロードの構築

埋め込みを生成した後、一貫した構造に正規化します。それには、一意のキー、float32形式のベクトルペイロード、そして人間が読み取れるコンテキストとフィルタリング用のメタデータが含まれます。ノートブックでは、従来のタプル形式と、メディアタイプを含む拡張されたトリプレット形式の両方を処理します。

def store_embeddings(embeddings_data: List[tuple]) -> bool:
    """
    Store embeddings in S3 Vectors with metadata
    """
    vectors_to_insert = []
    
    for i, item in enumerate(embeddings_data):
        # Support (text, embedding) and (text, embedding, media_type)
        if len(item) == 3:
            text, embedding, media_type = item
        else:
            text, embedding = item
            media_type = "text"
        
        vector_entry = {
            'key': f'vector_{i:04d}',           # deterministic, idempotent upserts
            'data': {
                'float32': [float(v) for v in embedding]  # convert to float32-compatible list
            },
            'metadata': {
                'text': text,                   # human-readable description
                'media_type': media_type,       # useful for UI badges and filtering
                'id': i                         # numeric id for quick reference
            }
        }
        vectors_to_insert.append(vector_entry)

この構造フォーマットはS3 Vectorsのput_vectors APIに適合しており、合理化された検索結果と分析のために、埋め込みのすぐ近くにメタデータを保持できます。


S3 Vectorsへの一括挿入

1回のバッチ呼び出しを使用することで、ラウンドトリップを最小限に抑え、操作を繰り返し可能にします。ノートブックには、メディアタイプ別の役立つサマリーも印刷されます。これはサニティチェックやダッシュボードに役立ちます。

    try:
        s3vectors_client.put_vectors(
            vectorBucketName=VECTOR_BUCKET_NAME,
            indexName=VECTOR_INDEX_NAME,
            vectors=vectors_to_insert
        )
        print(f"Stored {len(vectors_to_insert)} vectors in S3 Vectors")
        
        # Optional: quick summary
        media_counts = {}
        for item in embeddings_data:
            mtype = item[2] if len(item) == 3 else "text"
            media_counts[mtype] = media_counts.get(mtype, 0) + 1
        print("Media types stored:")
        for m, c in media_counts.items():
            print(f"  - {m}: {c}")
        return True
    except ClientError as e:
        print(f"Error storing vectors: {e}")
        return False

典型的な実行では、テキスト、ビデオ、オーディオ、および画像に分散された6つのベクトルを保存したことが報告され、クロスモーダル検索のエンドツーエンドの準備が完了したことが確認できます。


インデックスの設計と距離の測定基準

インデックスを作成する際は、パフォーマンスを最適化するためにいくつか重要なパラメーターを構成する必要があります。まず、Marengoのベクトル値を適切に保存するためにdataTypeをfloat32に設定します。次に、Marengoの埋め込みサイズに完全に一致させるためにdimensionを1,024に指定します。distanceMetricについては、ベクトル間のセマンティック類似性を測定する標準的なアプローチであるコサインを選択します。最後に、metadataConfigurationに少なくとも1つのキーを含めるようにします(この例ではフィルター不可のプレースホルダーを使用し、挿入時にリッチなメタデータを動的に割り当てています)。

これらの慎重に選定されたデフォルト構成は、Marengoモデルの出力形式との互換性を確保し、このチュートリアルの後半で使用する下流の類似性検索メソッドとも完全に一致します。


べき等性、更新、およびバージョニング

構成にあたってのベストプラクティスをいくつか確認しておきましょう:

  • コンテンツのURIハッシュや固定UUIDに基づく確定的(デターミニスティック)キーを使用することでベクトル管理を最適化し、ベクトルデータベースを更新する際に安全で予測可能な再取り込み処理を確実に実行できるようにします。

  • まったく同一のキーで取り込みジョブを再実行した場合、ベクトルはアップサート(upserts)によって綺麗に上書きされる必要があります。過去のバージョン履歴を維持する場合は、キーにバージョンのサフィックス(接尾辞)を追加し、DynamoDBなどの個別のシステムでアクティブなバージョンの参照関係を管理します。

  • メタデータは個別のベクトルに紐付けられているため、既存と同じキーのまま(新しいラベルやタイムスタンプなどで)メタデータ属性だけを一括更新してベクトルを再保存(re-put)することで、スキーマを段階的に進化させることができます。

これによりベクトルがデータベースに格納されてインデックスが作成され、低レイテンシーのセマンティック検索を実行する準備が整いました。次に、新しいクエリの埋め込み、もしくは事前に計算されたベクトルを使用してインデックスを検索し、S3のセキュリティおよびガバナンスの境界から一歩も出ることなく、テキスト、ビデオ、オーディオ、画像にまたがるクロスモーダルな検索体験を提供しましょう。


5 - 類似性検索の実行

Amazon S3 Vectorsでベクトルインデックスが構築されれば、問い合わせは低レイテンシーのシンプルなAPI呼び出しを実行するだけです。同一のフローで、リアルタイムの自然言語クエリと任意のモダリティの事前計算済み埋め込みの両方をサポートできます。これは、AWS上における一連のクロスモーダル情報検索パターンを実装する上で非常に理想的です。


テキストまたは事前計算済みの埋め込みによる検索

配布しているデモのノートブックには、テキストプロンプト(Marengoを使って即座に埋め込みをオンザフライ生成)または事前に用意されたベクトル形式の埋め込みの両方を受け入れることができる単一のラッパー関数が含まれています。その後、関数はS3 Vectorsインデックスに対してコサイン類似度クエリを実行し、結果表示に便利なメタデータと共に、スコア順にソートされたランク順位を返します。

def search_similar(query_text: str = None, query_embedding: List[float] = None, top_k: int = 3) -> List[Dict]:
    """
    Search for similar vectors using either a text query or a pre-computed embedding
    """
    # Generate embedding for query if text is provided
    if query_text and not query_embedding:
        print(f"\nSearching for: '{query_text}'")
        query_embedding = generate_text_embedding(query_text)
    elif query_embedding:
        print(f"\nSearching with provided embedding...")
    else:
        raise ValueError("Either query_text or query_embedding must be provided")
    
    # Query S3 Vectors
    try:
        response = s3vectors_client.query_vectors(
            vectorBucketName=VECTOR_BUCKET_NAME,
            indexName=VECTOR_INDEX_NAME,
            topK=top_k,
            queryVector={'float32': [float(v) for v in query_embedding]},
            returnMetadata=True,
            returnDistance=True
        )
        
        results = []
        for vector in response.get('vectors', []):
            # Handle both old vectors (without media_type) and new vectors (with media_type)
            results.append({
                'text': vector['metadata'].get('text', 'No description'),
                'media_type': vector['metadata'].get('media_type', 'text'),  # Default to 'text' for old vectors
                'similarity': 1.0 - vector.get('distance', 0),  # Convert distance to similarity
                'key': vector.get('key')
            })
        
        return results
        
    except ClientError as e:
        print(f"Query failed: {e}")
        return []

仕組み:

  • もしquery_textが指定された場合、Bedrockの非同期呼び出しを使って1,024次元のMarengo埋め込みを生成し、S3の一時的な保存先から結果を取得してから、それを用いてS3 Vectorsを検索します。

  • もしquery_embedding(ベクトル形式)が渡された場合、上記の埋め込み生成処理ステップをスキップし、そのままベクトルをS3 Vectorsに入力して直接検索を行います。これは「すべての形式を対象としたメディア類似検索(ビデオ→すべて、画像→すべて、等)」を行うのに最適です。

主要なコード実装ポイント:

  • topKは、インデックスから返す結果(上位件数)を制御します。

  • returnMetadatareturnDistanceを指定することで、人間に読みやすいメタデータを取得し、similarity = 1.0 - distanceという式を用いて距離から直感的な類似度スコアへ変換してソートできるようにします。

  • この検索ラッパー関数は、デモ構成が過渡期にある古いデータ、すなわちmedia_typeメタデータが存在しない過去のベクトルに対しても、フォールバック値として「

1 - はじめに

AWSテクノロジーを使用して構築している開発者であれば、Amazon BedrockやS3などのマネージドサービスを使用したスケーラブルなAIアプリケーションの構築には馴染みがあるでしょう。今回、BedrockでTwelveLabsの最先端モデルが利用可能になったことにより、高度なビデオ理解機能を使用してワークフローを大幅に強化できます。特にMarengoモデルは傑出しており、テキスト、ビデオ、画像、オーディオを含むマルチモーダルデータに対してリッチな512次元の埋め込みを生成し、手動でのラベル付けなしでアクション、オブジェクト、サウンドなどのニュアンス豊かなコンテキストを捉えます。

これを、ネイティブベクトルをサポートする初のクラウドストレージであるAmazon S3 Vectorsと組み合わせることで、S3バケット内でシームレスかつスケーラブルなストレージと類似性検索を直接利用できます。外部データベースとの格闘はもう不要です。使い慣れたBoto3クライアントを使用して、エンタープライズ規模のコサインベースのクエリを実行するだけです。

この統合は、TwelveLabsのBedrockでのローンチの一環として発表され、自然言語によるビデオ検索、コンテンツ推薦、検索拡張生成(RAG)システムなどの強力なユースケースへの扉を開きます。このチュートリアルでは、Pythonノートブックの実践を通じて、Marengoを使用した埋め込みの生成、S3 Vectorsへの保存、およびセマンティック検索の実行についてステップバイステップで説明し、これらのツールを迅速に導入してAWSスタックを拡張できるようにします。


2 - 前提条件とセットアップ

マルチモーダル埋め込みのワークフローに進む前に、環境が正しく構成されていることを確認しましょう。AWS環境に慣れている開発者であれば、標準のAWSツールキットに加え、新しくローンチされたBedrock上のTwelveLabsモデルへのアクセス権が必要です。


前提条件

AWS要件

以下のサービスへのアクセス権を持つAWSアカウントが必要です:

  • Amazon Bedrock、S3、およびS3 Vectorsに対する適切なIAMアクセス権限を持つAWSアカウント

  • お使いのリージョンでTwelveLabs Marengoモデルが有効化されたAmazon Bedrockへのアクセス権

  • Bedrockの一時的な出力用として使用する既存のS3バケット(非同期処理に必要)

  • 認証情報が設定されたAWS CLI(aws configure

開発環境:

  • Python 3.8以上

  • Jupyter Notebookまたはお好みのPython環境


インストールと依存関係

まず、必要なPythonパッケージをインストールします。この統合は、最新のBoto3 SDKとベクトル演算用のNumPyに依存しています:

pip install boto3==1.40.7 numpy matplotlib Pillow -q

次に、Python環境に必要なライブラリをインポートします:

import boto3
import json
import numpy as np
import uuid
import time
import os
from typing import List, Dict
from botocore.exceptions import ClientError


設定

AWSおよびモデルの設定を構成します。お使いの環境に合わせて以下の変数を更新してください:

# AWS Configuration
AWS_REGION = "us-east-1"  # Update to your region
AWS_PROFILE = "default"   # Update to your profile

# S3 Vectors Configuration
VECTOR_BUCKET_NAME = "marengo-vectors-" + str(uuid.uuid4())[:8]
VECTOR_INDEX_NAME = "embeddings-index"
VECTOR_DIMENSION = 1024  # Marengo embedding dimension

# Marengo Model
MODEL_ID = 'twelvelabs.marengo-embed-2-7-v1:0'

# Temporary S3 bucket for Bedrock output (required by Bedrock API)
TEMP_S3_BUCKET = "<YOUR_S3_BUCKET>"  # TODO: Replace with your S3 bucket name

print(f"Vector Bucket: {VECTOR_BUCKET_NAME}")
print(f"Vector Index: {VECTOR_INDEX_NAME}")
print(f"Model: {MODEL_ID}")

重要: <YOUR_S3_BUCKET> をお使いのアカウントに存在する既存のS3バケット名に置き換えてください。Bedrockの非同期呼び出し機能では、処理結果を保存するためにこの一時ストレージの場所が必要となります。


AWSクライアントの初期化

使用するAWSサービス用の認証セッションを作成します:

# Initialize AWS clients
session = boto3.Session(profile_name=AWS_PROFILE)
bedrock_client = session.client('bedrock-runtime', region_name=AWS_REGION)
s3_client = session.client('s3')
s3vectors_client = session.client('s3vectors', region_name=AWS_REGION)
print("AWS clients initialized")


S3 Vectorバケットとインデックスの作成

次に、ベクトルストレージインフラストラクチャをセットアップします。S3 Vectorsでは、埋め込みを整理および検索するために、ベクトルバケットとインデックスの両方が必要です:

# Create Vector Bucket
try:
    s3vectors_client.create_vector_bucket(
        vectorBucketName=VECTOR_BUCKET_NAME,
        encryptionConfiguration={'sseType': 'AES256'}
    )
    print(f"Vector bucket '{VECTOR_BUCKET_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Vector bucket already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

# Create Vector Index
try:
    s3vectors_client.create_index(
        vectorBucketName=VECTOR_BUCKET_NAME,
        indexName=VECTOR_INDEX_NAME,
        dataType='float32',
        dimension=VECTOR_DIMENSION,
        distanceMetric='cosine',
        metadataConfiguration={'nonFilterableMetadataKeys': ['source']}
    )
    print(f"Index '{VECTOR_INDEX_NAME}' created")
except ClientError as e:
    if e.response['Error']['Code'] == 'ConflictException':
        print(f"Index already exists")
    else:
        print(f"Error: {e}")

このセットアップにより、AES256暗号化を備えたベクトルバケットが作成され、コサイン距離測定基準を使用してMarengoの1024次元埋め込み用に構成されたインデックスが作成されます。これは、セマンティック類似性検索に最適です。

トラブルシューティングのヒント:

  • お使いのリージョンでS3 Vectorsが利用できない場合は、サポートされているリージョンについてAWSドキュメントを確認してください。

  • IAMユーザー/ロールに、bedrock:InvokeModels3:GetObjects3:PutObject、およびs3vectors:*アクションの権限があることを確認してください。

  • ConflictException処理により、リソースが既に存在する場合でも、エラーなしでセットアップを再実行できます。

この基盤が整えば、TwelveLabs Marengoを使用して埋め込みを生成し、スケーラブルなS3ベクトルインフラストラクチャに保存する準備は完了です。


3 - Marengoを使用した埋め込みの生成

TwelveLabsのMarengoモデルは、マルチモーダル埋め込みを生成するための柔軟な入力方法を提供します。アプリケーションをスケールさせるにつれて、パフォーマンスとコストを最適化するために適切なアプローチを選択することが極めて重要になります。このセクションでは、Base64エンコーディング(小さなファイルや迅速なプロトタイピングに最適)とS3 URIベースの処理(プロダクションワークロードや大容量のメディアファイルに最適)の両方を紹介します。


メディアファイルのS3へのアップロード

本番環境のシナリオでは、メディアファイルをS3に保存することで、優れたスケーラビリティ、耐久性、およびパフォーマンスが実現します。まず、ファイルをアップロードするためのヘルパー関数を作成します:

def upload_file_to_s3(local_path: str, bucket: str, key: str) -> str:
    """
    Upload a local file to S3
    
    Args:
        local_path: Path to local file
        bucket: S3 bucket name
        key: S3 object key (path in bucket)
    
    Returns:
        S3 URI of uploaded file
    """
    try:
        s3_client.upload_file(local_path, bucket, key)
        s3_uri = f"s3://{bucket}/{key}"
        print(f"✅ Uploaded {os.path.basename(local_path)} to {s3_uri}")
        return s3_uri
    except ClientError as e:
        print(f"❌ Error uploading file: {e}")
        raise


アプローチ1:Base64エンコーディング

このアプローチでは、ファイルをローカルに読み込み、Base64にエンコードしてからBedrockに送信します。サイズが小さなファイル、開発段階、またはメディアがすでにメモリ上にあるシナリオに最適です。

テキスト埋め込み:

def generate_text_embedding(text: str) -> List[float]:
    """
    Generate embedding for text using Marengo on Bedrock
    """
    # Create unique output path
    output_prefix = f'embeddings/{uuid.uuid4()}'
    
    # Start async embedding generation
    response = bedrock_client.start_async_invoke(
        modelId=MODEL_ID,
        modelInput={
            "inputType": "text",
            "inputText": text
        },
        outputDataConfig={
            "s3OutputDataConfig": {
                "s3Uri": f's3://{TEMP_S3_BUCKET}/{output_prefix}'
            }
        }
    )
    
    invocation_arn = response["invocationArn"]
    print(f"Generating text embedding for: '{text[:50]}...'")
    
    # Wait for completion
    status = None
    while status not in ["Completed", "Failed"]:
        response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)
        status = response['status']
        time.sleep(2)
    
    if status != "Completed":
        raise Exception(f"Embedding generation failed")
    
    # Retrieve embedding from S3
    response = s3_client.list_objects_v2(Bucket=TEMP_S3_BUCKET, Prefix=output_prefix)
    for obj in response.get('Contents', []):
        if obj['Key'].endswith('output.json'):
            result = s3_client.get_object(Bucket=TEMP_S3_BUCKET, Key=obj['Key'])
            data = json.loads(result['Body'].read())
            return data['data'][0]['embedding']
    
    raise Exception("No embedding output found")

この関数は、Bedrockの非同期ワークフローを示しています。ジョブを送信し、完了するまでポーリングし、その後S3から結果を取得します。このパターンは、数千個のアイテムを同時に処理する可能性がある本番環境のワークロードに合わせて見事に拡張できます。

ビデオ埋め込み:

def generate_video_embedding(video_path: str, start_sec: float = 0, length_sec: float = None) -> List[float]:
    """
    Generate embedding for video using Marengo on Bedrock
    """
    # Read video file and encode to base64
    with open(video_path, 'rb') as video_file:
        video_base64 = base64.b64encode(video_file.read()).decode('utf-8')
    
    # Build model input with optional time segments
    model_input = {
        "inputType": "video",
        "mediaSource": {"base64String": video_base64},
        "embeddingOption": ["visual-text", "audio"]  # Capture both visual and audio
    }
    
    if start_sec is not None:
        model_input["startSec"] = start_sec
    if length_sec is not None:
        model_input["lengthSec"] = length_sec
    
    # ... rest follows the same async pattern

オーディオおよび画像関数:

generate_audio_embedding() および generate_image_embedding() 関数も同様のパターンに従い、それぞれBase64エンコーディングと適切なモデル入力パラメーターを使用して、対応するメディアタイプ向けに細かくカスタマイズされています。


アプローチ2:S3 URI処理

この本番環境向けに最適化されたアプローチでは、まずファイルをS3にアップロードし、その後URIでファイルを参照します。大容量ファイルに適しており、メモリ使用量を削減し、より優れたエラーハンドリングと再試行ロジックを構成できます。

S3からのビデオ埋め込み:

def generate_video_embedding_from_s3(s3_uri: str, start_sec: float = 0, 
                                    length_sec: float = None) -> List[float]:
    """Generate embedding for video from S3 using Marengo on Bedrock"""
    output_prefix = f'{OUTPUT_PREFIX}video/{uuid.uuid4()}'
    
    model_input = {
        "inputType": "video",
        "mediaSource": {
            "s3Location": {
                "uri": s3_uri,
                "bucketOwner": ACCOUNT_ID
            }
        },
        "embeddingOption": ["visual-text", "audio"]
    }
    
    response = bedrock_client.start_async_invoke(
        modelId=MODEL_ID,
        modelInput=model_input,
        outputDataConfig={
            "s3OutputDataConfig": {
                "s3Uri": f's3://{TEMP_S3_BUCKET}/{output_prefix}',
                "bucketOwner": ACCOUNT_ID
            }
        }
    )
    
    print(f"🎬 Generating video embedding from S3: {s3_uri}")
    # Enhanced status monitoring with retry logic...

以下は、本番環境での使用例です:

# Upload video to S3
video_s3_key = f"{MEDIA_PREFIX}videos/sample_video.mp4"
video_s3_uri = upload_file_to_s3('video.mp4', TEMP_S3_BUCKET, video_s3_key)

# Generate embedding from S3
video_embedding_s3 = generate_video_embedding_from_s3(video_s3_uri, start_sec=0, length_sec=10)


検証:両方のアプローチから得られる同一の結果

重要な検証ステップとして、同じコンテンツに対して両方のアプローチが完全に同一の埋め込みを生成することを確認します:

# Compare VIDEO embeddings
print("🎬 VIDEO EMBEDDING VERIFICATION")
print("-" * 50)

# Generate using Base64
print("Generating via Base64...")
video_base64_emb = generate_video_embedding('video.mp4', start_sec=0, length_sec=10)

# Upload and generate using S3 URI
print("Generating via S3 URI...")
video_s3_key = f"{MEDIA_PREFIX}verification/video.mp4"
video_s3_uri = upload_file_to_s3('video.mp4', TEMP_S3_BUCKET, video_s3_key)
video_s3_emb = generate_video_embedding_from_s3(video_s3_uri, start_sec=0, length_sec=10)

# Calculate similarity
similarity = cosine_similarity(video_base64_emb, video_s3_emb)
print(f"📊 Results:")
print(f"  Cosine similarity: {similarity:.6f}")
print(f"  Are they identical? {'✅ YES' if similarity > 0.9999 else '❌ NO'}")

検証の結果、すべてのメディアタイプで一貫して完璧な類似性スコアが示され、両方のアプローチが同一の表現(埋め込み)を生成することが確認できました。これにより、開発者は特定のユースケースの要件に基づいて暗黙のうちに最適な方法を選択することができます。


埋め込みパターンの可視化

埋め込み構造を理解することは、モデルの動作の検証やアプリケーションのデバッグに役立ちます。以下は、異なるモダリティを可視化する方法です:

テキスト埋め込みの可視化:

# Visualize text embeddings
if text_embeddings:
    plt.figure(figsize=(12, 3))
    for i, (text, emb, _) in enumerate(text_embeddings[:3]):
        plt.subplot(1, 3, i+1)
        plt.imshow([emb[:100]], aspect='auto', cmap='coolwarm')
        plt.title(f'Text {i+1}', fontsize=10)
        plt.xlabel('Dimensions (first 100)')
        plt.colorbar(orientation='horizontal', pad=0.1)
    plt.suptitle('Text Embeddings Visualization', fontsize=12)
    plt.tight_layout()
    plt.show()

この可視化は、次元全体にわたる多様な活性化強度を通じて、テキスト埋め込みがどのようにセマンティックパターンをとらえるかを明らかにしています。関連する概念がどのように同様のパターンを示す一方で、異なるトピックが異なる活性化シグネチャを示すかに注目してください。

ビデオ、オーディオ、および画像の可視化:

それぞれのモダリティは、異なる埋め込みパターンを生成します:

ビデオ埋め込みは、統一された1,024次元空間で時間的視覚情報とオーディオ特徴の両方をとらえる複雑なパターンを表示します:

オーディオ埋め込みは、異なるスペクトル表現を持つ特徴的な周波数ドメインパターンを示します:

画像埋め込みは、豊かな色分けされた強度変化を伴う空間的特徴の活性化パターンを示します:

これらの可視化は、Marengoが異なるデータタイプをどのように処理するかを理解するのに役立ち、下流のアプリケーション向けのフィーチャエンジニアリングの決定に役立てることができます。


本番デプロイにおける重要な洞察

Base64を使用するタイミング: 最適なパフォーマンスを得るには、25MB未満の小さなファイルを処理する場合、開発やテスト段階、すでにメモリ上にあるデータをリアルタイムで処理する必要がある場合、または単一のファイルが関与する単純なワークフローの場合に、Base64エンコーディングを選択します。

S3 URIを使用するタイミング: 25MBを超える大きなビデオファイルを処理する場合、本番環境の一括バッチ処理システムを実装する場合、データの耐久性を必要とするマルチステージのパイプラインを構築する場合、すでにS3にあるコンテンツを処理する場合、または堅牢なエラー処理と再試行メカニズムが不可欠なシナリオでは、S3 URIが好ましいアプローチです。

パフォーマンスに関する考慮事項: S3 URIアプローチは、ローカルメモリ消費の大幅な削減、大きなファイルに対するタイムアウト問題処理の改善、包括的な再試行ロジックを備えた強化された状態監視機能、および大量のメディアを処理する際の優れたコスト効率など、複数のパフォーマンス上の利点を提供します。

どちらのアプローチでも、同一の統一されたベクトル空間に存在する完全に同じ1,024次元の埋め込みが生成されるため、生成のためにどの方法を選択したかに関わらず、シームレスなクロスメディア検索が可能になります。この柔軟性により、すべてのメディアタイプにわたって一貫したセマンティック解釈を維持しながら、ファイルサイズ、インフラストラクチャの制約、および運用要件に基づいてアーキテクチャを最適化できます。


4 - S3 Vectorsへの埋め込みの保存

マルチモーダル埋め込みを手に入れたら、次のステップはそれをスケーラブルで検索可能なストアに保存することです。Amazon S3 Vectorsはネイティブのベクトルストレージと類似性検索をS3に提供するため、余分なインフラストラクチャを立ち上げたり管理したりする必要はありません。ベクトルバケットを作成し、インデックスを定義し、メタデータとともにベクトルを一括挿入するだけです。このセクションでは、本番環境を意識した開発者向けに、更新されたノートブックとREADMEのワークフローおよびコードを再現して紹介します。


なぜMarengo埋め込みにS3 Vectorsを使用するのか

  • S3ネイティブ:既存データとともにベクトルを保存し、既存のS3コントロール、ライフサイクル、およびセキュリティ体制を再利用できます。

  • 組み込みの類似性検索:AWS SDKのクライアントを介してコサイン距離によるクエリをダイレクトに実行でき、個別のベクトルデータベースを開発・運営する必要がありません。

  • Marengoに完全適合したスキーマ:インデックスの次元数はMarengo-Embed-2.7の出力に一致するよう1,024に設定されており、効率的なストレージと検索が保証されます。


メタデータを使用したベクトルペイロードの構築

埋め込みを生成した後、一貫した構造に正規化します。それには、一意のキー、float32形式のベクトルペイロード、そして人間が読み取れるコンテキストとフィルタリング用のメタデータが含まれます。ノートブックでは、従来のタプル形式と、メディアタイプを含む拡張されたトリプレット形式の両方を処理します。

def store_embeddings(embeddings_data: List[tuple]) -> bool:
    """
    Store embeddings in S3 Vectors with metadata
    """
    vectors_to_insert = []
    
    for i, item in enumerate(embeddings_data):
        # Support (text, embedding) and (text, embedding, media_type)
        if len(item) == 3:
            text, embedding, media_type = item
        else:
            text, embedding = item
            media_type = "text"
        
        vector_entry = {
            'key': f'vector_{i:04d}',           # deterministic, idempotent upserts
            'data': {
                'float32': [float(v) for v in embedding]  # convert to float32-compatible list
            },
            'metadata': {
                'text': text,                   # human-readable description
                'media_type': media_type,       # useful for UI badges and filtering
                'id': i                         # numeric id for quick reference
            }
        }
        vectors_to_insert.append(vector_entry)

この構造フォーマットはS3 Vectorsのput_vectors APIに適合しており、合理化された検索結果と分析のために、埋め込みのすぐ近くにメタデータを保持できます。


S3 Vectorsへの一括挿入

1回のバッチ呼び出しを使用することで、ラウンドトリップを最小限に抑え、操作を繰り返し可能にします。ノートブックには、メディアタイプ別の役立つサマリーも印刷されます。これはサニティチェックやダッシュボードに役立ちます。

    try:
        s3vectors_client.put_vectors(
            vectorBucketName=VECTOR_BUCKET_NAME,
            indexName=VECTOR_INDEX_NAME,
            vectors=vectors_to_insert
        )
        print(f"Stored {len(vectors_to_insert)} vectors in S3 Vectors")
        
        # Optional: quick summary
        media_counts = {}
        for item in embeddings_data:
            mtype = item[2] if len(item) == 3 else "text"
            media_counts[mtype] = media_counts.get(mtype, 0) + 1
        print("Media types stored:")
        for m, c in media_counts.items():
            print(f"  - {m}: {c}")
        return True
    except ClientError as e:
        print(f"Error storing vectors: {e}")
        return False

典型的な実行では、テキスト、ビデオ、オーディオ、および画像に分散された6つのベクトルを保存したことが報告され、クロスモーダル検索のエンドツーエンドの準備が完了したことが確認できます。


インデックスの設計と距離の測定基準

インデックスを作成する際は、パフォーマンスを最適化するためにいくつか重要なパラメーターを構成する必要があります。まず、Marengoのベクトル値を適切に保存するためにdataTypeをfloat32に設定します。次に、Marengoの埋め込みサイズに完全に一致させるためにdimensionを1,024に指定します。distanceMetricについては、ベクトル間のセマンティック類似性を測定する標準的なアプローチであるコサインを選択します。最後に、metadataConfigurationに少なくとも1つのキーを含めるようにします(この例ではフィルター不可のプレースホルダーを使用し、挿入時にリッチなメタデータを動的に割り当てています)。

これらの慎重に選定されたデフォルト構成は、Marengoモデルの出力形式との互換性を確保し、このチュートリアルの後半で使用する下流の類似性検索メソッドとも完全に一致します。


べき等性、更新、およびバージョニング

構成にあたってのベストプラクティスをいくつか確認しておきましょう:

  • コンテンツのURIハッシュや固定UUIDに基づく確定的(デターミニスティック)キーを使用することでベクトル管理を最適化し、ベクトルデータベースを更新する際に安全で予測可能な再取り込み処理を確実に実行できるようにします。

  • まったく同一のキーで取り込みジョブを再実行した場合、ベクトルはアップサート(upserts)によって綺麗に上書きされる必要があります。過去のバージョン履歴を維持する場合は、キーにバージョンのサフィックス(接尾辞)を追加し、DynamoDBなどの個別のシステムでアクティブなバージョンの参照関係を管理します。

  • メタデータは個別のベクトルに紐付けられているため、既存と同じキーのまま(新しいラベルやタイムスタンプなどで)メタデータ属性だけを一括更新してベクトルを再保存(re-put)することで、スキーマを段階的に進化させることができます。

これによりベクトルがデータベースに格納されてインデックスが作成され、低レイテンシーのセマンティック検索を実行する準備が整いました。次に、新しいクエリの埋め込み、もしくは事前に計算されたベクトルを使用してインデックスを検索し、S3のセキュリティおよびガバナンスの境界から一歩も出ることなく、テキスト、ビデオ、オーディオ、画像にまたがるクロスモーダルな検索体験を提供しましょう。


5 - 類似性検索の実行

Amazon S3 Vectorsでベクトルインデックスが構築されれば、問い合わせは低レイテンシーのシンプルなAPI呼び出しを実行するだけです。同一のフローで、リアルタイムの自然言語クエリと任意のモダリティの事前計算済み埋め込みの両方をサポートできます。これは、AWS上における一連のクロスモーダル情報検索パターンを実装する上で非常に理想的です。


テキストまたは事前計算済みの埋め込みによる検索

配布しているデモのノートブックには、テキストプロンプト(Marengoを使って即座に埋め込みをオンザフライ生成)または事前に用意されたベクトル形式の埋め込みの両方を受け入れることができる単一のラッパー関数が含まれています。その後、関数はS3 Vectorsインデックスに対してコサイン類似度クエリを実行し、結果表示に便利なメタデータと共に、スコア順にソートされたランク順位を返します。

def search_similar(query_text: str = None, query_embedding: List[float] = None, top_k: int = 3) -> List[Dict]:
    """
    Search for similar vectors using either a text query or a pre-computed embedding
    """
    # Generate embedding for query if text is provided
    if query_text and not query_embedding:
        print(f"\nSearching for: '{query_text}'")
        query_embedding = generate_text_embedding(query_text)
    elif query_embedding:
        print(f"\nSearching with provided embedding...")
    else:
        raise ValueError("Either query_text or query_embedding must be provided")
    
    # Query S3 Vectors
    try:
        response = s3vectors_client.query_vectors(
            vectorBucketName=VECTOR_BUCKET_NAME,
            indexName=VECTOR_INDEX_NAME,
            topK=top_k,
            queryVector={'float32': [float(v) for v in query_embedding]},
            returnMetadata=True,
            returnDistance=True
        )
        
        results = []
        for vector in response.get('vectors', []):
            # Handle both old vectors (without media_type) and new vectors (with media_type)
            results.append({
                'text': vector['metadata'].get('text', 'No description'),
                'media_type': vector['metadata'].get('media_type', 'text'),  # Default to 'text' for old vectors
                'similarity': 1.0 - vector.get('distance', 0),  # Convert distance to similarity
                'key': vector.get('key')
            })
        
        return results
        
    except ClientError as e:
        print(f"Query failed: {e}")
        return []

仕組み:

  • もしquery_textが指定された場合、Bedrockの非同期呼び出しを使って1,024次元のMarengo埋め込みを生成し、S3の一時的な保存先から結果を取得してから、それを用いてS3 Vectorsを検索します。

  • もしquery_embedding(ベクトル形式)が渡された場合、上記の埋め込み生成処理ステップをスキップし、そのままベクトルをS3 Vectorsに入力して直接検索を行います。これは「すべての形式を対象としたメディア類似検索(ビデオ→すべて、画像→すべて、等)」を行うのに最適です。

主要なコード実装ポイント:

  • topKは、インデックスから返す結果(上位件数)を制御します。

  • returnMetadatareturnDistanceを指定することで、人間に読みやすいメタデータを取得し、similarity = 1.0 - distanceという式を用いて距離から直感的な類似度スコアへ変換してソートできるようにします。

  • この検索ラッパー関数は、デモ構成が過渡期にある古いデータ、すなわちmedia_typeメタデータが存在しない過去のベクトルに対しても、フォールバック値として「