Partnerships

Finding Needles in Video Haystacks: Building Intelligent Video Search with TwelveLabs Marengo, Amazon Bedrock, and Elasticsearch

James Le

With TwelveLabs models now available on Amazon Bedrock, developers can build sophisticated video AI applications while maintaining complete control over their data. This tutorial demonstrates how to create a semantic video search system by combining our Marengo 2.7 model with Elasticsearch's vector search capabilities. Elasticsearch provides an ideal platform for storing and searching these vector embeddings at scale. With its vector search capabilities, you can perform efficient similarity searches across thousands or even millions of video segments. The combination of TwelveLabs embeddings and Elasticsearch's search infrastructure creates a powerful solution for organizations looking to make their video content more discoverable and accessible.

Sep 20, 2025

13 Minutes

Huge thanks to Dave Erickson from Elastic for building the application that serves as the basis of this tutorial. Check out his post on the Elastic Search Labs blog: https://www.elastic.co/search-labs/blog/twelvelabs-marengo-video-embedding-amazon-bedrock


With TwelveLabs models now available on Amazon Bedrock, developers can build sophisticated video AI applications while maintaining complete control over their data. This tutorial demonstrates how to create a semantic video search system by combining our Marengo 2.7 model with Elasticsearch's vector search capabilities.


The Multi-Modal Video Challenge

Traditional video search relies on metadata, transcripts, or manually tagged content. But what happens when you need to find "scenes with dinosaurs" or "moments showing teamwork" without any spoken dialogue or descriptive text? Standard approaches using separate image models and audio transcription miss the temporal dynamics that make video unique.

TwelveLabs Marengo 2.7 solves this by understanding video as a continuous multi-modal stream, capturing not just what appears in individual frames, but how visual elements, audio cues, and motion patterns work together across time.


Understanding the TwelveLabs Marengo Model

Specifically, Marengo is our state-of-the-art multimodal embedding model that generates consistent 1024-dimensional vector representations across different media types. These embeddings capture the semantic essence of your content, allowing for intuitive cross-modal search capabilities. When you embed a video using Marengo, each segment is transformed into a vector that can be compared with embeddings from text queries, images, or other videos.
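
To make "can be compared" concrete, here is a minimal sketch of cosine similarity, the metric configured later in the Elasticsearch mapping. The vectors are toy 4-dimensional stand-ins invented for illustration, not real Marengo output, and the function name is our own.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimensionality")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for zero vectors")
    return dot / (norm_a * norm_b)

# Toy stand-ins (assumed values) for a text-query embedding
# and two video-segment embeddings
query = [0.9, 0.1, 0.0, 0.2]
segment_a = [0.8, 0.2, 0.1, 0.1]  # points in a similar direction
segment_b = [0.0, 0.1, 0.9, 0.0]  # points in a different direction

scores = {
    "segment_a": cosine_similarity(query, segment_a),
    "segment_b": cosine_similarity(query, segment_b),
}
best = max(scores, key=scores.get)
print(best, scores)
```

Real embeddings have 1024 dimensions, but the comparison works the same way: the segment whose vector points in the direction closest to the query vector ranks first.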


Why Elasticsearch for Video Search?

Elasticsearch provides an ideal platform for storing and searching these vector embeddings at scale. With its vector search capabilities, you can perform efficient similarity searches across thousands or even millions of video segments. The combination of TwelveLabs embeddings and Elasticsearch's search infrastructure creates a powerful solution for organizations looking to make their video content more discoverable and accessible.


Architecture Overview

Our solution processes video content through four main stages:

  1. Video ingestion: Download trailers and upload to S3

  2. Async embedding generation: Use Bedrock's start_async_invoke for scalable vector creation

  3. Vector indexing: Store embeddings in Elasticsearch with multiple quantization options

  4. Real-time search: Convert text queries to vectors for k-NN retrieval


Complete Workflow


1 - Setting Up Your Environment

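Before starting, make sure the required packages are installed and your credentials are available as environment variables, then import the dependencies and define the configuration: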

import os, json, time, copy
import boto3, botocore
import yt_dlp
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from pathlib import Path
from dotenv import load_dotenv
import tqdm

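# Load variables from a local .env file before reading them
load_dotenv()

# Configuration
AWS_REGION = "us-east-1"
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
DATA_PATH = os.getenv("DATA_PATH", "./data")  # local download directory (default value is an assumption)
MARENGO_MODEL_ID = "twelvelabs.marengo-embed-2-7-v1:0"  # used for async video embedding
TEXT_EMBEDDING_MODEL_ID = "us.twelvelabs.marengo-embed-2-7-v1:0"  # used for synchronous text embedding
ELASTICSEARCH_ENDPOINT = os.getenv("ELASTICSEARCH_ENDPOINT")
ELASTICSEARCH_API_KEY = os.getenv("ELASTICSEARCH_API_KEY")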

# Test dataset: 2025 summer blockbuster trailers
videos = [
    "https://www.youtube.com/watch?v=VWqJifMMgZE",  # Lilo and Stitch 2025
    "https://www.youtube.com/watch?v=Ox8ZLF6cGM0",  # Superman 2025
    "https://www.youtube.com/watch?v=jan5CFWs9ic",  # Jurassic World Rebirth
    "https://www.youtube.com/watch?v=qpoBjOg5RHU",  # Fantastic Four: First Steps
    "https://www.youtube.com/watch?v=22w7z_lT6YM",  # How to Train Your Dragon
]

The code above sets up the foundation for building a video search solution using the TwelveLabs Marengo model on Amazon Bedrock and Elasticsearch.

  • It establishes the required imports, configuration variables, and a test dataset of 2025 movie trailers.

  • Key components include AWS credentials configuration, Elasticsearch endpoint information, model IDs for both video and text embedding generation, and YouTube links for the sample videos.

  • This initial setup creates the environment necessary for the subsequent video processing, embedding generation, and vector search implementation detailed in the rest of the tutorial.


2 - Data Management Class

The VideoIntelligence class encapsulates all video metadata, file paths, and embedding data:

class VideoIntelligence:
    def __init__(self, url, platform, video_id):
        self.url = url
        self.platform = platform  
        self.video_id = video_id
        self.video_string = f"{self.platform}_{self.video_id}"
        self.base_path = f"{DATA_PATH}/videos/{self.video_string}"
        
        self.video_path = None
        self.s3_key = None
        self.metadata = None
        self.title = None
        self.embeddings_list = None
    
    def get_video_object(self):
        """Return indexable document structure"""
        return {
            "url": self.url,
            "platform": self.platform, 
            "video_id": self.video_id,
            "title": self.title
        }
    
    # Additional getters/setters omitted for brevity

The VideoIntelligence class serves as a comprehensive data container for managing all aspects of video processing in our search system.

  • It stores the video's source URL, platform information, unique identifier, and associated file paths for both local and S3 storage.

  • The class also maintains references to video metadata, title information, and the critical embeddings list that will be generated by the TwelveLabs Marengo model.

  • Through its get_video_object() method, it provides a structured format for indexing video information in Elasticsearch, making it easy to manage the complete lifecycle of a video from ingestion through embedding generation to searchable indexing.
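
Because the accessors are omitted above but called in later steps, here is one possible sketch of the full class. The method bodies are our assumption, inferred from how they are used later in the tutorial; in particular, setting the title from the metadata's "title" key and the DATA_PATH default are guesses, not the original implementation.

```python
DATA_PATH = "./data"  # assumption: local working directory for downloads

class VideoIntelligence:
    def __init__(self, url, platform, video_id):
        self.url = url
        self.platform = platform
        self.video_id = video_id
        self.video_string = f"{self.platform}_{self.video_id}"
        self.base_path = f"{DATA_PATH}/videos/{self.video_string}"
        self.video_path = None
        self.s3_key = None
        self.metadata = None
        self.title = None
        self.embeddings_list = None

    def get_video_object(self):
        """Return indexable document structure"""
        return {"url": self.url, "platform": self.platform,
                "video_id": self.video_id, "title": self.title}

    # Plain getters used by the later steps
    def get_base_path(self): return self.base_path
    def get_video_string(self): return self.video_string
    def get_video_path(self): return self.video_path
    def get_platform(self): return self.platform
    def get_video_id(self): return self.video_id
    def get_s3_key(self): return self.s3_key
    def get_embeddings_list(self): return self.embeddings_list

    # Setters used by the later steps
    def set_video_path(self, video_path): self.video_path = video_path
    def set_s3_key(self, s3_key): self.s3_key = s3_key
    def set_embeddings_list(self, embeddings): self.embeddings_list = embeddings

    def set_metadata(self, metadata):
        self.metadata = metadata
        # Assumption: the title comes from the downloader's metadata dict
        self.title = (metadata or {}).get("title")

video = VideoIntelligence("https://www.youtube.com/watch?v=jan5CFWs9ic", "youtube", "jan5CFWs9ic")
video.set_metadata({"title": "Example title"})
print(video.get_video_object())
```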


3 - Video Download and Processing

The next step is to download the YouTube videos and extract their metadata:

def get_video(video: VideoIntelligence):
    """Download video using yt-dlp and extract metadata"""
    base_directory = Path(video.get_base_path())
    base_directory.mkdir(parents=True, exist_ok=True)
    
    video_path = f"{video.get_base_path()}/{video.get_video_string()}.mp4"
    metadata_path = f"{video.get_base_path()}/metadata.json"
    
    ydl_opts = {
        "format": "bestvideo+bestaudio/best",
        "outtmpl": video_path,
        "merge_output_format": "mp4"
    }
    
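    if not os.path.exists(video_path):
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Download and collect metadata in one call;
            # sanitize_info makes the info dict JSON-serializable
            metadata = ydl.sanitize_info(ydl.extract_info(video.url, download=True))
        with open(metadata_path, "w") as f:
            json.dump(metadata, f)
    else:
        # Reuse cached metadata from a previous run
        with open(metadata_path, "r") as f:
            metadata = json.load(f)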
    
    video.set_metadata(metadata) 
    video.set_video_path(video_path)

# Process all videos
video_objects = []
for video_str in videos:
    if "youtube.com" in video_str:
        platform = "youtube"
        video_id = video_str.split("v=")[1] 
        video_objects.append(VideoIntelligence(video_str, platform, video_id))

for video_object in video_objects:
    get_video(video_object)

This code handles the video processing workflow:

  • The get_video function downloads videos from YouTube using yt-dlp, stores them locally, and extracts metadata.

  • It creates necessary directories, sets appropriate file paths, and handles conditional downloading to prevent re-downloading existing videos.

  • The code then initializes VideoIntelligence objects for each YouTube video in the list, capturing platform information and video IDs.

  • Finally, it processes each video by calling get_video, which populates the objects with downloaded video paths and metadata.
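
One caveat in the loop above: `video_str.split("v=")[1]` keeps any trailing query parameters (for example `&t=30s`) as part of the ID. A more defensive sketch, using only the standard library, is shown below. The helper name and the set of accepted hosts are our own choices, not part of the original notebook.

```python
from typing import Optional
from urllib.parse import urlparse, parse_qs

# Assumption: these are the only hosts we want to accept
YOUTUBE_HOSTS = {"www.youtube.com", "youtube.com", "m.youtube.com"}

def extract_youtube_id(url: str) -> Optional[str]:
    """Return the video ID from a YouTube URL, or None if it cannot be found."""
    parsed = urlparse(url)
    host = (parsed.hostname or "").lower()
    if host in YOUTUBE_HOSTS:
        # Standard watch URLs carry the ID in the "v" query parameter
        values = parse_qs(parsed.query).get("v")
        return values[0] if values else None
    if host == "youtu.be":
        # Short links carry the ID as the path
        return parsed.path.lstrip("/") or None
    return None

print(extract_youtube_id("https://www.youtube.com/watch?v=VWqJifMMgZE&t=30s"))
```

Swapping this in for the string split keeps the rest of the loop unchanged; URLs that return None can simply be skipped.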


4 - S3 Upload with Deduplication

The next step is to upload those videos to an S3 bucket:

# Initialize AWS clients
session = boto3.session.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY, 
    region_name=AWS_REGION
)

aws_account_id = session.client('sts').get_caller_identity()["Account"]
s3_client = session.client('s3')
bedrock_client = session.client("bedrock-runtime", region_name=AWS_REGION)

# Upload videos to S3 with existence checking
for video_object in video_objects:
    video_path = video_object.get_video_path()
    s3_key = f"videos/{video_object.get_platform()}/{video_object.get_video_id()}/{os.path.basename(video_path)}"
    video_object.set_s3_key(s3_key)
    
    try:
        s3_client.head_object(Bucket=S3_BUCKET_NAME, Key=s3_key)
        print(f"Video {video_object.get_video_string()} already exists in S3")
        continue
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            print(f"Uploading {video_object.get_video_string()} to S3...")
            s3_client.upload_file(video_path, S3_BUCKET_NAME, s3_key)
            print(f"Successfully uploaded {video_object.get_video_string()}")
        else:
            # Surface permission or configuration errors instead of silently skipping the video
            raise

This code block establishes the AWS environment for video processing by initializing the AWS session, connecting to S3, and authenticating with the Bedrock service. It performs intelligent video uploading to S3 with built-in deduplication—checking if each video already exists in the bucket before uploading to prevent redundant storage and processing.

For each video object, it constructs a standardized S3 key based on the platform and video ID, then either skips existing videos or uploads new ones. This foundation enables the subsequent embedding generation and vector search operations that will transform the raw videos into searchable content.


5 - Generating Embeddings from Videos Asynchronously

At TwelveLabs, we've designed our embedding models to work with various content types. When processing videos, Marengo automatically chunks your content into meaningful segments and generates embeddings that capture both visual and textual information. These embeddings enable you to search for specific moments using natural language queries.

In this example, the key innovation is using Bedrock's async invoke pattern for scalable embedding generation:

def wait_for_embedding_output(s3_bucket: str, s3_prefix: str, 
                             invocation_arn: str, verbose: bool = False) -> list:
    """Poll Bedrock async job until completion and retrieve results"""
    status = None
    while status not in ["Completed", "Failed", "Expired"]:
        response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)
        status = response['status']
        if verbose:
            tqdm.tqdm.write(f"Embedding task status: {status}")
        time.sleep(5)
    
    if status != "Completed":
        raise Exception(f"Embedding task failed with status: {status}")
    
    # Retrieve output from S3
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
    for obj in response.get('Contents', []):
        if obj['Key'].endswith('output.json'):
            output_key = obj['Key']
            obj = s3_client.get_object(Bucket=s3_bucket, Key=output_key)
            content = obj['Body'].read().decode('utf-8')
            return json.loads(content).get("data", [])
    
    raise Exception("No output.json found in S3 prefix")

def create_video_embedding(video_s3_uri: str, video_id: str) -> list:
    """Start async Marengo embedding job for video in S3"""
    s3_output_prefix = f'embeddings/videos/{video_id}'
    
    response = bedrock_client.start_async_invoke(
        modelId=MARENGO_MODEL_ID,
        modelInput={
            "inputType": "video",
            "mediaSource": {
                "s3Location": {"uri": video_s3_uri, "bucketOwner": aws_account_id}
            }
        },
        outputDataConfig={
            "s3OutputDataConfig": {"s3Uri": f's3://{S3_BUCKET_NAME}/{s3_output_prefix}'}
        }
    )
    
    invocation_arn = response["invocationArn"]
    print(f"Video embedding task started: {invocation_arn}")
    
    return wait_for_embedding_output(S3_BUCKET_NAME, s3_output_prefix, invocation_arn)

def check_existing_embedding(video_id: str):
    """Check S3 for cached embeddings to avoid re-processing"""
    s3_output_prefix = f'embeddings/videos/{video_id}'
    
    try:
        response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=s3_output_prefix)
        if 'Contents' in response and any(obj['Key'].endswith('output.json') for obj in response.get('Contents', [])):
            # Load existing embeddings from S3
            for obj in response.get('Contents', []):
                if obj['Key'].endswith('output.json'):
                    output_key = obj['Key']
                    obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=output_key)
                    content = obj['Body'].read().decode('utf-8')
                    return json.loads(content).get("data", [])
        return None
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            return None
        raise e

# Generate embeddings with caching
for video_object in tqdm.tqdm(video_objects, desc="Processing videos"):
    video_id = video_object.get_video_id()
    video_uri = f"s3://{S3_BUCKET_NAME}/{video_object.get_s3_key()}"
    
    # Check for existing embeddings first
    retrieved_embeddings = check_existing_embedding(video_id)
    if retrieved_embeddings:
        video_object.set_embeddings_list(retrieved_embeddings)
    else:
        embedding_data = create_video_embedding(video_uri, video_id)
        video_object.set_embeddings_list(embedding_data)

This code implements the core video embedding generation functionality, efficiently leveraging Amazon Bedrock's asynchronous API for scalable processing. The implementation includes three key functions:

  1. wait_for_embedding_output polls the async job status and retrieves results from S3 upon completion.

  2. create_video_embedding initiates the embedding generation job for a video file stored in S3.

  3. check_existing_embedding implements caching logic to avoid redundant processing.

The main loop processes each video with built-in deduplication, either retrieving cached embeddings or generating new ones when needed. This approach optimizes resource usage while enabling high-throughput video processing for search applications.
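
The polling loop above waits indefinitely at a fixed 5-second interval. As a hedged alternative, the sketch below adds a timeout and exponential backoff. It is a generic helper of our own design: the status callable, the default timings, and the injectable `sleep` and `clock` parameters are assumptions made so the logic can be exercised without calling Bedrock. The terminal statuses mirror the ones checked in the tutorial's loop.

```python
import time
from typing import Callable

# Terminal statuses taken from the tutorial's polling loop
TERMINAL_STATUSES = {"Completed", "Failed", "Expired"}

def poll_until_done(
    get_status: Callable[[], str],
    timeout_sec: float = 1800.0,   # assumed upper bound for one job
    initial_delay: float = 2.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Call get_status until it returns a terminal status or the timeout passes."""
    deadline = clock() + timeout_sec
    delay = initial_delay
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if clock() + delay > deadline:
            raise TimeoutError(f"job still '{status}' after {timeout_sec} seconds")
        sleep(delay)
        delay = min(delay * 2, max_delay)  # back off, capped at max_delay

# Demo with a fake job that completes on the third check
fake_statuses = iter(["InProgress", "InProgress", "Completed"])
waits = []
final_status = poll_until_done(lambda: next(fake_statuses), sleep=waits.append)
print(final_status, waits)
```

In the tutorial's context, `get_status` could be `lambda: bedrock_client.get_async_invoke(invocationArn=invocation_arn)["status"]`, with the S3 retrieval step left as it is.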


6 - Understanding Marengo Output Structure

Each video gets segmented into ~6-second chunks with 1,024-dimensional embeddings:

# Preview embedding structure 
video_embedding_data = video_objects[0].get_embeddings_list()
for i, embedding in enumerate(video_embedding_data[:3]):
    print(f"{i}")
    for key in embedding:
        if "embedding" == key:
            print(f"\t{key}: len {len(embedding[key])}")
        else:
            print(f"\t{key}: {embedding[key]}")

# Output:
# 0
#   embedding: len 1024
#   embeddingOption: visual-text  
#   startSec: 0.0
#   endSec: 6.199999809265137
# 1 
#   embedding: len 1024
#   embeddingOption: visual-text
#   startSec: 6.199999809265137  
#   endSec: 10.399999618530273

The code sample illustrates how to examine the structure of video embeddings generated by the TwelveLabs Marengo model.

  • It retrieves the embedding data from the first video object and prints details of the first three embedding chunks.

  • Each embedding is a 1024-dimensional vector representing approximately 6-second segments of video content, with metadata including the embedding type (visual-text) and precise timestamps (startSec and endSec).

  • This structure allows for fine-grained semantic search across video content, enabling users to find specific moments using natural language queries.
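
Before indexing, it can help to sanity-check a whole output list rather than just the first few entries. The following sketch summarizes a list of segments with the same keys shown above. The helper names are ours, and the sample data uses shortened 4-dimensional vectors purely for illustration.

```python
from collections import Counter

def summarize_segments(segments):
    """Count segments per embeddingOption and collect the vector sizes seen."""
    counts = Counter(seg["embeddingOption"] for seg in segments)
    dims = sorted({len(seg["embedding"]) for seg in segments})
    last_end = max((seg["endSec"] for seg in segments), default=0.0)
    return {"counts": dict(counts), "dims": dims, "last_end_sec": last_end}

def format_timestamp(seconds):
    """Render seconds as M:SS for display next to a search result."""
    minutes, secs = divmod(int(seconds), 60)
    return f"{minutes}:{secs:02d}"

# Illustrative sample shaped like the Marengo output above (vectors shortened)
sample = [
    {"embedding": [0.1, 0.2, 0.3, 0.4], "embeddingOption": "visual-text", "startSec": 0.0, "endSec": 6.2},
    {"embedding": [0.2, 0.1, 0.4, 0.3], "embeddingOption": "visual-text", "startSec": 6.2, "endSec": 10.4},
    {"embedding": [0.3, 0.3, 0.1, 0.1], "embeddingOption": "visual-image", "startSec": 0.0, "endSec": 6.2},
]

print(summarize_segments(sample))
print(format_timestamp(134.5))
```

With real output, `dims` should contain only 1024; anything else suggests a truncated or mismatched file.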


7 - Configuring Elasticsearch for Vector Search

Once we have our embeddings, we'll need to configure Elasticsearch to store and search them efficiently. Elasticsearch offers several index types for vector search, each with different trade-offs between search speed, accuracy, and storage requirements.

Here's how we set up an index optimized for TwelveLabs' 1024-dimensional embeddings:

# Connect to Elasticsearch
es = Elasticsearch(
    hosts=[ELASTICSEARCH_ENDPOINT],
    api_key=ELASTICSEARCH_API_KEY
)

# Prepare documents for indexing
docs = []
for video_object in video_objects:
    persist_object = video_object.get_video_object()
    embeddings = video_object.get_embeddings_list()
    
    for embedding in embeddings:
        if embedding["embeddingOption"] == "visual-image":  # Filter for visual embeddings
            doc = copy.deepcopy(persist_object) 
            doc["embedding"] = embedding["embedding"]
            doc["start_sec"] = embedding["startSec"] 
            doc["end_sec"] = embedding["endSec"]
            docs.append(doc)

# Create indices for different vector search methods
index_varieties = [
    "flat",       # Brute force, highest accuracy
    "hnsw",       # Hierarchical navigable small world graph
    "int8_hnsw",  # Quantized for efficiency
    "bbq_hnsw",   # Better Binary Quantization with HNSW
    "bbq_flat"    # BBQ with flat search
]

for index_variety in index_varieties:
    index_name = f"twelvelabs-movie-trailer-{index_variety}"
    mappings = {
        "properties": {
            "url": {"type": "keyword"},
            "platform": {"type": "keyword"},
            "video_id": {"type": "keyword"},
            "title": {"type": "text", "analyzer": "standard"},
            "embedding": {
                "type": "dense_vector", 
                "dims": 1024,
                "similarity": "cosine",
                "index_options": {
                    "type": index_variety
                }
            },
            "start_sec": {"type": "float"},
            "end_sec": {"type": "float"}
        }
    }
    
    # Recreate index if exists
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        time.sleep(2)
    
    es.indices.create(index=index_name, mappings=mappings)
    print(f"Index '{index_name}' created successfully")
    
    # Bulk insert documents
    actions = [{"_index": index_name, "_source": doc} for doc in docs]
    success, failed = bulk(es, actions, chunk_size=100, max_retries=3)
    print(f"Successfully indexed {success} documents into {index_name}")

This code configures Elasticsearch for vector search by creating five different index varieties (flat, HNSW, and quantized variants) optimized for TwelveLabs' 1024-dimensional embeddings. It connects to Elasticsearch, prepares video segment documents with embeddings, creates specialized indices with appropriate mappings for dense vectors, and bulk uploads the documents. Each index type offers different trade-offs between search accuracy, speed, and storage requirements, allowing developers to choose the best approach based on their specific use case requirements.
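
The bulk insert above lets Elasticsearch generate document IDs, which is fine because each index is deleted and recreated first. If you later drop that step, re-running the insert would create duplicates. One way around this, sketched below under our own naming and ID scheme (not taken from the original notebook), is to derive a deterministic `_id` per segment so that re-indexing overwrites instead of appending.

```python
def build_actions(index_name, video_doc, segments, option="visual-image"):
    """Yield bulk actions with stable IDs for segments of one embedding option."""
    for seg in segments:
        if seg.get("embeddingOption") != option:
            continue
        yield {
            "_index": index_name,
            # Hypothetical ID scheme: video ID + option + start time
            "_id": f"{video_doc['video_id']}_{option}_{seg['startSec']:.3f}",
            "_source": {
                **video_doc,
                "embedding": seg["embedding"],
                "start_sec": seg["startSec"],
                "end_sec": seg["endSec"],
            },
        }

# Illustrative input shaped like get_video_object() and the Marengo output
video_doc = {"url": "https://www.youtube.com/watch?v=abc", "platform": "youtube",
             "video_id": "abc", "title": "Example"}
segments = [
    {"embedding": [0.1, 0.2], "embeddingOption": "visual-image", "startSec": 0.0, "endSec": 6.2},
    {"embedding": [0.3, 0.4], "embeddingOption": "visual-text", "startSec": 0.0, "endSec": 6.2},
]

actions = list(build_actions("twelvelabs-movie-trailer-flat", video_doc, segments))
print([a["_id"] for a in actions])
```

The resulting generator can be passed to `bulk(es, ...)` in place of the `actions` list used above.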


8 - Creating Text Embeddings for Search

One of the most powerful aspects of TwelveLabs' technology is the ability to search video content using natural language. To enable this, we generate text embeddings using the same Marengo 2.7 model that processed our videos, ensuring they exist in the same vector space:

# Generate text embedding for search
def create_text_embedding(text_query: str) -> list:
    """Generate text embedding using Marengo via Bedrock"""
    text_model_input = {"inputType": "text", "inputText": text_query}
    
    response = bedrock_client.invoke_model(
        modelId=TEXT_EMBEDDING_MODEL_ID,
        body=json.dumps(text_model_input)
    )
    
    response_body = json.loads(response['body'].read().decode('utf-8'))
    embedding_data = response_body.get("data", [])
    
    return embedding_data[0]["embedding"] if embedding_data else None

This code defines a function called create_text_embedding that generates vector representations (embeddings) for text queries using the TwelveLabs Marengo model via Amazon Bedrock. The function takes a text query string as input, formats it properly for the model, sends the request to Bedrock using the synchronous invoke_model API, and then parses the response to extract the 1024-dimensional embedding vector. These text embeddings share the same vector space as the video embeddings, enabling semantic similarity searches that can find video moments based on natural language descriptions.


9 - Performing Vector Searches

With everything in place, we can now search our video library using natural language queries. Here's how to implement a search function that finds the most relevant video segments:

def vector_query(index_name: str, text_query: str) -> dict:
    """Execute k-NN vector search against Elasticsearch"""
    query_embedding = create_text_embedding(text_query)
    
    query = {
        "retriever": {
            "knn": {
                "field": "embedding",
                "query_vector": query_embedding,
                "k": 10,
                "num_candidates": 25
            }
        },
        "size": 10,
        "_source": False,
        "fields": ["title", "video_id", "start_sec"]
    }
    
    return es.search(index=index_name, body=query).body

# Test search
text_query = "Show me scenes with dinosaurs"  
results = vector_query("twelvelabs-movie-trailer-flat", text_query)
print(results)

This code implements a vector search function that enables natural language queries against video content.

  • It first generates a text embedding from the query using the same Marengo model that processed the videos, ensuring compatibility in the vector space.

  • The function then constructs an Elasticsearch k-NN query that returns the top 10 most similar video segments, considering 25 candidates per shard (a larger num_candidates generally improves accuracy at the cost of speed).

  • The search returns relevant metadata like title, video ID, and timestamp, allowing applications to link directly to the specific moments in videos that match the query's semantic meaning.
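
Because the query uses `fields` with `_source` disabled, each value in the response comes back wrapped in a list. A small helper can flatten the hits into plain records. The sketch below uses a hand-written sample response that mirrors that shape; the helper name and the sample values are illustrative only.

```python
def flatten_hits(response):
    """Turn an Elasticsearch search response into a list of flat dicts."""
    rows = []
    for hit in response.get("hits", {}).get("hits", []):
        fields = hit.get("fields", {})
        rows.append({
            "score": hit.get("_score"),
            # "fields" values are always lists, so take the first element
            "title": (fields.get("title") or [None])[0],
            "video_id": (fields.get("video_id") or [None])[0],
            "start_sec": (fields.get("start_sec") or [None])[0],
        })
    return rows

# Hand-written sample in the shape returned by vector_query (illustrative values)
sample_response = {
    "hits": {
        "hits": [
            {"_score": 0.64, "fields": {"title": ["Example trailer"],
                                        "video_id": ["jan5CFWs9ic"],
                                        "start_sec": [134.5]}},
            {"_score": 0.62, "fields": {"video_id": ["22w7z_lT6YM"],
                                        "start_sec": [121.2]}},
        ]
    }
}

for row in flatten_hits(sample_response):
    print(row)
```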


10 - Building a Simple Search Interface

For a more user-friendly experience, we can create a simple search interface using IPython widgets. This allows users to input search queries and view results with direct links to the specific moments in videos:

from ipywidgets import widgets, HTML as WHTML, HBox, Layout
from IPython.display import display

def display_search_results_html(query):
    """Format search results as clickable YouTube links"""
    results = vector_query("twelvelabs-movie-trailer-flat", query)
    hits = results.get('hits', {}).get('hits', [])
    
    if not hits:
        return "<p>No results found</p>"
    
    items = []
    for hit in hits:
        fields = hit.get('fields', {})
        title = fields.get('title', ['No Title'])[0]
        score = hit.get('_score', 0) 
        video_id = fields.get('video_id', [''])[0]
        start_sec = fields.get('start_sec', [0])[0]
        
        # Create YouTube deep-link to specific timestamp
        url = f"https://www.youtube.com/watch?v={video_id}&t={int(start_sec)}s"
        items.append(f'<li><a href="{url}" target="_blank">{title} (Start: {float(start_sec):.1f}s)</a> <span>Score: {score}</span></li>')
    
    return "<h3>Search Results:</h3><ul>" + "\n".join(items) + "</ul>"

def search_videos():
    """Create interactive search widget"""
    search_input = widgets.Text(
        value='', 
        placeholder='Enter your search query…',
        description='Search:',
        layout=Layout(width='70%')
    )
    
    search_button = widgets.Button(
        description='Search Videos',
        button_style='primary',
        layout=Layout(width='20%')
    )
    
    results_box = WHTML(value="")
    
    def on_button_click(_):
        q = search_input.value.strip()
        if not q:
            results_box.value = "<p>Please enter a search query</p>"
            return
        results_box.value = "<p>Searching…</p>"
        results_box.value = display_search_results_html(q)
    
    search_button.on_click(on_button_click)
    display(HBox([search_input, search_button]))
    display(results_box)

search_videos()

This code creates an interactive search interface using IPython widgets that allows users to search video content with natural language queries.

  • It defines a function to display search results as clickable YouTube links that open videos at the exact timestamp where relevant content appears.

  • The interface includes a text input field for queries, a search button, and a results display area.

  • When a user enters a query and clicks search, the system generates a text embedding from the query, performs a vector similarity search in Elasticsearch, and returns links to the most semantically relevant video segments, complete with scores indicating match quality.


Performance Analysis

Testing shows interesting patterns in search results. For the query "Show me scenes with dinosaurs":

  • Top result: Jurassic World Rebirth trailer at 134.5s (score: 0.6405)

  • Secondary match: How to Train Your Dragon clip at 121.2s (score: 0.6228)

  • Precision: Marengo correctly identifies both literal dinosaurs and dragon-like creatures

At TwelveLabs, we understand the importance of balancing accuracy and performance when working with high-dimensional embeddings. Elasticsearch offers several quantization methods that can dramatically improve search performance while maintaining accuracy.

In particular, Elasticsearch's Better Binary Quantization (BBQ) stands out as an excellent option for TwelveLabs' 1024-dimensional embeddings. BBQ reduces memory usage and speeds up search operations by using a binary representation internally while preserving the original vectors for final scoring.

For most production deployments with TwelveLabs embeddings, we recommend:

  • bbq_hnsw: Best for large collections (millions of vectors) where both speed and accuracy are important.

  • flat: Best for smaller collections where maximum accuracy is required.
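
To get a feel for the storage side of this trade-off, the sketch below does back-of-the-envelope arithmetic for raw vector data only. It assumes 4 bytes per dimension for float32, 1 byte for int8, and roughly 1 bit for binary quantization. It deliberately ignores the HNSW graph, per-vector correction terms, and the original vectors that are retained for rescoring, so treat the figures as rough lower bounds rather than measured numbers.

```python
# Assumed storage cost per dimension for each representation
BYTES_PER_DIM = {
    "float32": 4.0,
    "int8": 1.0,
    "1-bit (BBQ-style)": 1.0 / 8.0,
}

def raw_vector_bytes(num_vectors, dims, bytes_per_dim):
    """Raw size of the vector data alone, without index overhead."""
    return num_vectors * dims * bytes_per_dim

NUM_VECTORS = 1_000_000   # hypothetical collection size
DIMS = 1024               # Marengo embedding size from this tutorial

estimates = {
    name: raw_vector_bytes(NUM_VECTORS, DIMS, per_dim)
    for name, per_dim in BYTES_PER_DIM.items()
}

for name, size in estimates.items():
    print(f"{name:>18}: {size / 2**30:.2f} GiB")
```

For one million 1024-dimensional vectors this works out to about 3.8 GiB at float32, about 0.95 GiB at int8, and about 0.12 GiB at one bit per dimension, which is why quantization matters most for large collections.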


Production Considerations


Scaling Video Processing for Production

When deploying TwelveLabs Marengo with Elasticsearch in production environments, efficient video processing becomes crucial. To optimize your workflow, implement a serverless architecture using S3 event triggers that automatically process new video uploads. This approach eliminates manual intervention and creates a seamless ingestion pipeline. Additionally, leverage Amazon Bedrock's asynchronous invocation capabilities to process multiple videos in parallel, dramatically reducing overall processing time for large media libraries.

Cost management is equally important when working with video AI at scale. Implement a caching strategy that stores embeddings in S3 after initial generation, preventing redundant processing of the same content. This practice can significantly reduce both API costs and processing time, especially for frequently accessed videos or when reindexing your Elasticsearch deployment.
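
The S3-triggered ingestion described above could start from something like the following sketch of an AWS Lambda handler. It only parses the S3 event notification and returns the video URIs it would queue. The handler name, the accepted file suffixes, and the return shape are assumptions for illustration, and the actual call to start an embedding job is left as a comment.

```python
from urllib.parse import unquote_plus

# Assumption: file types we treat as videos
VIDEO_SUFFIXES = (".mp4", ".mov", ".mkv")

def video_uris_from_event(event):
    """Extract s3:// URIs for video objects from an S3 event notification."""
    uris = []
    for record in event.get("Records", []):
        s3_info = record.get("s3", {})
        bucket = s3_info.get("bucket", {}).get("name")
        # Object keys in S3 events are URL-encoded (spaces arrive as '+')
        key = unquote_plus(s3_info.get("object", {}).get("key", ""))
        if bucket and key.lower().endswith(VIDEO_SUFFIXES):
            uris.append(f"s3://{bucket}/{key}")
    return uris

def handler(event, context):
    """Hypothetical Lambda entry point for new uploads."""
    uris = video_uris_from_event(event)
    # In a real deployment, each URI would be handed to an async embedding
    # job here (for example via the create_video_embedding logic above,
    # without blocking on the result inside the Lambda).
    return {"queued": uris}

sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-bucket"}, "object": {"key": "videos/youtube/abc/my+clip.mp4"}}},
        {"s3": {"bucket": {"name": "my-bucket"}, "object": {"key": "embeddings/videos/abc/output.json"}}},
    ]
}
print(handler(sample_event, None))
```

Filtering by suffix (or by key prefix) also keeps the function from re-triggering on the embedding output files that Bedrock writes to the same bucket.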


Effective Index Management

Proper index management is essential for maintaining optimal search performance. For this tutorial, that simply means cleaning up the demo indices and S3 objects once you are finished:

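# Clean up indexes when done (ignore any that were never created)
for index_variety in index_varieties:
    es.indices.delete(index=f"twelvelabs-movie-trailer-{index_variety}", ignore_unavailable=True)

# Clean up S3 objects
# WARNING: this deletes every object in the bucket, so only run it
# against a bucket dedicated to this demo.
# list_objects_v2 returns at most 1,000 keys per call, so paginate.
paginator = s3_client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=S3_BUCKET_NAME):
    delete_keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
    if delete_keys:
        s3_client.delete_objects(Bucket=S3_BUCKET_NAME, Delete={'Objects': delete_keys})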

In production systems, consider implementing index rotation strategies rather than deletion. Create time-based indices (e.g., monthly) and use aliases to maintain consistent search endpoints while allowing for efficient background reindexing and optimization.
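
As a rough sketch of that rotation idea, the helpers below build a monthly index name and the list of alias actions needed to repoint a stable alias from an old index to a new one. The alias name and naming pattern are hypothetical choices of ours. The returned list is shaped for the `actions` argument of `es.indices.update_aliases`, but nothing here talks to a cluster.

```python
from datetime import date

ALIAS = "twelvelabs-movie-trailer"  # hypothetical alias that search clients query

def monthly_index_name(day, variety="bbq_hnsw"):
    """Build a time-based index name such as <alias>-bbq_hnsw-2025.09."""
    return f"{ALIAS}-{variety}-{day:%Y.%m}"

def alias_swap_actions(old_index, new_index, alias=ALIAS):
    """Actions that move the alias to new_index in a single request."""
    actions = []
    if old_index:
        actions.append({"remove": {"index": old_index, "alias": alias}})
    actions.append({"add": {"index": new_index, "alias": alias}})
    return actions

old_name = monthly_index_name(date(2025, 8, 1))
new_name = monthly_index_name(date(2025, 9, 1))
swap = alias_swap_actions(old_name, new_name)
print(swap)
```

Applying the swap would look like `es.indices.update_aliases(actions=swap)`; search code then targets the alias instead of a concrete index name.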


Advanced Search Applications

The true power of integrating TwelveLabs Marengo with Elasticsearch emerges when implementing advanced search capabilities. Create hybrid search systems that combine Marengo's semantic understanding with Elasticsearch's BM25 scoring on video transcripts. This approach balances the strengths of both technologies – Marengo's deep visual understanding with the precision of text search – resulting in more comprehensive and accurate results.
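
One way to express such a hybrid query is Elasticsearch's reciprocal rank fusion (RRF) retriever, which merges a lexical ranking with a vector ranking. The sketch below only builds the request body. It assumes a `transcript` text field that does not exist in this tutorial's mapping, and the window and constant values are illustrative. RRF availability depends on your Elasticsearch version and subscription level, so check before relying on it.

```python
def hybrid_query(text_query, query_vector, k=10):
    """Build a search body that fuses BM25 and k-NN rankings with RRF."""
    return {
        "retriever": {
            "rrf": {
                "retrievers": [
                    # Lexical leg: BM25 over a hypothetical "transcript" field
                    {"standard": {"query": {"match": {"transcript": text_query}}}},
                    # Semantic leg: k-NN over the Marengo embedding field
                    {"knn": {
                        "field": "embedding",
                        "query_vector": query_vector,
                        "k": k,
                        "num_candidates": max(25, k * 5),
                    }},
                ],
                "rank_window_size": 50,  # illustrative value
                "rank_constant": 60,     # illustrative value
            }
        },
        "size": k,
        "_source": False,
        "fields": ["title", "video_id", "start_sec"],
    }

body = hybrid_query("scenes with dinosaurs", [0.1, 0.2, 0.3, 0.4])
print(list(body["retriever"]["rrf"]["retrievers"][0].keys()))
```

In practice the query vector would come from `create_text_embedding`, and the body would be sent the same way as in `vector_query`.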

For more sophisticated applications, implement multi-modal RAG (Retrieval Augmented Generation) workflows by pairing Marengo with TwelveLabs' Pegasus 1.2 model. This combination allows you to not only find relevant video content but also generate contextual summaries that provide users with deeper insights without requiring them to watch entire videos.

Finally, enhance user experience by implementing real-time filtering capabilities alongside vector search. Elasticsearch excels at combining vector similarity with boolean filters, enabling you to incorporate geospatial constraints (e.g., "show me outdoor scenes in New York"), temporal filters (e.g., "videos from the last 30 days"), and access control restrictions – all without sacrificing the semantic understanding that makes Marengo so powerful.
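
As a minimal sketch of filtered vector search, the function below adds term filters to the k-NN retriever used earlier. It restricts itself to keyword fields that exist in this tutorial's mapping (`platform`, `video_id`); geospatial or date filters would need additional fields that are not part of the tutorial. The function name is ours.

```python
def filtered_knn_query(query_vector, filters, k=10, num_candidates=25):
    """Build a k-NN search body restricted by exact-match term filters."""
    knn = {
        "field": "embedding",
        "query_vector": query_vector,
        "k": k,
        "num_candidates": num_candidates,
    }
    if filters:
        # Filters are applied during the vector search, so k results
        # are still returned from the matching subset when available
        knn["filter"] = {
            "bool": {"filter": [{"term": {field: value}} for field, value in filters.items()]}
        }
    return {
        "retriever": {"knn": knn},
        "size": k,
        "_source": False,
        "fields": ["title", "video_id", "start_sec"],
    }

body = filtered_knn_query([0.1, 0.2, 0.3, 0.4], {"platform": "youtube", "video_id": "jan5CFWs9ic"})
print(body["retriever"]["knn"]["filter"])
```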


Getting Started

With Marengo's video understanding and Elasticsearch's vector search, you can finally search video content as naturally as you search text—unlocking insights that were previously hidden in your media libraries.

Request access to TwelveLabs models in Amazon Bedrock, set up your Elasticsearch deployment, and start building intelligent video applications that truly understand your content today. The complete notebook is available in the Elasticsearch Labs repository.


Huge thanks to Dave Erickson from Elastic for building the application that serves the basis of this tutorial. Check out his post on the Elastic Search Labs blog: https://www.elastic.co/search-labs/blog/twelvelabs-marengo-video-embedding-amazon-bedrock


With TwelveLabs models now available on Amazon Bedrock, developers can build sophisticated video AI applications while maintaining complete control over their data. This tutorial demonstrates how to create a semantic video search system by combining our Marengo 2.7 model with Elasticsearch's vector search capabilities.


The Multi-Modal Video Challenge

Traditional video search relies on metadata, transcripts, or manually tagged content. But what happens when you need to find "scenes with dinosaurs" or "moments showing teamwork" without any spoken dialogue or descriptive text? Standard approaches using separate image models and audio transcription miss the temporal dynamics that make video unique.

TwelveLabs Marengo 2.7 solves this by understanding video as a continuous multi-modal stream, capturing not just what appears in individual frames, but how visual elements, audio cues, and motion patterns work together across time.


Understanding the TwelveLabs Marengo Model

Marengo is our state-of-the-art multimodal embedding model. It generates consistent 1024-dimensional vector representations across different media types. These embeddings capture the semantic essence of your content, which makes cross-modal search possible: when you embed a video using Marengo, each segment is transformed into a vector that can be compared with embeddings from text queries, images, or other videos.
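
To make "compared" concrete, here is a minimal sketch of cosine similarity, the measure this tutorial later configures in Elasticsearch. The 4-dimensional vectors are toy values standing in for real 1024-dimensional embeddings, and the segment names are hypothetical.

```python
import math


def cosine_similarity(a: list, b: list) -> float:
    """Return the cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat an all-zero vector as dissimilar to everything
    return dot / (norm_a * norm_b)


# Toy vectors: one text query and two video segments
text_query_vec = [0.9, 0.1, 0.0, 0.2]
segments = {
    "segment_a": [0.8, 0.2, 0.1, 0.1],
    "segment_b": [0.0, 0.1, 0.9, 0.3],
}

# Rank segments from most to least similar to the query
ranked = sorted(
    segments,
    key=lambda name: cosine_similarity(text_query_vec, segments[name]),
    reverse=True,
)
```

Because text and video embeddings live in the same space, the same comparison works whether the query vector came from text, an image, or another video.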


Why Elasticsearch for Video Search?

Elasticsearch provides an ideal platform for storing and searching these vector embeddings at scale. With its vector search capabilities, you can perform efficient similarity searches across thousands or even millions of video segments. The combination of TwelveLabs embeddings and Elasticsearch's search infrastructure creates a powerful solution for organizations looking to make their video content more discoverable and accessible.


Architecture Overview

Our solution processes video content through four main stages:

  1. Video ingestion: Download trailers and upload to S3

  2. Async embedding generation: Use Bedrock's start_async_invoke for scalable vector creation

  3. Vector indexing: Store embeddings in Elasticsearch with multiple quantization options

  4. Real-time search: Convert text queries to vectors for k-NN retrieval


Complete Workflow


1 - Setting Up Your Environment

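Before starting, ensure you have access to TwelveLabs models in Amazon Bedrock, an S3 bucket, an Elasticsearch deployment, and the Python packages imported below: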

import os, json, time, copy
import boto3, botocore
import yt_dlp
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from pathlib import Path
from dotenv import load_dotenv
import tqdm

# Load settings from a local .env file into the environment
load_dotenv()

# Configuration (credential and path variable names are assumed; adjust to your setup)
AWS_REGION = "us-east-1"
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
DATA_PATH = os.getenv("DATA_PATH", "./data")  # assumed default: local folder for downloads
MARENGO_MODEL_ID = "twelvelabs.marengo-embed-2-7-v1:0"  # used for async video embedding
TEXT_EMBEDDING_MODEL_ID = "us.twelvelabs.marengo-embed-2-7-v1:0"  # used for sync text embedding
ELASTICSEARCH_ENDPOINT = os.getenv("ELASTICSEARCH_ENDPOINT")
ELASTICSEARCH_API_KEY = os.getenv("ELASTICSEARCH_API_KEY")

# Test dataset: 2025 summer blockbuster trailers
videos = [
    "https://www.youtube.com/watch?v=VWqJifMMgZE",  # Lilo and Stitch 2025
    "https://www.youtube.com/watch?v=Ox8ZLF6cGM0",  # Superman 2025
    "https://www.youtube.com/watch?v=jan5CFWs9ic",  # Jurassic World Rebirth
    "https://www.youtube.com/watch?v=qpoBjOg5RHU",  # Fantastic Four: First Steps
    "https://www.youtube.com/watch?v=22w7z_lT6YM",  # How to Train Your Dragon
]

The code above sets up the foundation for building a video search solution using the TwelveLabs Marengo model on Amazon Bedrock and Elasticsearch.

  • It establishes the required imports, configuration variables, and a test dataset of 2025 movie trailers.

  • Key components include AWS credentials configuration, Elasticsearch endpoint information, model IDs for both video and text embedding generation, and YouTube links for the sample videos.

  • This initial setup creates the environment necessary for the subsequent video processing, embedding generation, and vector search implementation detailed in the rest of the tutorial.


2 - Data Management Class

The VideoIntelligence class encapsulates all video metadata, file paths, and embedding data:

class VideoIntelligence:
    def __init__(self, url, platform, video_id):
        self.url = url
        self.platform = platform  
        self.video_id = video_id
        self.video_string = f"{self.platform}_{self.video_id}"
        self.base_path = f"{DATA_PATH}/videos/{self.video_string}"
        
        self.video_path = None
        self.s3_key = None
        self.metadata = None
        self.title = None
        self.embeddings_list = None
    
    def get_video_object(self):
        """Return indexable document structure"""
        return {
            "url": self.url,
            "platform": self.platform, 
            "video_id": self.video_id,
            "title": self.title
        }
    
    # Additional getters/setters omitted for brevity

The VideoIntelligence class serves as a comprehensive data container for managing all aspects of video processing in our search system.

  • It stores the video's source URL, platform information, unique identifier, and associated file paths for both local and S3 storage.

  • The class also maintains references to video metadata, title information, and the critical embeddings list that will be generated by the TwelveLabs Marengo model.

  • Through its get_video_object() method, it provides a structured format for indexing video information in Elasticsearch, making it easy to manage the complete lifecycle of a video from ingestion through embedding generation to searchable indexing.
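
Later steps call accessors such as `get_base_path()` and `set_metadata()` that the listing above omits. The following is one possible sketch of the class with those accessors filled in, not the original notebook code. Two things are assumptions: the `DATA_PATH` value, and `set_metadata` copying the title out of the yt-dlp metadata so that `title` is populated before indexing.

```python
DATA_PATH = "./data"  # assumed local data folder; use the value from your own configuration


class VideoIntelligence:
    """Container for one video, including the accessors used by later steps."""

    def __init__(self, url, platform, video_id):
        self.url = url
        self.platform = platform
        self.video_id = video_id
        self.video_string = f"{platform}_{video_id}"
        self.base_path = f"{DATA_PATH}/videos/{self.video_string}"
        self.video_path = None
        self.s3_key = None
        self.metadata = None
        self.title = None
        self.embeddings_list = None

    def get_video_object(self):
        """Return indexable document structure"""
        return {
            "url": self.url,
            "platform": self.platform,
            "video_id": self.video_id,
            "title": self.title,
        }

    # Read-only accessors
    def get_base_path(self):
        return self.base_path

    def get_video_string(self):
        return self.video_string

    def get_platform(self):
        return self.platform

    def get_video_id(self):
        return self.video_id

    def get_video_path(self):
        return self.video_path

    def get_s3_key(self):
        return self.s3_key

    def get_embeddings_list(self):
        return self.embeddings_list

    # Setters filled in as each pipeline stage completes
    def set_video_path(self, video_path):
        self.video_path = video_path

    def set_s3_key(self, s3_key):
        self.s3_key = s3_key

    def set_embeddings_list(self, embeddings_list):
        self.embeddings_list = embeddings_list

    def set_metadata(self, metadata):
        self.metadata = metadata
        # Assumption: reuse the title reported by yt-dlp as the indexed title
        self.title = metadata.get("title")


video = VideoIntelligence("https://www.youtube.com/watch?v=jan5CFWs9ic", "youtube", "jan5CFWs9ic")
video.set_metadata({"title": "Jurassic World Rebirth"})
```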


3 - Video Download and Processing

The next step is to download the YouTube videos and save their metadata:

def get_video(video: VideoIntelligence):
    """Download video using yt-dlp and extract metadata"""
    base_directory = Path(video.get_base_path())
    base_directory.mkdir(parents=True, exist_ok=True)
    
    video_path = f"{video.get_base_path()}/{video.get_video_string()}.mp4"
    metadata_path = f"{video.get_base_path()}/metadata.json"
    
    ydl_opts = {
        "format": "bestvideo+bestaudio/best",
        "outtmpl": video_path,
        "merge_output_format": "mp4"
    }
    
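    if not os.path.exists(video_path):
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # Download the video and collect its metadata in a single call
            info = ydl.extract_info(video.url, download=True)
            # sanitize_info makes the metadata JSON-serializable
            metadata = ydl.sanitize_info(info)
        with open(metadata_path, "w") as f:
            json.dump(metadata, f)
    else:
        # Reuse the metadata cached by a previous run
        with open(metadata_path, "r") as f:
            metadata = json.load(f)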
    
    video.set_metadata(metadata) 
    video.set_video_path(video_path)

# Process all videos
video_objects = []
for video_str in videos:
    if "youtube.com" in video_str:
        platform = "youtube"
        video_id = video_str.split("v=")[1] 
        video_objects.append(VideoIntelligence(video_str, platform, video_id))

for video_object in video_objects:
    get_video(video_object)

This code handles the video download workflow:

  • The get_video function downloads videos from YouTube using yt-dlp, stores them locally, and extracts metadata.

  • It creates necessary directories, sets appropriate file paths, and handles conditional downloading to prevent re-downloading existing videos.

  • The code then initializes VideoIntelligence objects for each YouTube video in the list, capturing platform information and video IDs.

  • Finally, it processes each video by calling get_video, which populates the objects with downloaded video paths and metadata.
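
Splitting the URL on "v=" works for the five links above but keeps any trailing parameters such as a timestamp. A slightly more defensive sketch, using only the standard library, is shown below; the function name `extract_youtube_id` is hypothetical and not part of the original notebook.

```python
from urllib.parse import urlparse, parse_qs


def extract_youtube_id(url: str):
    """Return the video ID from a YouTube URL, or None if it cannot be found."""
    parsed = urlparse(url)
    host = parsed.netloc.lower()
    if host == "youtube.com" or host.endswith(".youtube.com"):
        # Standard watch URLs carry the ID in the `v` query parameter
        ids = parse_qs(parsed.query).get("v", [])
        return ids[0] if ids else None
    if host == "youtu.be":
        # Short links carry the ID as the path
        return parsed.path.lstrip("/") or None
    return None


url_with_extras = "https://www.youtube.com/watch?v=jan5CFWs9ic&t=30s"
naive_id = url_with_extras.split("v=")[1]       # keeps the "&t=30s" suffix
robust_id = extract_youtube_id(url_with_extras)  # keeps only the ID
```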


4 - S3 Upload with Deduplication

The next step is to upload the videos to an S3 bucket:

# Initialize AWS clients
session = boto3.session.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY, 
    region_name=AWS_REGION
)

aws_account_id = session.client('sts').get_caller_identity()["Account"]
s3_client = session.client('s3')
bedrock_client = session.client("bedrock-runtime", region_name=AWS_REGION)

# Upload videos to S3 with existence checking
for video_object in video_objects:
    video_path = video_object.get_video_path()
    s3_key = f"videos/{video_object.get_platform()}/{video_object.get_video_id()}/{os.path.basename(video_path)}"
    video_object.set_s3_key(s3_key)
    
    try:
        s3_client.head_object(Bucket=S3_BUCKET_NAME, Key=s3_key)
        print(f"Video {video_object.get_video_string()} already exists in S3")
        continue
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            print(f"Uploading {video_object.get_video_string()} to S3...")
            s3_client.upload_file(video_path, S3_BUCKET_NAME, s3_key)
            print(f"Successfully uploaded {video_object.get_video_string()}")
        else:
            # Surface permission or network errors instead of silently skipping the upload
            raise

This code block establishes the AWS environment for video processing by initializing the AWS session, connecting to S3, and authenticating with the Bedrock service. It performs intelligent video uploading to S3 with built-in deduplication—checking if each video already exists in the bucket before uploading to prevent redundant storage and processing.

For each video object, it constructs a standardized S3 key based on the platform and video ID, then either skips existing videos or uploads new ones. This foundation enables the subsequent embedding generation and vector search operations that will transform the raw videos into searchable content.


5 - Generating Embeddings from Videos Asynchronously

At TwelveLabs, we've designed our embedding models to work with various content types. When processing videos, Marengo automatically chunks your content into meaningful segments and generates embeddings that capture both visual and textual information. These embeddings enable you to search for specific moments using natural language queries.

In this example, the key innovation is using Bedrock's async invoke pattern for scalable embedding generation:

def wait_for_embedding_output(s3_bucket: str, s3_prefix: str, 
                             invocation_arn: str, verbose: bool = False) -> list:
    """Poll Bedrock async job until completion and retrieve results"""
    status = None
    while status not in ["Completed", "Failed", "Expired"]:
        response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)
        status = response['status']
        if verbose:
            tqdm.tqdm.write(f"Embedding task status: {status}")
        time.sleep(5)
    
    if status != "Completed":
        raise Exception(f"Embedding task failed with status: {status}")
    
    # Retrieve output from S3
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
    for obj in response.get('Contents', []):
        if obj['Key'].endswith('output.json'):
            output_key = obj['Key']
            obj = s3_client.get_object(Bucket=s3_bucket, Key=output_key)
            content = obj['Body'].read().decode('utf-8')
            return json.loads(content).get("data", [])
    
    raise Exception("No output.json found in S3 prefix")

def create_video_embedding(video_s3_uri: str, video_id: str) -> list:
    """Start async Marengo embedding job for video in S3"""
    s3_output_prefix = f'embeddings/videos/{video_id}'
    
    response = bedrock_client.start_async_invoke(
        modelId=MARENGO_MODEL_ID,
        modelInput={
            "inputType": "video",
            "mediaSource": {
                "s3Location": {"uri": video_s3_uri, "bucketOwner": aws_account_id}
            }
        },
        outputDataConfig={
            "s3OutputDataConfig": {"s3Uri": f's3://{S3_BUCKET_NAME}/{s3_output_prefix}'}
        }
    )
    
    invocation_arn = response["invocationArn"]
    print(f"Video embedding task started: {invocation_arn}")
    
    return wait_for_embedding_output(S3_BUCKET_NAME, s3_output_prefix, invocation_arn)

def check_existing_embedding(video_id: str):
    """Check S3 for cached embeddings to avoid re-processing"""
    s3_output_prefix = f'embeddings/videos/{video_id}'
    
    try:
        response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=s3_output_prefix)
        if 'Contents' in response and any(obj['Key'].endswith('output.json') for obj in response.get('Contents', [])):
            # Load existing embeddings from S3
            for obj in response.get('Contents', []):
                if obj['Key'].endswith('output.json'):
                    output_key = obj['Key']
                    obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=output_key)
                    content = obj['Body'].read().decode('utf-8')
                    return json.loads(content).get("data", [])
        return None
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            return None
        raise e

# Generate embeddings with caching
for video_object in tqdm.tqdm(video_objects, desc="Processing videos"):
    video_id = video_object.get_video_id()
    video_uri = f"s3://{S3_BUCKET_NAME}/{video_object.get_s3_key()}"
    
    # Check for existing embeddings first
    retrieved_embeddings = check_existing_embedding(video_id)
    if retrieved_embeddings:
        video_object.set_embeddings_list(retrieved_embeddings)
    else:
        embedding_data = create_video_embedding(video_uri, video_id)
        video_object.set_embeddings_list(embedding_data)

This code implements the core video embedding generation functionality, efficiently leveraging Amazon Bedrock's asynchronous API for scalable processing. The implementation includes three key functions:

  1. wait_for_embedding_output polls the async job status and retrieves results from S3 upon completion

  2. create_video_embedding initiates the embedding generation job for a video file stored in S3, and

  3. check_existing_embedding implements caching logic to avoid redundant processing.

The main loop processes each video with built-in deduplication, either retrieving cached embeddings or generating new ones when needed. This approach optimizes resource usage while enabling high-throughput video processing for search applications.
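
The polling loop above waits indefinitely if a job never reaches a terminal state. One way to bound it is a small generic helper with a timeout and a growing delay, sketched below. The helper name, the default intervals, and the injected `sleep` and `clock` parameters are assumptions made so the example can run without AWS access.

```python
import time


def poll_until_done(get_status, terminal=("Completed", "Failed"),
                    interval=5.0, max_interval=30.0, timeout=900.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Call get_status() until it returns a terminal value or the timeout expires."""
    deadline = clock() + timeout
    delay = interval
    while True:
        status = get_status()
        if status in terminal:
            return status  # return immediately, without one last sleep
        if clock() + delay > deadline:
            raise TimeoutError(f"Job still '{status}' after {timeout} seconds")
        sleep(delay)
        # Back off gradually so long jobs are polled less often
        delay = min(delay * 1.5, max_interval)


# Simulated job that completes on the third status check
fake_statuses = iter(["InProgress", "InProgress", "Completed"])
final_status = poll_until_done(lambda: next(fake_statuses), sleep=lambda seconds: None)

# With Bedrock, the status callable would wrap the call used earlier, for example:
#   lambda: bedrock_client.get_async_invoke(invocationArn=invocation_arn)["status"]
```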


6 - Understanding Marengo Output Structure

Each video gets segmented into ~6-second chunks with 1,024-dimensional embeddings:

# Preview embedding structure 
video_embedding_data = video_objects[0].get_embeddings_list()
for i, embedding in enumerate(video_embedding_data[:3]):
    print(f"{i}")
    for key in embedding:
        if "embedding" == key:
            print(f"\t{key}: len {len(embedding[key])}")
        else:
            print(f"\t{key}: {embedding[key]}")

# Output:
# 0
#   embedding: len 1024
#   embeddingOption: visual-text  
#   startSec: 0.0
#   endSec: 6.199999809265137
# 1 
#   embedding: len 1024
#   embeddingOption: visual-text
#   startSec: 6.199999809265137  
#   endSec: 10.399999618530273

The code sample illustrates how to examine the structure of video embeddings generated by the TwelveLabs Marengo model.

  • It retrieves the embedding data from the first video object and prints details of the first three embedding chunks.

  • Each embedding is a 1024-dimensional vector representing approximately 6-second segments of video content, with metadata including the embedding type (visual-text) and precise timestamps (startSec and endSec).

  • This structure allows for fine-grained semantic search across video content, enabling users to find specific moments using natural language queries.
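
Before indexing, it can help to count how many embeddings of each type a video produced, since the next step filters on `embeddingOption`. The sketch below uses a few hand-written records shaped like the output above. The values are illustrative, not real model output, and `summarize_segments` is a hypothetical helper.

```python
from collections import Counter

# Illustrative records with the same keys as the Marengo output (vectors omitted)
sample_segments = [
    {"embeddingOption": "visual-text", "startSec": 0.0, "endSec": 6.2},
    {"embeddingOption": "visual-image", "startSec": 0.0, "endSec": 6.2},
    {"embeddingOption": "visual-text", "startSec": 6.2, "endSec": 10.4},
    {"embeddingOption": "visual-image", "startSec": 6.2, "endSec": 10.4},
]


def summarize_segments(segments: list):
    """Return per-option counts and the average segment length in seconds."""
    counts = Counter(segment["embeddingOption"] for segment in segments)
    durations = [segment["endSec"] - segment["startSec"] for segment in segments]
    average = sum(durations) / len(durations) if durations else 0.0
    return counts, average


option_counts, average_duration = summarize_segments(sample_segments)
```

With real data you would pass `video_objects[0].get_embeddings_list()` instead of the sample list.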


7 - Configuring Elasticsearch for Vector Search

Once we have our embeddings, we'll need to configure Elasticsearch to store and search them efficiently. Elasticsearch offers several index types for vector search, each with different trade-offs between search speed, accuracy, and storage requirements.

Here's how we set up an index optimized for TwelveLabs' 1024-dimensional embeddings:

# Connect to Elasticsearch
es = Elasticsearch(
    hosts=[ELASTICSEARCH_ENDPOINT],
    api_key=ELASTICSEARCH_API_KEY
)

# Prepare documents for indexing
docs = []
for video_object in video_objects:
    persist_object = video_object.get_video_object()
    embeddings = video_object.get_embeddings_list()
    
    for embedding in embeddings:
        if embedding["embeddingOption"] == "visual-image":  # Filter for visual embeddings
            doc = copy.deepcopy(persist_object) 
            doc["embedding"] = embedding["embedding"]
            doc["start_sec"] = embedding["startSec"] 
            doc["end_sec"] = embedding["endSec"]
            docs.append(doc)

# Create indices for different vector search methods
index_varieties = [
    "flat",       # Brute force, highest accuracy
    "hnsw",       # Hierarchical navigable small world graph
    "int8_hnsw",  # Quantized for efficiency
    "bbq_hnsw",   # Better Binary Quantization with HNSW
    "bbq_flat"    # BBQ with flat search
]

for index_variety in index_varieties:
    index_name = f"twelvelabs-movie-trailer-{index_variety}"
    mappings = {
        "properties": {
            "url": {"type": "keyword"},
            "platform": {"type": "keyword"},
            "video_id": {"type": "keyword"},
            "title": {"type": "text", "analyzer": "standard"},
            "embedding": {
                "type": "dense_vector", 
                "dims": 1024,
                "similarity": "cosine",
                "index_options": {
                    "type": index_variety
                }
            },
            "start_sec": {"type": "float"},
            "end_sec": {"type": "float"}
        }
    }
    
    # Recreate index if exists
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        time.sleep(2)
    
    es.indices.create(index=index_name, mappings=mappings)
    print(f"Index '{index_name}' created successfully")
    
    # Bulk insert documents
    actions = [{"_index": index_name, "_source": doc} for doc in docs]
    success, failed = bulk(es, actions, chunk_size=100, max_retries=3)
    print(f"Successfully indexed {success} documents into {index_name}")

This code configures Elasticsearch for vector search by creating five different index varieties (flat, HNSW, and quantized variants) optimized for TwelveLabs' 1024-dimensional embeddings. It connects to Elasticsearch, prepares video segment documents with embeddings, creates specialized indices with appropriate mappings for dense vectors, and bulk uploads the documents. Each index type offers different trade-offs between search accuracy, speed, and storage requirements, allowing developers to choose the best approach based on their specific use case requirements.


8 - Creating Text Embeddings for Search

One of the most powerful aspects of TwelveLabs' technology is the ability to search video content using natural language. To enable this, we generate text embeddings using the same Marengo 2.7 model that processed our videos, ensuring they exist in the same vector space:

# Generate text embedding for search
def create_text_embedding(text_query: str) -> list:
    """Generate text embedding using Marengo via Bedrock"""
    text_model_input = {"inputType": "text", "inputText": text_query}
    
    response = bedrock_client.invoke_model(
        modelId=TEXT_EMBEDDING_MODEL_ID,
        body=json.dumps(text_model_input)
    )
    
    response_body = json.loads(response['body'].read().decode('utf-8'))
    embedding_data = response_body.get("data", [])
    
    return embedding_data[0]["embedding"] if embedding_data else None

This code defines a function called create_text_embedding that generates vector representations (embeddings) for text queries using the TwelveLabs Marengo model via Amazon Bedrock. The function takes a text query string as input, formats it properly for the model, sends the request to Bedrock using the synchronous invoke_model API, and then parses the response to extract the 1024-dimensional embedding vector. These text embeddings share the same vector space as the video embeddings, enabling semantic similarity searches that can find video moments based on natural language descriptions.


9 - Performing Vector Searches

With everything in place, we can now search our video library using natural language queries. Here's how to implement a search function that finds the most relevant video segments:

def vector_query(index_name: str, text_query: str) -> dict:
    """Execute k-NN vector search against Elasticsearch"""
    query_embedding = create_text_embedding(text_query)
    
    query = {
        "retriever": {
            "knn": {
                "field": "embedding",
                "query_vector": query_embedding,
                "k": 10,
                "num_candidates": 25  # integer, and at least as large as k
            }
        },
        "size": 10,
        "_source": False,
        "fields": ["title", "video_id", "start_sec"]
    }
    
    return es.search(index=index_name, body=query).body

# Test search
text_query = "Show me scenes with dinosaurs"  
results = vector_query("twelvelabs-movie-trailer-flat", text_query)
print(results)

This code implements a vector search function that enables natural language queries against video content.

  • It first generates a text embedding from the query using the same Marengo model that processed the videos, ensuring compatibility in the vector space.

  • The function then constructs an Elasticsearch k-NN query, requesting the top 10 most similar video segments while evaluating 25 candidates for better accuracy.

  • The search returns relevant metadata like title, video ID, and timestamp, allowing applications to link directly to the specific moments in videos that match the query's semantic meaning.


10 - Building a Simple Search Interface

For a more user-friendly experience, we can create a simple search interface using IPython widgets. This allows users to input search queries and view results with direct links to the specific moments in videos:

from ipywidgets import widgets, HTML as WHTML, HBox, Layout
from IPython.display import display

def display_search_results_html(query):
    """Format search results as clickable YouTube links"""
    results = vector_query("twelvelabs-movie-trailer-flat", query)
    hits = results.get('hits', {}).get('hits', [])
    
    if not hits:
        return "<p>No results found</p>"
    
    items = []
    for hit in hits:
        fields = hit.get('fields', {})
        title = fields.get('title', ['No Title'])[0]
        score = hit.get('_score', 0) 
        video_id = fields.get('video_id', [''])[0]
        start_sec = fields.get('start_sec', [0])[0]
        
        # Create YouTube deep-link to specific timestamp
        url = f"https://www.youtube.com/watch?v={video_id}&t={int(start_sec)}s"
        items.append(f'<li><a href="{url}" target="_blank">{title} (Start: {float(start_sec):.1f}s)</a> <span>Score: {score}</span></li>')
    
    return "<h3>Search Results:</h3><ul>" + "\n".join(items) + "</ul>"

def search_videos():
    """Create interactive search widget"""
    search_input = widgets.Text(
        value='', 
        placeholder='Enter your search query…',
        description='Search:',
        layout=Layout(width='70%')
    )
    
    search_button = widgets.Button(
        description='Search Videos',
        button_style='primary',
        layout=Layout(width='20%')
    )
    
    results_box = WHTML(value="")
    
    def on_button_click(_):
        q = search_input.value.strip()
        if not q:
            results_box.value = "<p>Please enter a search query</p>"
            return
        results_box.value = "<p>Searching…</p>"
        results_box.value = display_search_results_html(q)
    
    search_button.on_click(on_button_click)
    display(HBox([search_input, search_button]))
    display(results_box)

search_videos()

This code creates an interactive search interface using IPython widgets that allows users to search video content with natural language queries.

  • It defines a function to display search results as clickable YouTube links that open videos at the exact timestamp where relevant content appears.

  • The interface includes a text input field for queries, a search button, and a results display area.

  • When a user enters a query and clicks search, the system generates a text embedding from the query, performs a vector similarity search in Elasticsearch, and returns links to the most semantically relevant video segments, complete with scores indicating match quality.


Performance Analysis

Testing shows interesting patterns in search results. For the query "Show me scenes with dinosaurs":

  • Top result: Jurassic World Rebirth trailer at 134.5s (score: 0.6405)

  • Secondary match: How to Train Your Dragon clip at 121.2s (score: 0.6228)

  • Precision: Marengo correctly identifies both literal dinosaurs and dragon-like creatures

At TwelveLabs, we understand the importance of balancing accuracy and performance when working with high-dimensional embeddings. Elasticsearch offers several quantization methods that can dramatically improve search performance while maintaining accuracy.

In particular, Elasticsearch's Better Binary Quantization (BBQ) stands out as an excellent option for TwelveLabs' 1024-dimensional embeddings. BBQ reduces memory usage and speeds up search operations by using a binary representation internally while preserving the original vectors for final scoring.

For most production deployments with TwelveLabs embeddings, we recommend:

  • bbq_hnsw: Best for large collections (millions of vectors) where both speed and accuracy are important.

  • flat: Best for smaller collections where maximum accuracy is required.
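
To get a feel for the storage trade-off, the back-of-the-envelope sketch below estimates the size of the vector data alone for 1024-dimensional embeddings. It is a simplification: it ignores the HNSW graph, per-vector correction values, and the fact that quantized indices still keep the original float vectors on disk for rescoring.

```python
DIMS = 1024  # Marengo embedding size

# Approximate bytes needed to hold one vector at each precision
BYTES_PER_VECTOR = {
    "float32 (flat, hnsw)": DIMS * 4,
    "int8 (int8_hnsw)": DIMS * 1,
    "1-bit (bbq_hnsw, bbq_flat)": DIMS // 8,
}


def estimate_vector_memory_mb(num_vectors: int) -> dict:
    """Estimate vector storage in MiB for each representation."""
    return {
        name: num_vectors * size / (1024 ** 2)
        for name, size in BYTES_PER_VECTOR.items()
    }


# Example: one million indexed video segments
estimates = estimate_vector_memory_mb(1_000_000)
for name, megabytes in estimates.items():
    print(f"{name}: {megabytes:,.1f} MiB")
```

Under these assumptions, the binary representation is 32 times smaller than float32, which is why BBQ is attractive for large collections.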


Production Considerations


Scaling Video Processing for Production

When deploying TwelveLabs Marengo with Elasticsearch in production environments, efficient video processing becomes crucial. To optimize your workflow, implement a serverless architecture using S3 event triggers that automatically process new video uploads. This approach eliminates manual intervention and creates a seamless ingestion pipeline. Additionally, leverage Amazon Bedrock's asynchronous invocation capabilities to process multiple videos in parallel, dramatically reducing overall processing time for large media libraries.
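
A minimal sketch of the S3-triggered entry point is shown below, assuming an AWS Lambda function subscribed to the bucket's object-created events. It only extracts the uploaded video locations from the event; starting the embedding job is left as a comment because the wiring depends on your deployment. The extension list and function names are assumptions.

```python
from urllib.parse import unquote_plus

VIDEO_EXTENSIONS = (".mp4", ".mov", ".mkv")  # assumed set of formats to process


def extract_video_uris(event: dict) -> list:
    """Collect s3:// URIs for video files referenced in an S3 event notification."""
    uris = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 event notifications
        key = unquote_plus(record["s3"]["object"]["key"])
        if key.lower().endswith(VIDEO_EXTENSIONS):
            uris.append(f"s3://{bucket}/{key}")
    return uris


def lambda_handler(event, context):
    """Entry point: find new videos and hand them to the embedding step."""
    uris = extract_video_uris(event)
    # In a real deployment, each URI would start an async embedding job here,
    # for example with bedrock_client.start_async_invoke(...) as shown in step 5.
    return {"queued": uris}


# Hand-written sample event for local testing
sample_event = {
    "Records": [
        {"s3": {"bucket": {"name": "my-video-bucket"},
                "object": {"key": "videos/youtube/abc123/my+trailer.mp4"}}},
        {"s3": {"bucket": {"name": "my-video-bucket"},
                "object": {"key": "videos/youtube/abc123/metadata.json"}}},
    ]
}
result = lambda_handler(sample_event, None)
```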

Cost management is equally important when working with video AI at scale. Implement a caching strategy that stores embeddings in S3 after initial generation, preventing redundant processing of the same content. This practice can significantly reduce both API costs and processing time, especially for frequently accessed videos or when reindexing your Elasticsearch deployment.


Effective Index Management

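Proper index management is essential for maintaining optimal search performance. For this tutorial, index management simply means cleaning up: the following code deletes the indices and S3 objects once you are done.

# Clean up indices when done
for index_variety in index_varieties:
    es.indices.delete(index=f"twelvelabs-movie-trailer-{index_variety}", ignore_unavailable=True)

# Clean up S3 objects
# WARNING: this deletes every object in the bucket, so only run it against
# a bucket dedicated to this tutorial.
# list_objects_v2 returns at most 1,000 keys per call, so page through the results.
paginator = s3_client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=S3_BUCKET_NAME):
    delete_keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
    if delete_keys:
        s3_client.delete_objects(Bucket=S3_BUCKET_NAME, Delete={'Objects': delete_keys})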

In production systems, consider implementing index rotation strategies rather than deletion. Create time-based indices (e.g., monthly) and use aliases to maintain consistent search endpoints while allowing for efficient background reindexing and optimization.
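
The rotation idea can be sketched as two small helpers: one that names a monthly index and one that builds the alias actions to repoint searches from the old index to the new one. The alias name, the naming pattern, and the helper names are assumptions for illustration.

```python
from datetime import date

ALIAS = "twelvelabs-movie-trailer"  # hypothetical alias that search clients query


def monthly_index_name(day: date, prefix: str = ALIAS) -> str:
    """Return a time-based index name such as 'twelvelabs-movie-trailer-2025.09'."""
    return f"{prefix}-{day:%Y.%m}"


def build_alias_swap(old_index, new_index: str, alias: str = ALIAS) -> list:
    """Build alias actions that move the alias from old_index to new_index."""
    actions = []
    if old_index:
        actions.append({"remove": {"index": old_index, "alias": alias}})
    actions.append({"add": {"index": new_index, "alias": alias}})
    return actions


september = monthly_index_name(date(2025, 9, 20))
october = monthly_index_name(date(2025, 10, 1))
swap_actions = build_alias_swap(september, october)

# With a live cluster, the actions would be applied in one atomic request, e.g.:
#   es.indices.update_aliases(actions=swap_actions)
```

Because both actions are sent in a single request, searches against the alias never see a moment with no backing index.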




With TwelveLabs models now available on Amazon Bedrock, developers can build sophisticated video AI applications while maintaining complete control over their data. This tutorial demonstrates how to create a semantic video search system by combining our Marengo 2.7 model with Elasticsearch's vector search capabilities.


The Multi-Modal Video Challenge

Traditional video search relies on metadata, transcripts, or manually tagged content. But what happens when you need to find "scenes with dinosaurs" or "moments showing teamwork" without any spoken dialogue or descriptive text? Standard approaches using separate image models and audio transcription miss the temporal dynamics that make video unique.

TwelveLabs Marengo 2.7 solves this by understanding video as a continuous multi-modal stream, capturing not just what appears in individual frames, but how visual elements, audio cues, and motion patterns work together across time.


Understanding the TwelveLabs Marengo Model

Specifically, Marengo is our state-of-the-art multimodal embedding model that generates consistent 1024-dimensional vector representations across different media types. These embeddings capture the semantic essence of your content, allowing for intuitive cross-modal search capabilities. When you embed a video using Marengo, each segment is transformed into a vector that can be compared with embeddings from text queries, images, or other videos.


Why Elasticsearch for Video Search?

Elasticsearch provides an ideal platform for storing and searching these vector embeddings at scale. With its vector search capabilities, you can perform efficient similarity searches across thousands or even millions of video segments. The combination of TwelveLabs embeddings and Elasticsearch's search infrastructure creates a powerful solution for organizations looking to make their video content more discoverable and accessible.


Architecture Overview

Our solution processes video content through four main stages :

  1. Video ingestion: Download trailers and upload to S3

  2. Async embedding generation: Use Bedrock's start_async_invoke for scalable vector creation

  3. Vector indexing: Store embeddings in Elasticsearch with multiple quantization options

  4. Real-time search: Convert text queries to vectors for k-NN retrieval


Complete Workflow


1 - Setting Up Your Environment

Before starting, ensure you have :

import os, json, time, copy
import boto3, botocore
import yt_dlp
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from pathlib import Path
from dotenv import load_dotenv
import tqdm

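# Configuration
load_dotenv()  # read settings from a local .env file, if present
AWS_REGION = "us-east-1"
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")          # used in step 4
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")  # used in step 4
DATA_PATH = os.getenv("DATA_PATH", "./data")  # assumed default: local download root used in step 2
S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME")
MARENGO_MODEL_ID = "twelvelabs.marengo-embed-2-7-v1:0"  # async video embedding
TEXT_EMBEDDING_MODEL_ID = "us.twelvelabs.marengo-embed-2-7-v1:0"  # sync text embedding
ELASTICSEARCH_ENDPOINT = os.getenv("ELASTICSEARCH_ENDPOINT")
ELASTICSEARCH_API_KEY = os.getenv("ELASTICSEARCH_API_KEY")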

# Test dataset: 2025 summer blockbuster trailers
videos = [
    "https://www.youtube.com/watch?v=VWqJifMMgZE",  # Lilo and Stitch 2025
    "https://www.youtube.com/watch?v=Ox8ZLF6cGM0",  # Superman 2025
    "https://www.youtube.com/watch?v=jan5CFWs9ic",  # Jurassic World Rebirth
    "https://www.youtube.com/watch?v=qpoBjOg5RHU",  # Fantastic Four: First Steps
    "https://www.youtube.com/watch?v=22w7z_lT6YM",  # How to Train Your Dragon
]

The code above sets up the foundation for a video search solution built on the TwelveLabs Marengo model on Amazon Bedrock and Elasticsearch.

  • It establishes the required imports, configuration variables, and a test dataset of 2025 movie trailers.

  • Key components include AWS credentials configuration, Elasticsearch endpoint information, model IDs for both video and text embedding generation, and YouTube links for the sample videos.

  • This initial setup creates the environment necessary for the subsequent video processing, embedding generation, and vector search implementation detailed in the rest of the tutorial.


2 - Data Management Class

The VideoIntelligence class encapsulates all video metadata, file paths, and embedding data:

class VideoIntelligence:
    def __init__(self, url, platform, video_id):
        self.url = url
        self.platform = platform  
        self.video_id = video_id
        self.video_string = f"{self.platform}_{self.video_id}"
        self.base_path = f"{DATA_PATH}/videos/{self.video_string}"
        
        self.video_path = None
        self.s3_key = None
        self.metadata = None
        self.title = None
        self.embeddings_list = None
    
    def get_video_object(self):
        """Return indexable document structure"""
        return {
            "url": self.url,
            "platform": self.platform, 
            "video_id": self.video_id,
            "title": self.title
        }
    
    # Additional getters/setters omitted for brevity

The VideoIntelligence class serves as a comprehensive data container for managing all aspects of video processing in our search system.

  • It stores the video's source URL, platform information, unique identifier, and associated file paths for both local and S3 storage.

  • The class also maintains references to video metadata, title information, and the critical embeddings list that will be generated by the TwelveLabs Marengo model.

  • Through its get_video_object() method, it provides a structured format for indexing video information in Elasticsearch, making it easy to manage the complete lifecycle of a video from ingestion through embedding generation to searchable indexing.
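The accessors omitted from the listing are called throughout the rest of the tutorial (for example get_base_path, set_metadata, and get_embeddings_list). The stand-in class below is one plausible sketch of them, not the original implementation. The DATA_PATH value and the choice to copy the title out of the metadata in set_metadata are assumptions.

```python
DATA_PATH = "./data"  # assumption: local root folder for downloaded videos

class VideoIntelligenceSketch:
    """Stand-in for VideoIntelligence showing the accessors used later on."""

    def __init__(self, url, platform, video_id):
        self.url = url
        self.platform = platform
        self.video_id = video_id
        self.video_string = f"{platform}_{video_id}"
        self.base_path = f"{DATA_PATH}/videos/{self.video_string}"
        self.video_path = None
        self.s3_key = None
        self.metadata = None
        self.title = None
        self.embeddings_list = None

    # Getters for values fixed at construction time
    def get_base_path(self):
        return self.base_path

    def get_video_string(self):
        return self.video_string

    def get_platform(self):
        return self.platform

    def get_video_id(self):
        return self.video_id

    # Getter/setter pairs for values filled in by later pipeline steps
    def get_video_path(self):
        return self.video_path

    def set_video_path(self, video_path):
        self.video_path = video_path

    def get_s3_key(self):
        return self.s3_key

    def set_s3_key(self, s3_key):
        self.s3_key = s3_key

    def get_embeddings_list(self):
        return self.embeddings_list

    def set_embeddings_list(self, embeddings_list):
        self.embeddings_list = embeddings_list

    def set_metadata(self, metadata):
        # Assumption: the title is taken from the downloaded metadata
        self.metadata = metadata
        self.title = metadata.get("title")

video = VideoIntelligenceSketch(
    "https://www.youtube.com/watch?v=jan5CFWs9ic", "youtube", "jan5CFWs9ic"
)
video.set_metadata({"title": "Example trailer"})  # made-up metadata
print(video.get_base_path(), video.title)
```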


3 - Video Download and Processing

The next step is to download the YouTube videos and extract their metadata:

def get_video(video: VideoIntelligence):
    """Download video using yt-dlp and extract metadata"""
    base_directory = Path(video.get_base_path())
    base_directory.mkdir(parents=True, exist_ok=True)
    
    video_path = f"{video.get_base_path()}/{video.get_video_string()}.mp4"
    metadata_path = f"{video.get_base_path()}/metadata.json"
    
    ydl_opts = {
        "format": "bestvideo+bestaudio/best",
        "outtmpl": video_path,
        "merge_output_format": "mp4"
    }
    
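    if not os.path.exists(video_path):
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(video.url, download=False)
            ydl.download([video.url])
            # sanitize_info makes the info dict JSON-serializable
            metadata = ydl.sanitize_info(info)
        with open(metadata_path, "w") as f:
            json.dump(metadata, f)
    else:
        # Reuse cached metadata; the context manager closes the file handle
        with open(metadata_path, "r") as f:
            metadata = json.load(f)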
    
    video.set_metadata(metadata) 
    video.set_video_path(video_path)

# Process all videos
video_objects = []
for video_str in videos:
    if "youtube.com" in video_str:
        platform = "youtube"
        video_id = video_str.split("v=")[1] 
        video_objects.append(VideoIntelligence(video_str, platform, video_id))

for video_object in video_objects:
    get_video(video_object)

This code handles the video processing workflow:

  • The get_video function downloads videos from YouTube using yt-dlp, stores them locally, and extracts metadata.

  • It creates necessary directories, sets appropriate file paths, and handles conditional downloading to prevent re-downloading existing videos.

  • The code then initializes VideoIntelligence objects for each YouTube video in the list, capturing platform information and video IDs.

  • Finally, it processes each video by calling get_video, which populates the objects with downloaded video paths and metadata.
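Splitting the URL on "v=" works for the five sample links, but it keeps any trailing query parameters in the ID. As a hedged alternative, the sketch below parses the query string with the standard library; the helper name extract_youtube_id is ours, not part of the original notebook.

```python
from urllib.parse import urlparse, parse_qs

def extract_youtube_id(url):
    """Return the value of the `v` query parameter, or None if it is missing."""
    parsed = urlparse(url)
    host = parsed.hostname or ""
    if host != "youtube.com" and not host.endswith(".youtube.com"):
        return None
    values = parse_qs(parsed.query).get("v")
    return values[0] if values else None

# A link with an extra timestamp parameter appended
print(extract_youtube_id("https://www.youtube.com/watch?v=jan5CFWs9ic&t=30s"))
```

With this helper, a link copied with a timestamp or playlist parameter still yields a clean video ID for the S3 key and the search results.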


4 - S3 Upload with Deduplication

The next step is to upload those videos to an S3 bucket:

# Initialize AWS clients
session = boto3.session.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY, 
    region_name=AWS_REGION
)

aws_account_id = session.client('sts').get_caller_identity()["Account"]
s3_client = session.client('s3')
bedrock_client = session.client("bedrock-runtime", region_name=AWS_REGION)

# Upload videos to S3 with existence checking
for video_object in video_objects:
    video_path = video_object.get_video_path()
    s3_key = f"videos/{video_object.get_platform()}/{video_object.get_video_id()}/{os.path.basename(video_path)}"
    video_object.set_s3_key(s3_key)
    
    try:
        s3_client.head_object(Bucket=S3_BUCKET_NAME, Key=s3_key)
        print(f"Video {video_object.get_video_string()} already exists in S3")
    except botocore.exceptions.ClientError as e:
        # head_object reports a missing key as a 404; re-raise anything else
        if e.response['Error']['Code'] != '404':
            raise
        print(f"Uploading {video_object.get_video_string()} to S3...")
        s3_client.upload_file(video_path, S3_BUCKET_NAME, s3_key)
        print(f"Successfully uploaded {video_object.get_video_string()}")

This code block sets up the AWS environment: it creates a boto3 session, looks up the account ID through STS, and creates the S3 and Bedrock runtime clients. It then uploads each video to S3 with built-in deduplication, checking whether the object already exists in the bucket before uploading to avoid redundant storage and processing.

For each video object, it constructs a standardized S3 key based on the platform and video ID, then either skips existing videos or uploads new ones. This foundation enables the subsequent embedding generation and vector search operations that will transform the raw videos into searchable content.


5 - Generating Embeddings from Videos Asynchronously

At TwelveLabs, we've designed our embedding models to work with various content types. When processing videos, Marengo automatically chunks your content into meaningful segments and generates embeddings that capture both visual and textual information. These embeddings enable you to search for specific moments using natural language queries.

In this example, the key innovation is using Bedrock's async invoke pattern for scalable embedding generation:

def wait_for_embedding_output(s3_bucket: str, s3_prefix: str, 
                             invocation_arn: str, verbose: bool = False) -> list:
    """Poll Bedrock async job until completion and retrieve results"""
    status = None
    while status not in ["Completed", "Failed", "Expired"]:
        response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)
        status = response['status']
        if verbose:
            tqdm.tqdm.write(f"Embedding task status: {status}")
        time.sleep(5)
    
    if status != "Completed":
        raise Exception(f"Embedding task failed with status: {status}")
    
    # Retrieve output from S3
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
    for obj in response.get('Contents', []):
        if obj['Key'].endswith('output.json'):
            output_key = obj['Key']
            obj = s3_client.get_object(Bucket=s3_bucket, Key=output_key)
            content = obj['Body'].read().decode('utf-8')
            return json.loads(content).get("data", [])
    
    raise Exception("No output.json found in S3 prefix")

def create_video_embedding(video_s3_uri: str, video_id: str) -> list:
    """Start async Marengo embedding job for video in S3"""
    s3_output_prefix = f'embeddings/videos/{video_id}'
    
    response = bedrock_client.start_async_invoke(
        modelId=MARENGO_MODEL_ID,
        modelInput={
            "inputType": "video",
            "mediaSource": {
                "s3Location": {"uri": video_s3_uri, "bucketOwner": aws_account_id}
            }
        },
        outputDataConfig={
            "s3OutputDataConfig": {"s3Uri": f's3://{S3_BUCKET_NAME}/{s3_output_prefix}'}
        }
    )
    
    invocation_arn = response["invocationArn"]
    print(f"Video embedding task started: {invocation_arn}")
    
    return wait_for_embedding_output(S3_BUCKET_NAME, s3_output_prefix, invocation_arn)

def check_existing_embedding(video_id: str):
    """Check S3 for cached embeddings to avoid re-processing"""
    s3_output_prefix = f'embeddings/videos/{video_id}'
    
    try:
        response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=s3_output_prefix)
        if 'Contents' in response and any(obj['Key'].endswith('output.json') for obj in response.get('Contents', [])):
            # Load existing embeddings from S3
            for obj in response.get('Contents', []):
                if obj['Key'].endswith('output.json'):
                    output_key = obj['Key']
                    obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=output_key)
                    content = obj['Body'].read().decode('utf-8')
                    return json.loads(content).get("data", [])
        return None
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            return None
        raise e

# Generate embeddings with caching
for video_object in tqdm.tqdm(video_objects, desc="Processing videos"):
    video_id = video_object.get_video_id()
    video_uri = f"s3://{S3_BUCKET_NAME}/{video_object.get_s3_key()}"
    
    # Check for existing embeddings first
    retrieved_embeddings = check_existing_embedding(video_id)
    if retrieved_embeddings:
        video_object.set_embeddings_list(retrieved_embeddings)
    else:
        embedding_data = create_video_embedding(video_uri, video_id)
        video_object.set_embeddings_list(embedding_data)

This code implements the core video embedding generation functionality, efficiently leveraging Amazon Bedrock's asynchronous API for scalable processing. The implementation includes three key functions:

  1. wait_for_embedding_output polls the async job status and retrieves results from S3 upon completion.

  2. create_video_embedding initiates the embedding generation job for a video file stored in S3.

  3. check_existing_embedding implements caching logic to avoid redundant processing.

The main loop processes each video with built-in deduplication, either retrieving cached embeddings or generating new ones when needed. This approach optimizes resource usage while enabling high-throughput video processing for search applications.
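The polling loop above runs until the job reaches a terminal status, with no upper bound on waiting time. One possible refinement, sketched here under our own naming, is a generic poller with a timeout. The status callable, the default interval, and the 30-minute timeout are assumptions; the sleep and clock functions are injectable only so the example runs instantly.

```python
import time

def poll_until_terminal(get_status,
                        terminal=("Completed", "Failed", "Expired"),
                        interval=5.0, timeout=1800.0,
                        sleep=time.sleep, clock=time.monotonic):
    """Call get_status() until it returns a terminal value or the timeout elapses."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status in terminal:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job still '{status}' after {timeout} seconds")
        sleep(interval)

# Simulated job that finishes on the third status check (no real Bedrock call)
statuses = iter(["InProgress", "InProgress", "Completed"])
final_status = poll_until_terminal(lambda: next(statuses), sleep=lambda _: None)
print(final_status)
```

In the tutorial's setting, get_status could wrap the get_async_invoke call shown earlier and return the response's status field.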


6 - Understanding Marengo Output Structure

Each video is segmented into short chunks (roughly 4 to 6 seconds each in the sample output below), and each chunk gets a 1,024-dimensional embedding:

# Preview embedding structure 
video_embedding_data = video_objects[0].get_embeddings_list()
for i, embedding in enumerate(video_embedding_data[:3]):
    print(f"{i}")
    for key in embedding:
        if "embedding" == key:
            print(f"\t{key}: len {len(embedding[key])}")
        else:
            print(f"\t{key}: {embedding[key]}")

# Output:
# 0
#   embedding: len 1024
#   embeddingOption: visual-text  
#   startSec: 0.0
#   endSec: 6.199999809265137
# 1 
#   embedding: len 1024
#   embeddingOption: visual-text
#   startSec: 6.199999809265137  
#   endSec: 10.399999618530273

The code sample illustrates how to examine the structure of the video embeddings generated by the TwelveLabs Marengo model.

  • It retrieves the embedding data from the first video object and prints details of the first three embedding chunks.

  • Each embedding is a 1024-dimensional vector representing a short segment of video content (roughly 4 to 6 seconds in the sample output), with metadata including the embedding type (visual-text) and precise timestamps (startSec and endSec).

  • This structure allows for fine-grained semantic search across video content, enabling users to find specific moments using natural language queries.
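Because each record carries its own embeddingOption and timestamps, a couple of small helpers can make the output easier to explore. The sketch below uses the field names shown in the output above; the helper names and the truncated 2-dimensional vectors are made up for illustration.

```python
from collections import defaultdict

def group_by_option(segments):
    """Group embedding records by their embeddingOption value."""
    grouped = defaultdict(list)
    for seg in segments:
        grouped[seg["embeddingOption"]].append(seg)
    return dict(grouped)

def segment_at(segments, second):
    """Return the first segment whose [startSec, endSec) range covers `second`."""
    for seg in segments:
        if seg["startSec"] <= second < seg["endSec"]:
            return seg
    return None

# Made-up records mirroring the structure printed above (vectors truncated)
sample = [
    {"embedding": [0.1, 0.2], "embeddingOption": "visual-text", "startSec": 0.0, "endSec": 6.2},
    {"embedding": [0.3, 0.4], "embeddingOption": "visual-text", "startSec": 6.2, "endSec": 10.4},
]

print({option: len(items) for option, items in group_by_option(sample).items()})
print(segment_at(sample, 7.0)["startSec"])
```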


7 - Configuring Elasticsearch for Vector Search

Once we have our embeddings, we'll need to configure Elasticsearch to store and search them efficiently. Elasticsearch offers several index types for vector search, each with different trade-offs between search speed, accuracy, and storage requirements.

Here's how we set up an index optimized for TwelveLabs' 1024-dimensional embeddings:

# Connect to Elasticsearch
es = Elasticsearch(
    hosts=[ELASTICSEARCH_ENDPOINT],
    api_key=ELASTICSEARCH_API_KEY
)

# Prepare documents for indexing
docs = []
for video_object in video_objects:
    persist_object = video_object.get_video_object()
    embeddings = video_object.get_embeddings_list()
    
    for embedding in embeddings:
        if embedding["embeddingOption"] == "visual-image":  # Filter for visual embeddings
            doc = copy.deepcopy(persist_object) 
            doc["embedding"] = embedding["embedding"]
            doc["start_sec"] = embedding["startSec"] 
            doc["end_sec"] = embedding["endSec"]
            docs.append(doc)

# Create indices for different vector search methods
index_varieties = [
    "flat",       # Brute force, highest accuracy
    "hnsw",       # Hierarchical navigable small world graph
    "int8_hnsw",  # Quantized for efficiency
    "bbq_hnsw",   # Better Binary Quantization with HNSW
    "bbq_flat"    # BBQ with flat search
]

for index_variety in index_varieties:
    index_name = f"twelvelabs-movie-trailer-{index_variety}"
    mappings = {
        "properties": {
            "url": {"type": "keyword"},
            "platform": {"type": "keyword"},
            "video_id": {"type": "keyword"},
            "title": {"type": "text", "analyzer": "standard"},
            "embedding": {
                "type": "dense_vector", 
                "dims": 1024,
                "similarity": "cosine",
                "index_options": {
                    "type": index_variety
                }
            },
            "start_sec": {"type": "float"},
            "end_sec": {"type": "float"}
        }
    }
    
    # Recreate index if exists
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        time.sleep(2)
    
    es.indices.create(index=index_name, mappings=mappings)
    print(f"Index '{index_name}' created successfully")
    
    # Bulk insert documents
    actions = [{"_index": index_name, "_source": doc} for doc in docs]
    success, failed = bulk(es, actions, chunk_size=100, max_retries=3)
    print(f"Successfully indexed {success} documents into {index_name}")

This code configures Elasticsearch for vector search by creating five different index varieties (flat, HNSW, and quantized variants) optimized for TwelveLabs' 1024-dimensional embeddings. It connects to Elasticsearch, prepares video segment documents with embeddings, creates specialized indices with appropriate mappings for dense vectors, and bulk uploads the documents. Each index type offers different trade-offs between search accuracy, speed, and storage requirements, allowing developers to choose the best approach based on their specific use case requirements.


8 - Creating Text Embeddings for Search

One of the most powerful aspects of TwelveLabs' technology is the ability to search video content using natural language. To enable this, we generate text embeddings using the same Marengo 2.7 model that processed our videos, ensuring they exist in the same vector space:

# Generate text embedding for search
def create_text_embedding(text_query: str) -> list:
    """Generate text embedding using Marengo via Bedrock"""
    text_model_input = {"inputType": "text", "inputText": text_query}
    
    response = bedrock_client.invoke_model(
        modelId=TEXT_EMBEDDING_MODEL_ID,
        body=json.dumps(text_model_input)
    )
    
    response_body = json.loads(response['body'].read().decode('utf-8'))
    embedding_data = response_body.get("data", [])
    
    return embedding_data[0]["embedding"] if embedding_data else None

This code defines a function called create_text_embedding that generates vector representations (embeddings) for text queries using the TwelveLabs Marengo model via Amazon Bedrock. The function takes a text query string as input, formats it properly for the model, sends the request to Bedrock using the synchronous invoke_model API, and then parses the response to extract the 1024-dimensional embedding vector. These text embeddings share the same vector space as the video embeddings, enabling semantic similarity searches that can find video moments based on natural language descriptions.
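Since the index mapping fixes the vector size at 1024, a query vector of any other length would be rejected at search time. The sketch below shows one way to validate a parsed response body before using it. The payload is fabricated to mirror the shape parsed above, and extract_embedding is a hypothetical helper.

```python
def extract_embedding(response_body, expected_dims=1024):
    """Return the first embedding in a parsed response body, or None if absent."""
    data = response_body.get("data", [])
    if not data:
        return None
    vector = data[0]["embedding"]
    if len(vector) != expected_dims:
        raise ValueError(f"expected {expected_dims} dimensions, got {len(vector)}")
    return vector

# Fabricated payload with the same shape as the response parsed above
fake_body = {"data": [{"embedding": [0.0] * 1024}]}
print(len(extract_embedding(fake_body)))
```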


9 - Performing Vector Searches

With everything in place, we can now search our video library using natural language queries. Here's how to implement a search function that finds the most relevant video segments:

def vector_query(index_name: str, text_query: str) -> dict:
    """Execute k-NN vector search against Elasticsearch"""
    query_embedding = create_text_embedding(text_query)
    
    query = {
        "retriever": {
            "knn": {
                "field": "embedding",
                "query_vector": query_embedding,
                "k": 10,
                "num_candidates": 25  # integer, not a string
            }
        },
        },
        "size": 10,
        "_source": False,
        "fields": ["title", "video_id", "start_sec"]
    }
    
    return es.search(index=index_name, body=query).body

# Test search
text_query = "Show me scenes with dinosaurs"  
results = vector_query("twelvelabs-movie-trailer-flat", text_query)
print(results)

This code implements a vector search function that enables natural language queries against video content.

  • It first generates a text embedding from the query using the same Marengo model that processed the videos, ensuring compatibility in the vector space.

  • The function then constructs an Elasticsearch k-NN query, requesting the top 10 most similar video segments while evaluating 25 candidates for better accuracy.

  • The search returns relevant metadata like title, video ID, and timestamp, allowing applications to link directly to the specific moments in videos that match the query's semantic meaning.
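Because the query sets _source to False and requests fields, each hit returns its values as single-element lists under a fields key. As a rough sketch, the helper below flattens a response of that shape into plain rows. The sample response is fabricated, and summarize_hits is our own name.

```python
def summarize_hits(response):
    """Flatten an Elasticsearch search response into a list of plain dicts."""
    rows = []
    for hit in response.get("hits", {}).get("hits", []):
        fields = hit.get("fields", {})
        rows.append({
            "title": fields.get("title", [None])[0],
            "video_id": fields.get("video_id", [None])[0],
            "start_sec": fields.get("start_sec", [None])[0],
            "score": hit.get("_score"),
        })
    return rows

# Fabricated response with the structure returned when `fields` is requested
fake_response = {
    "hits": {
        "hits": [
            {"_score": 0.64, "fields": {"title": ["Trailer A"], "video_id": ["abc"], "start_sec": [134.5]}},
            {"_score": 0.62, "fields": {"video_id": ["xyz"], "start_sec": [121.2]}},
        ]
    }
}

for row in summarize_hits(fake_response):
    print(row)
```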


10 - Building a Simple Search Interface

For a more user-friendly experience, we can create a simple search interface using IPython widgets. This allows users to input search queries and view results with direct links to the specific moments in videos:

from ipywidgets import widgets, HTML as WHTML, HBox, Layout
from IPython.display import display

def display_search_results_html(query):
    """Format search results as clickable YouTube links"""
    results = vector_query("twelvelabs-movie-trailer-flat", query)
    hits = results.get('hits', {}).get('hits', [])
    
    if not hits:
        return "<p>No results found</p>"
    
    items = []
    for hit in hits:
        fields = hit.get('fields', {})
        title = fields.get('title', ['No Title'])[0]
        score = hit.get('_score', 0) 
        video_id = fields.get('video_id', [''])[0]
        start_sec = fields.get('start_sec', [0])[0]
        
        # Create YouTube deep-link to specific timestamp
        url = f"https://www.youtube.com/watch?v={video_id}&t={int(start_sec)}s"
        items.append(f'<li><a href="{url}" target="_blank">{title} (Start: {float(start_sec):.1f}s)</a> <span>Score: {score}</span></li>')
    
    return "<h3>Search Results:</h3><ul>" + "\n".join(items) + "</ul>"

def search_videos():
    """Create interactive search widget"""
    search_input = widgets.Text(
        value='', 
        placeholder='Enter your search query…',
        description='Search:',
        layout=Layout(width='70%')
    )
    
    search_button = widgets.Button(
        description='Search Videos',
        button_style='primary',
        layout=Layout(width='20%')
    )
    
    results_box = WHTML(value="")
    
    def on_button_click(_):
        q = search_input.value.strip()
        if not q:
            results_box.value = "<p>Please enter a search query</p>"
            return
        results_box.value = "<p>Searching…</p>"
        results_box.value = display_search_results_html(q)
    
    search_button.on_click(on_button_click)
    display(HBox([search_input, search_button]))
    display(results_box)

search_videos()

This code creates an interactive search interface using IPython widgets that allows users to search video content with natural language queries.

  • It defines a function to display search results as clickable YouTube links that open videos at the exact timestamp where relevant content appears.

  • The interface includes a text input field for queries, a search button, and a results display area.

  • When a user enters a query and clicks search, the system generates a text embedding from the query, performs a vector similarity search in Elasticsearch, and returns links to the most semantically relevant video segments, complete with scores indicating match quality.
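Video titles come from external metadata, so it is safer to escape them before placing them in HTML. The sketch below is one way to build a single result item with the standard library; render_result_item is a hypothetical helper and not part of the original notebook.

```python
from html import escape
from urllib.parse import urlencode

def render_result_item(title, video_id, start_sec, score):
    """Build one <li> entry with an escaped title and a timestamped YouTube link."""
    query = urlencode({"v": video_id, "t": f"{int(start_sec)}s"})
    url = f"https://www.youtube.com/watch?{query}"
    return (
        f'<li><a href="{escape(url, quote=True)}" target="_blank">'
        f"{escape(title)} (Start: {float(start_sec):.1f}s)</a> "
        f"<span>Score: {score:.4f}</span></li>"
    )

# Made-up values, including a title with markup that must not be rendered
print(render_result_item("Dinosaurs <b>roar</b>", "jan5CFWs9ic", 134.5, 0.6405))
```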


Performance Analysis

Testing shows interesting patterns in search results. For the query "Show me scenes with dinosaurs":

  • Top result: Jurassic World Rebirth trailer at 134.5s (score: 0.6405)

  • Secondary match: How to Train Your Dragon clip at 121.2s (score: 0.6228)

  • Semantic matching: Marengo surfaces both literal dinosaurs and visually similar dragon-like creatures

At TwelveLabs, we understand the importance of balancing accuracy and performance when working with high-dimensional embeddings. Elasticsearch offers several quantization methods that can dramatically improve search performance while maintaining accuracy.

In particular, Elasticsearch's Better Binary Quantization (BBQ) stands out as an excellent option for TwelveLabs' 1024-dimensional embeddings. BBQ reduces memory usage and speeds up search operations by using a binary representation internally while preserving the original vectors for final scoring.

For most production deployments with TwelveLabs embeddings, we recommend:

  • bbq_hnsw: Best for large collections (millions of vectors) where both speed and accuracy are important.

  • flat: Best for smaller collections where maximum accuracy is required.
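As a back-of-the-envelope illustration of why quantization matters, the sketch below estimates raw vector storage at different precisions. It assumes 32 bits per dimension for float vectors, 8 for int8, and about 1 bit for binary quantization. It ignores HNSW graph overhead, per-vector correction terms, and the full-precision copies kept for rescoring, so treat the results as order-of-magnitude figures only.

```python
def raw_vector_bytes(num_vectors, dims=1024, bits_per_dim=32):
    """Bytes needed to hold the vector values alone, with no index overhead."""
    return num_vectors * dims * bits_per_dim // 8

# Assumed bit widths per dimension for each representation
precisions = {"float32": 32, "int8": 8, "binary (1 bit)": 1}

for name, bits in precisions.items():
    size_mib = raw_vector_bytes(1_000_000, bits_per_dim=bits) / (1024 ** 2)
    print(f"{name:>15}: {size_mib:,.0f} MiB per million vectors")
```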


Production Considerations


Scaling Video Processing for Production

When deploying TwelveLabs Marengo with Elasticsearch in production environments, efficient video processing becomes crucial. To optimize your workflow, implement a serverless architecture using S3 event triggers that automatically process new video uploads. This approach eliminates manual intervention and creates a seamless ingestion pipeline. Additionally, leverage Amazon Bedrock's asynchronous invocation capabilities to process multiple videos in parallel, dramatically reducing overall processing time for large media libraries.

Cost management is equally important when working with video AI at scale. Implement a caching strategy that stores embeddings in S3 after initial generation, preventing redundant processing of the same content. This practice can significantly reduce both API costs and processing time, especially for frequently accessed videos or when reindexing your Elasticsearch deployment.
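One way to process several videos concurrently is a small thread pool, since each worker spends most of its time waiting on the remote job. The sketch below is illustrative only. embed_all and the fake embedding function are our own names, and in practice the worker could wrap the tutorial's cache check and embedding call.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def embed_all(video_ids, embed_fn, max_workers=4):
    """Run embed_fn(video_id) concurrently; return (results, errors) keyed by ID."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(embed_fn, vid): vid for vid in video_ids}
        for future in as_completed(futures):
            vid = futures[future]
            try:
                results[vid] = future.result()
            except Exception as exc:  # keep going if one video fails
                errors[vid] = exc
    return results, errors

def fake_embed(video_id):
    """Stand-in for a real embedding call; fails for one ID to show error handling."""
    if video_id == "broken":
        raise RuntimeError("simulated failure")
    return [{"startSec": 0.0, "endSec": 6.0, "video": video_id}]

results, errors = embed_all(["a", "b", "broken"], fake_embed)
print(sorted(results), sorted(errors))
```

Collecting errors per video rather than raising on the first failure lets a large batch finish and leaves a clear list of items to retry.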


Effective Index Management

Proper index management is essential for maintaining optimal search performance. The following code cleans up the demo indices and S3 objects once you are done experimenting:

# Clean up indexes when done
for index_variety in index_varieties:
    es.indices.delete(index=f"twelvelabs-movie-trailer-{index_variety}")

# Clean up S3 objects (this deletes every object in the bucket; each call handles up to 1,000 keys)
s3_objects = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME)
if 'Contents' in s3_objects:
    delete_keys = [{'Key': obj['Key']} for obj in s3_objects['Contents']]
    s3_client.delete_objects(Bucket=S3_BUCKET_NAME, Delete={'Objects': delete_keys})

In production systems, consider implementing index rotation strategies rather than deletion. Create time-based indices (e.g., monthly) and use aliases to maintain consistent search endpoints while allowing for efficient background reindexing and optimization.
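As a rough sketch of the rotation idea, the helpers below build a month-stamped index name and the request body for an alias swap. The naming scheme and the alias name are assumptions. The actions list follows the add/remove structure of Elasticsearch's update-aliases API, which applies both changes in a single request.

```python
from datetime import date

def monthly_index_name(prefix, day):
    """Return an index name such as 'prefix-2025.09' for the given date."""
    return f"{prefix}-{day:%Y.%m}"

def alias_swap_actions(alias, old_index, new_index):
    """Build an update-aliases body that repoints `alias` from old to new index."""
    return {
        "actions": [
            {"remove": {"index": old_index, "alias": alias}},
            {"add": {"index": new_index, "alias": alias}},
        ]
    }

# Hypothetical names: searches would target the alias, never a dated index
old_name = monthly_index_name("video-segments", date(2025, 8, 1))
new_name = monthly_index_name("video-segments", date(2025, 9, 1))
print(alias_swap_actions("video-segments-search", old_name, new_name))
```

Search code that queries the alias keeps working unchanged while a new index is built and swapped in behind it.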


Advanced Search Applications

The true power of integrating TwelveLabs Marengo with Elasticsearch emerges when implementing advanced search capabilities. Create hybrid search systems that combine Marengo's semantic understanding with Elasticsearch's BM25 scoring on video transcripts. This approach balances the strengths of both technologies – Marengo's deep visual understanding with the precision of text search – resulting in more comprehensive and accurate results.
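One common way to merge a vector ranking with a BM25 ranking is Reciprocal Rank Fusion (RRF), which Elasticsearch also offers as a retriever. The pure-Python sketch below only illustrates the scoring rule and is not the method used in this tutorial. The segment IDs are made up, and k=60 is a conventional default rather than a tuned value.

```python
def reciprocal_rank_fusion(rankings, k=60):
    """Fuse ranked lists of IDs: each appearance adds 1 / (k + rank) to an ID's score."""
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=lambda doc_id: scores[doc_id], reverse=True)

# Hypothetical result lists from a vector search and a transcript BM25 search
vector_ranking = ["seg_3", "seg_1", "seg_7"]
bm25_ranking = ["seg_1", "seg_9", "seg_3"]

print(reciprocal_rank_fusion([vector_ranking, bm25_ranking]))
```

Segments that appear in both lists rise to the top, which is the behavior a hybrid search aims for.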

For more sophisticated applications, implement multi-modal RAG (Retrieval Augmented Generation) workflows by pairing Marengo with TwelveLabs' Pegasus 1.2 model. This combination allows you to not only find relevant video content but also generate contextual summaries that provide users with deeper insights without requiring them to watch entire videos.

Finally, enhance user experience by implementing real-time filtering capabilities alongside vector search. Elasticsearch excels at combining vector similarity with boolean filters, enabling you to incorporate geospatial constraints (e.g., "show me outdoor scenes in New York"), temporal filters (e.g., "videos from the last 30 days"), and access control restrictions – all without sacrificing the semantic understanding that makes Marengo so powerful.
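To illustrate filtered vector search, the sketch below extends the k-NN query from step 9 with a filter clause, which the knn retriever accepts as a query object. The builder name is ours, and the example filters use only fields present in the tutorial's mapping (platform and start_sec); geospatial or access-control filters would need additional mapped fields.

```python
def build_filtered_knn_query(query_vector, filters, k=10, num_candidates=25):
    """Build a k-NN retriever request that only considers documents matching `filters`."""
    return {
        "retriever": {
            "knn": {
                "field": "embedding",
                "query_vector": query_vector,
                "k": k,
                "num_candidates": num_candidates,
                "filter": {"bool": {"filter": filters}},
            }
        },
        "size": k,
        "_source": False,
        "fields": ["title", "video_id", "start_sec"],
    }

# Placeholder vector; a real query would use a Marengo text embedding
example_query = build_filtered_knn_query(
    [0.0] * 1024,
    filters=[
        {"term": {"platform": "youtube"}},
        {"range": {"start_sec": {"gte": 60}}},
    ],
)
print(example_query["retriever"]["knn"]["filter"])
```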


Getting Started

With Marengo's video understanding and Elasticsearch's vector search, you can finally search video content as naturally as you search text—unlocking insights that were previously hidden in your media libraries.

Request access to TwelveLabs models in Amazon Bedrock, set up your Elasticsearch deployment, and start building intelligent video applications that truly understand your content today. The complete notebook is available in the Elasticsearch Labs repository.

