Tutorial

From Raw Surveillance to Training-Ready Datasets in Minutes: Building an Automated Video Curator with TwelveLabs and FiftyOne

Nathan Che

Manual video annotation costs enterprises thousands of hours per year and scales linearly with footage volume. This tutorial builds a pipeline that replaces that workflow — using TwelveLabs' Marengo 3.0 embeddings and Pegasus 1.2 reasoning to ingest raw surveillance footage, cluster it by semantic similarity, and auto-generate training labels. FiftyOne handles visualization and quality auditing. The output: a structured PyTorch DataLoader, ready for model training, with zero frames manually reviewed.

Feb 24, 2026

11 Minutes

Introduction

Thousands of terabytes of surveillance footage flow through factory IP cameras every year. Somewhere in that data is the forklift violation that caused a near-miss last Tuesday, the PPE non-compliance pattern on the night shift, and the blocked fire exit that nobody noticed for six weeks. The footage exists. The labels do not.

This gap between raw video volume and usable training data is the single largest bottleneck in deploying computer vision for worker safety. Traditional solutions — AWS SageMaker Ground Truth, Scale AI, dedicated annotation teams — attack the problem by throwing human reviewers at it. That approach works, but it costs $25–50 per hour of video reviewed and scales linearly. Double your camera count, double your annotation budget.

What if you could invert that workflow entirely? Instead of scrubbing through footage to find incidents, what if the video could organize itself — grouping similar events by meaning, then describing what it found?

That is what we are building in this tutorial. By combining TwelveLabs' video understanding models with FiftyOne's dataset management platform, we will construct a pipeline that:

  1. Ingests and indexes raw surveillance footage using the TwelveLabs API

  2. Extracts multimodal embeddings (Marengo 3.0) that represent video content as dense vectors — capturing visual, audio, and contextual signals simultaneously

  3. Clusters videos semantically using KMeans, grouping similar safety incidents without any predefined categories

  4. Auto-labels each cluster using generative video reasoning (Pegasus 1.2), converting abstract cluster IDs into human-readable safety classifications

  5. Visualizes the results in FiftyOne for quality auditing, then exports a training-ready PyTorch dataset

The key insight: TwelveLabs doesn't process video frame-by-frame. Marengo encodes video as a compressed multimodal representation — what the team calls "video as volume" — where audio, text, motion, and visual context are unified into a single embedding space. This is why semantic clustering works. Videos that look different but mean the same thing (a forklift cutting off a pedestrian from two different camera angles) end up near each other in embedding space.
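To make "near each other in embedding space" concrete, here is a minimal sketch that compares vectors with cosine similarity. Cosine similarity is one common closeness measure, not necessarily the one used internally, and the four-dimensional vectors are made-up stand-ins for real Marengo embeddings.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (1.0 = same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

# Hypothetical toy vectors, NOT real embeddings
forklift_cam_a = [0.9, 0.1, 0.3, 0.0]   # forklift incident, camera A
forklift_cam_b = [0.8, 0.2, 0.4, 0.1]   # same kind of incident, camera B
ppe_violation = [0.1, 0.9, 0.0, 0.4]    # unrelated incident

same_event = cosine_similarity(forklift_cam_a, forklift_cam_b)
different_event = cosine_similarity(forklift_cam_a, ppe_violation)
print(f"same event: {same_event:.2f}, different event: {different_event:.2f}")
```

Two clips of the same kind of event should score noticeably higher than two unrelated clips, which is the property the clustering step relies on.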

Watch the walkthrough below for a quick demo of the finished application:


Prerequisites

This project requires three things:

  1. Python 3.8+: Download Python

  2. TwelveLabs API Key: Authentication docs (free tier available)

  3. HuggingFace Account (optional): Only needed if you want to use the same surveillance dataset we use — Voxel51/Safe_and_Unsafe_Behaviours

Clone the repository and install dependencies:

>> git clone https://github.com/nathanchess/visual-ai-worker-safety-kit
>> cd



Architecture Overview

Before writing any code, it helps to see the full pipeline. At a high level, this project has two halves: TwelveLabs provides the intelligence (embeddings and labels), and FiftyOne provides the infrastructure (visualization, filtering, and export).

Full-Screen View (Lucid App): [Voxel51 x TwelveLabs] - Semantic Dataset Curator Tool 

Marengo 3.0 is TwelveLabs' encoder model — it compresses video into rich multimodal embeddings that capture what is seen, heard, and contextually implied. Pegasus 1.2 is the reasoning model — it traverses that compressed representation to identify events, classify behaviors, and generate natural-language descriptions. FiftyOne sits downstream, ingesting the embeddings and labels to provide interactive visualization, similarity browsing, and direct export to PyTorch.

The complete source code is available in the GitHub repository. The walkthrough below follows the structure of main.py.


Step 1: Intelligent Video Ingestion

We start by loading the surveillance dataset from Hugging Face. FiftyOne's ViewField (imported as F) lets us build declarative filters without modifying the underlying data. Three filters matter here: restrict to the training split, drop clips shorter than 4 seconds (too brief for meaningful embedding), and skip any videos we have already indexed.

from fiftyone import ViewField as F
from fiftyone.utils.huggingface import load_from_hub

def load_or_create_dataset(dataset_name):
    # Load directly from Hugging Face Hub
    return load_from_hub("Voxel51/Safe_and_Unsafe_Behaviours", name=dataset_name)

def ingest_videos(client, index_id, dataset, min_duration=4.0):
    # Create a view: Train split ONLY, Duration >= 4s, NOT yet indexed
    base_view = (
        dataset
        .match_tags("train")
        .match(F("metadata.duration") >= min_duration)
        .match(~F("tl_video_id").exists())  # Idempotency check
    )
    
    # ... iteration logic ...

The ~F("tl_video_id").exists() filter is worth pausing on. In a production environment, you might run this pipeline daily as new footage arrives. This filter ensures you never pay to index the same video twice — if a sample already has a TwelveLabs video ID stored on it, the view silently skips it. Idempotency at the data layer, with no bookkeeping code required.
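The same selection logic can be sketched without FiftyOne to show why re-running the pipeline is safe. This is an illustration only: plain dictionaries stand in for samples, and the field names and the placeholder ID format are assumptions made for the example.

```python
def select_for_indexing(samples, min_duration=4.0):
    """Return training samples that are long enough and not yet indexed."""
    return [
        s for s in samples
        if "train" in s["tags"]
        and s["duration"] >= min_duration
        and s.get("tl_video_id") is None
    ]

# Hypothetical samples represented as plain dicts
samples = [
    {"name": "a.mp4", "tags": ["train"], "duration": 6.0},
    {"name": "b.mp4", "tags": ["train"], "duration": 2.5},  # too short
    {"name": "c.mp4", "tags": ["test"], "duration": 9.0},   # wrong split
    {"name": "d.mp4", "tags": ["train"], "duration": 5.0, "tl_video_id": "vid_123"},
]

first_run = select_for_indexing(samples)
for s in first_run:
    s["tl_video_id"] = "vid_" + s["name"]  # placeholder for the real upload call

second_run = select_for_indexing(samples)  # nothing left to index
print(len(first_run), len(second_run))
```

Because the video ID is written back onto the sample, the second pass selects nothing, so no video is uploaded twice.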


Step 2: Extracting Embeddings and Populating FiftyOne

With the filtered view constructed, we iterate through samples and upload each video to TwelveLabs for indexing. FiftyOne's iter_samples(autosave=True) batches database writes automatically, which matters when you are processing thousands of clips — it avoids a round-trip to the database on every single sample.

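    # Continues inside ingest_videos(): iterate through the filtered view built in Step 1
    for sample in base_view.iter_samples(autosave=True, progress=True):
        try:
            # Upload to TwelveLabs and store the returned video ID on the sample;
            # autosave=True batches the resulting database writes
            sample["tl_video_id"] = index_video_to_twelvelabs(
                client, index_id, sample
            )
            print(f"  ✓ {sample.filename}")
        except Exception as e:
            # Log and continue; the sample stays unindexed, so the next run retries it
            print(f"  ✗ {sample.filename}: {e}")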

Once indexing completes, we fetch the Marengo 3.0 embeddings for each video. These are dense vectors that encode the full multimodal content of each clip — what is happening visually, what is audible, and the contextual relationship between the two. Unlike embeddings from image-only models (which would treat each frame independently), Marengo's embeddings capture temporal continuity. A "worker removing safety goggles" and a "worker putting on safety goggles" produce meaningfully different vectors, even though any single frame might look identical.


Step 3: Unsupervised Semantic Clustering

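This is the core of the pipeline. We start with no predefined categories: we do not know how many types of safety violations exist in this footage, or what they look like. KMeans does need a cluster count up front, so num_clusters (8 by default in the code below) is a tunable starting guess rather than a known number of categories. With that set, we apply KMeans clustering on the embedding space and let the data self-organize.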

import numpy as np
from fiftyone import Classification
from sklearn.cluster import KMeans

def cluster_and_label(client, dataset, embeddings, num_clusters=8):
    print(f"Clustering {len(embeddings)} videos into {num_clusters} clusters...")

    # 1. Cluster embeddings (one row per indexed video)
    kmeans = KMeans(n_clusters=num_clusters, random_state=0)
    cluster_labels = kmeans.fit_predict(embeddings)

    # 2. Map clusters to semantic labels using Pegasus (Generative AI)
    cluster_label_map = {}
    for cluster_idx in np.unique(cluster_labels):
        # Pick a representative video from the cluster and ask Pegasus to label it.
        # get_video_id_for_cluster and generate_label are helpers not shown here.
        representative_video_id = get_video_id_for_cluster(cluster_idx)
        cluster_label_map[cluster_idx] = generate_label(client, representative_video_id)

    # 3. Batch update FiftyOne samples.
    # The rows of `embeddings` must be in the same order as the samples in this view.
    indexed_view = dataset.exists("tl_video_id")
    classifications = [Classification(label=cluster_label_map[c]) for c in cluster_labels]
    indexed_view.set_values("pred_cluster", classifications)

A technical note on set_values(): it writes the values for every sample in the view in a single bulk operation, matching values to samples by position. The naive alternative, iterating sample by sample and calling sample.save(), issues one database write per sample and becomes much slower as the dataset grows. When you are working with thousands of videos, this distinction matters.
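Since a bulk update pairs values with samples by position, the list of labels has to follow the view's sample order. The sketch below shows one way to guarantee that by looking labels up by sample ID. The helper name and the IDs are hypothetical; in FiftyOne, indexed_view.values("id") would supply the IDs in view order.

```python
def values_in_view_order(view_ids, label_by_id, default=None):
    """Return one value per sample, ordered exactly like the view's sample IDs."""
    return [label_by_id.get(sample_id, default) for sample_id in view_ids]

# Hypothetical IDs, listed in the order the view would return them
view_ids = ["s3", "s1", "s2"]
label_by_id = {
    "s1": "Opened Panel Cover",
    "s2": "Safe Walkway Violation",
    "s3": "Unauthorized Intervention",
}

ordered = values_in_view_order(view_ids, label_by_id)
print(ordered)
```

Keying by ID avoids silently attaching a label to the wrong video if the embeddings were fetched in a different order than the view iterates.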

Why does semantic clustering produce meaningful groups here? Because Marengo's embeddings encode video as a compressed multimodal representation, not as a bag of per-frame features. Two clips of a forklift blocking a walkway — shot from different angles, at different times of day, with different ambient noise — will land near each other in embedding space because they share semantic meaning. Traditional approaches based on visual similarity alone would scatter them.
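The walkthrough does not show how a representative video is chosen for each cluster. One plausible approach, sketched here as an assumption and not as the repository's implementation, is to pick the video whose embedding lies closest to the cluster's mean. The function name, its signature, and the toy data are all hypothetical.

```python
import math

def nearest_to_centroid(embeddings, cluster_labels, video_ids, cluster_idx):
    """Return the video ID whose embedding is closest to its cluster's mean vector."""
    members = [i for i, c in enumerate(cluster_labels) if c == cluster_idx]
    if not members:
        raise ValueError(f"cluster {cluster_idx} has no members")
    dims = len(embeddings[members[0]])
    centroid = [
        sum(embeddings[i][d] for i in members) / len(members) for d in range(dims)
    ]
    best = min(members, key=lambda i: math.dist(embeddings[i], centroid))
    return video_ids[best]

# Hypothetical 2-D embeddings and cluster assignments
embeddings = [[0.0, 0.0], [1.0, 0.0], [0.4, 0.1], [5.0, 5.0], [5.2, 4.9]]
cluster_labels = [0, 0, 0, 1, 1]
video_ids = ["v0", "v1", "v2", "v3", "v4"]

representative = nearest_to_centroid(embeddings, cluster_labels, video_ids, 0)
print(representative)
```

A video near the centroid is more likely to be typical of its cluster than a randomly chosen member, which matters because its label is applied to every video in that cluster.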


Step 4: Zero-Shot Auto-Labeling with Pegasus

A cluster ID like "Cluster 3" is mathematically useful but operationally meaningless. We need "Forklift Right-of-Way Violation." This is where Pegasus 1.2 comes in.

For each cluster, we select a representative video and send it to Pegasus with a structured prompt. Pegasus reasons over the full video — not a single frame, not a transcript — and returns a safety classification. No fine-tuning required. This is zero-shot generative reasoning: Pegasus has never seen this specific surveillance footage before, yet it produces labels that align with established safety protocols.

CLUSTER_LABEL_PROMPT = """
Analyze this workplace safety video and classify it as exactly ONE of the following labels.
UNSAFE BEHAVIORS:
- Safe Walkway Violation
- Unauthorized Intervention
- Opened Panel Cover
...
Return ONLY the exact label name.
"""

The prompt constrains Pegasus to a predefined taxonomy of safety categories. This is a deliberate design choice: in a production deployment, safety managers need labels that map to their existing incident classification systems, not free-form descriptions. You can adapt this taxonomy to match your organization's specific safety protocols by editing the prompt — no retraining, no new model, just a different set of label strings.
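Even with a constrained prompt, a generative model can return stray whitespace, punctuation, or a full sentence. A small validation step, sketched below, maps the raw response back onto the taxonomy and falls back to a sentinel when nothing matches. The helper name and the fallback value are assumptions, and the list holds only the three labels visible in the prompt excerpt.

```python
# Only the labels shown in the prompt excerpt; the full taxonomy is elided there
ALLOWED_LABELS = [
    "Safe Walkway Violation",
    "Unauthorized Intervention",
    "Opened Panel Cover",
]

def normalize_label(raw_response, allowed=ALLOWED_LABELS, fallback="Unrecognized"):
    """Match a model response to an allowed label, ignoring case and stray punctuation."""
    cleaned = raw_response.strip().strip("\"'.-* ").casefold()
    for label in allowed:
        if cleaned == label.casefold():
            return label
    return fallback

print(normalize_label("  opened panel cover.\n"))
print(normalize_label("A worker walks outside the marked path."))
```

Routing unmatched responses to a single fallback value keeps unexpected strings out of the label set and makes them easy to filter for review.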


Step 5: Interactive Embedding Visualization with UMAP

Numbers and labels are necessary, but for computer vision work, you need to see the data. FiftyOne's Brain module computes a 2D UMAP projection of the high-dimensional embeddings, producing an interactive scatterplot where each point is a video and colors correspond to auto-generated labels.
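A minimal sketch of that step follows, using the FiftyOne Brain compute_visualization call. It assumes the embeddings are stored on each sample in a field named tl_embedding (the field read in the export step), that the umap-learn package is installed, and that the brain key name is free to choose. The wrapper function itself is hypothetical.

```python
def add_umap_visualization(dataset, embeddings_field="tl_embedding", brain_key="tl_umap"):
    """Compute a 2D UMAP projection of stored embeddings and register it under brain_key."""
    # Imported lazily so this sketch can be loaded without FiftyOne installed
    import fiftyone.brain as fob

    # Only samples that actually have an embedding can be projected
    view = dataset.exists(embeddings_field)
    return fob.compute_visualization(
        view,
        embeddings=embeddings_field,  # name of the field holding the vectors
        method="umap",
        num_dims=2,
        brain_key=brain_key,
        seed=51,  # fixed seed so the layout is repeatable
    )
```

After calling it, launching the FiftyOne App on the dataset and opening the Embeddings panel should show the scatterplot, which can then be colored by the pred_cluster field.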

What to look for in this visualization: tight, well-separated clusters indicate that the embeddings are producing clean semantic groupings — the model is confidently distinguishing between, say, "blocked fire exit" and "PPE violation." Overlapping clusters suggest categories that may need refinement (perhaps "improper lifting" and "ergonomic violation" are too similar to separate cleanly). Outliers — points sitting far from any cluster center — are worth investigating manually, as they may represent rare incidents or mislabeled data.
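Outliers can also be shortlisted programmatically before opening the scatterplot. The sketch below flags points whose distance to their own cluster centroid is unusually large, using a z-score threshold. The threshold of 2.0, the function name, and the toy points are assumptions for illustration; this is a simple heuristic, not a FiftyOne feature.

```python
import math
from statistics import mean, pstdev

def flag_outliers(points, labels, z_threshold=2.0):
    """Return indices of points unusually far from their own cluster centroid."""
    clusters = {}
    for idx, label in enumerate(labels):
        clusters.setdefault(label, []).append(idx)

    outliers = []
    for members in clusters.values():
        dims = len(points[members[0]])
        centroid = [mean(points[i][d] for i in members) for d in range(dims)]
        dists = [math.dist(points[i], centroid) for i in members]
        mu, sigma = mean(dists), pstdev(dists)
        if sigma == 0:
            continue  # all members equally far from the centroid
        outliers += [
            i for i, dist in zip(members, dists) if (dist - mu) / sigma > z_threshold
        ]
    return sorted(outliers)

# Hypothetical 2-D points: cluster "a" has one stray point at index 7
points = [[0.0, 0.0], [0.1, 0.0], [-0.1, 0.0], [0.05, 0.0], [-0.05, 0.0],
          [0.02, 0.0], [-0.02, 0.0], [10.0, 0.0],
          [20.0, 0.0], [20.1, 0.0], [19.9, 0.0]]
labels = ["a"] * 8 + ["b"] * 3

suspects = flag_outliers(points, labels)
print(suspects)
```

The flagged indices are candidates for manual review, not confirmed errors, since a rare but genuine incident will look the same as a mislabeled clip.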

This step is not cosmetic. Visualizing embeddings before export catches data quality issues that would otherwise propagate silently into your training pipeline. A five-minute review here can prevent hours of debugging downstream.


Step 6: Exporting for PyTorch

A curated dataset is only valuable if it can train a model. We use FiftyOne's to_torch() method with a custom GetItem class that maps string labels to tensors and retrieves pre-computed embeddings on the fly.

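import torch
from fiftyone.utils.torch import GetItem
from torch.utils.data import DataLoader

class WorkerSafetyGetItem(GetItem):
    def __init__(self, label_to_idx, **kwargs):
        super().__init__(**kwargs)
        # Mapping from label string to integer class index, used in __call__
        self.label_to_idx = label_to_idx

    @property
    def required_keys(self):
        # Sample fields that FiftyOne must load for each item
        return ["tl_embedding", "ground_truth"]

    def __call__(self, d):
        return {
            "embedding": torch.tensor(d.get("tl_embedding"), dtype=torch.float32),
            "label_idx": torch.tensor(
                # Labels missing from the mapping become -1
                self.label_to_idx.get(d.get("ground_truth").label, -1),
                dtype=torch.long
            ),
        }

def create_dataloader(dataset, batch_size=4):
    indexed_view = dataset.exists("tl_video_id")

    # Create a PyTorch dataset directly from the FiftyOne view.
    # LABEL_TO_IDX (label string -> int) is defined elsewhere and not shown here.
    torch_dataset = indexed_view.to_torch(WorkerSafetyGetItem(LABEL_TO_IDX))

    return DataLoader(torch_dataset, batch_size=batch_size, shuffle=True)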

The resulting DataLoader yields batches of pre-computed embeddings paired with integer labels. Because the embeddings were already extracted by Marengo during the indexing step, the training loop skips feature extraction, usually the most expensive part of a CV pipeline. You can train a lightweight classifier (an MLP or linear probe) on top of these embeddings in seconds rather than hours. Note that the snippet above reads labels from the dataset's ground_truth field; to train on the Pegasus-generated labels from Step 3, read pred_cluster instead. Those labels are semantically consistent because Pegasus generates them by reasoning over actual video content, not by inferring them from filenames or folder structures.
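The export snippet relies on a LABEL_TO_IDX mapping that is not shown. One straightforward way to build it, offered as a sketch and not as the repository's definition, is to sort the distinct label strings and enumerate them. The example labels are the three visible in the Step 4 prompt excerpt.

```python
def build_label_to_idx(labels):
    """Map each distinct label string to a stable integer index (alphabetical order)."""
    return {label: idx for idx, label in enumerate(sorted(set(labels)))}

# Hypothetical label values as they might appear across samples
example_labels = [
    "Opened Panel Cover",
    "Safe Walkway Violation",
    "Opened Panel Cover",
    "Unauthorized Intervention",
]

LABEL_TO_IDX = build_label_to_idx(example_labels)
print(LABEL_TO_IDX)
```

Sorting keeps the indices stable across runs. Labels missing from the mapping resolve to -1 in the export snippet, so a loss such as torch.nn.CrossEntropyLoss would need ignore_index=-1, or those samples should be filtered out before training.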

An example dataset is available in the GitHub repository.


Why This Matters: From Curated Data to Operational Intelligence

With the technical pipeline complete, it is worth stepping back to consider what this workflow enables at an organizational level.

Accelerating model development. The conventional path from raw footage to trained model involves weeks of annotation, multiple review cycles, and significant cost. This pipeline compresses that into a single script execution. The exported dataset contains pre-computed embeddings and semantically consistent labels — a training loop can begin immediately. For teams iterating on safety classification models, this removes the annotation bottleneck entirely.

Surfacing systemic patterns. For a safety manager, the clustering visualization is not just a data quality tool — it is an operational dashboard. If KMeans produces a disproportionately large cluster for "blocked fire exit," that is not a one-off incident. It is a systemic failure pattern that demands a process-level response, not just a written warning. Running this pipeline on a recurring schedule (daily, weekly) transforms surveillance footage from a passive liability into an active intelligence source. You can track whether specific violation categories are trending upward or downward over time, measure the impact of safety interventions, and identify which shifts or zones have the highest incident density.
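Spotting a disproportionately large cluster comes down to counting labels. The sketch below computes each label's share of the dataset and lists those above a threshold. The 40% threshold, the function names, and the sample labels are assumptions chosen for illustration.

```python
from collections import Counter

def label_shares(labels):
    """Return each label's fraction of the total, most frequent first."""
    counts = Counter(labels)
    total = sum(counts.values())
    return {label: count / total for label, count in counts.most_common()}

def dominant_labels(labels, share_threshold=0.4):
    """Return labels that account for at least share_threshold of all videos."""
    return [
        label for label, share in label_shares(labels).items()
        if share >= share_threshold
    ]

# Hypothetical labels for ten videos from one run
run_labels = (
    ["Blocked Fire Exit"] * 6 + ["PPE Violation"] * 2 + ["Safe Walkway Violation"] * 2
)

print(label_shares(run_labels))
print(dominant_labels(run_labels))
```

Storing these shares after every scheduled run gives the time series needed to see whether a category is trending up or down.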

Reducing annotation costs. Manual video annotation for safety-critical datasets typically costs $25–50 per reviewed hour of footage. This pipeline replaces that cost with API compute time. For organizations operating hundreds of cameras, the difference compounds quickly. More importantly, the labels are reproducible — run the same pipeline on new footage and you get consistent classifications, without the inter-annotator variability that plagues manual labeling at scale.
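The linear scaling is easy to check with arithmetic. The sketch below applies the $25–50 per reviewed hour range quoted above to a hypothetical site; the camera count and the one reviewed hour per camera per day are made-up inputs, not figures from a real deployment.

```python
def annual_annotation_cost(cameras, reviewed_hours_per_camera_per_day, rate_per_hour, days=365):
    """Manual review cost per year: cameras x hours/day x days x hourly rate."""
    return cameras * reviewed_hours_per_camera_per_day * days * rate_per_hour

# Hypothetical site: 10 cameras, 1 hour of footage reviewed per camera per day
low = annual_annotation_cost(10, 1, 25)
high = annual_annotation_cost(10, 1, 50)
doubled = annual_annotation_cost(20, 1, 25)

print(f"10 cameras: ${low:,} to ${high:,} per year")
print(f"20 cameras at the low rate: ${doubled:,} per year")
```

Doubling the camera count doubles the bill at any rate, which is the linear relationship the pipeline is meant to break.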


Conclusion

This tutorial walked through a complete pipeline: raw surveillance footage in, labeled PyTorch DataLoader out. TwelveLabs' Marengo 3.0 handled the hard part — encoding video into multimodal embeddings that capture semantic meaning across visual, audio, and contextual dimensions. Pegasus 1.2 converted those abstract clusters into human-readable safety classifications. FiftyOne provided the visualization and export infrastructure to make the results auditable and training-ready.

The full source code is available on GitHub. Clone it, point it at your own footage, and adapt the Pegasus labeling prompt to match your safety taxonomy.

To explore what else you can build with TwelveLabs' video understanding APIs — from content search to compliance monitoring to highlight generation — visit the TwelveLabs documentation or contact the team to discuss your use case.


Resources

Introduction

Thousands of terabytes of surveillance footage flow through factory IP cameras every year. Somewhere in that data is the forklift violation that caused a near-miss last Tuesday, the PPE non-compliance pattern on the night shift, and the blocked fire exit that nobody noticed for six weeks. The footage exists. The labels do not.

This gap between raw video volume and usable training data is the single largest bottleneck in deploying computer vision for worker safety. Traditional solutions — AWS SageMaker Ground Truth, Scale AI, dedicated annotation teams — attack the problem by throwing human reviewers at it. That approach works, but it costs $25–50 per hour of video reviewed and scales linearly. Double your camera count, double your annotation budget.

What if you could invert that workflow entirely? Instead of scrubbing through footage to find incidents, what if the video could organize itself — grouping similar events by meaning, then describing what it found?

That is what we are building in this tutorial. By combining TwelveLabs' video understanding models with FiftyOne's dataset management platform, we will construct a pipeline that:

  1. Ingests and indexes raw surveillance footage using the TwelveLabs API

  2. Extracts multimodal embeddings (Marengo 3.0) that represent video content as dense vectors — capturing visual, audio, and contextual signals simultaneously

  3. Clusters videos semantically using KMeans, grouping similar safety incidents without any predefined categories

  4. Auto-labels each cluster using generative video reasoning (Pegasus 1.2), converting abstract cluster IDs into human-readable safety classifications

  5. Visualizes the results in FiftyOne for quality auditing, then exports a training-ready PyTorch dataset

The key insight: TwelveLabs doesn't process video frame-by-frame. Marengo encodes video as a compressed multimodal representation — what the team calls "video as volume" — where audio, text, motion, and visual context are unified into a single embedding space. This is why semantic clustering works. Videos that look different but mean the same thing (a forklift cutting off a pedestrian from two different camera angles) end up near each other in embedding space.

Watch the walkthrough below for a quick demo of the finished application:


Prerequisites

This project requires three things:

  1. Python 3.8+: Download Python

  2. TwelveLabs API Key: Authentication docs (free tier available)

  3. HuggingFace Account (optional): Only needed if you want to use the same surveillance dataset we use — Voxel51/Safe_and_Unsafe_Behaviours

Clone the repository and install dependencies:

>> git clone https://github.com/nathanchess/visual-ai-worker-safety-kit
>> cd


Architecture Overview

Before writing any code, it helps to see the full pipeline. At a high level, this project has two halves: TwelveLabs provides the intelligence (embeddings and labels), and FiftyOne provides the infrastructure (visualization, filtering, and export).

Full-Screen View (Lucid App): [Voxel51 x TwelveLabs] - Semantic Dataset Curator Tool 

Marengo 3.0 is TwelveLabs' encoder model — it compresses video into rich multimodal embeddings that capture what is seen, heard, and contextually implied. Pegasus 1.2 is the reasoning model — it traverses that compressed representation to identify events, classify behaviors, and generate natural-language descriptions. FiftyOne sits downstream, ingesting the embeddings and labels to provide interactive visualization, similarity browsing, and direct export to PyTorch.

The complete source code is available in the GitHub repository. The walkthrough below follows the structure of main.py.


Step 1: Intelligent Video Ingestion

We start by loading the surveillance dataset from Hugging Face. FiftyOne's ViewField (imported as F) lets us build declarative filters without modifying the underlying data. Three filters matter here: restrict to the training split, drop clips shorter than 4 seconds (too brief for meaningful embedding), and skip any videos we have already indexed.

from fiftyone import ViewField as F
from fiftyone.utils.huggingface import load_from_hub

def load_or_create_dataset(dataset_name):
    # Load directly from Hugging Face Hub
    return load_from_hub("Voxel51/Safe_and_Unsafe_Behaviours", name=dataset_name)

def ingest_videos(client, index_id, dataset, min_duration=4.0):
    # Create a view: Train split ONLY, Duration >= 4s, NOT yet indexed
    base_view = (
        dataset
        .match_tags("train")
        .match(F("metadata.duration") >= min_duration)
        .match(~F("tl_video_id").exists())  # Idempotency check
    )
    
    # ... iteration logic ...

The ~F("tl_video_id").exists() filter is worth pausing on. In a production environment, you might run this pipeline daily as new footage arrives. This filter ensures you never pay to index the same video twice — if a sample already has a TwelveLabs video ID stored on it, the view silently skips it. Idempotency at the data layer, with no bookkeeping code required.


Step 2: Extracting Embeddings and Populating FiftyOne

With the filtered view constructed, we iterate through samples and upload each video to TwelveLabs for indexing. FiftyOne's iter_samples(autosave=True) batches database writes automatically, which matters when you are processing thousands of clips — it avoids a round-trip to the database on every single sample.

# Iterate through the filtered view
    for sample in label_view.iter_samples(autosave=True, progress=True):
        try:
            # Upload to TwelveLabs and save the returned Video ID to the sample
            sample["tl_video_id"] = index_video_to_twelvelabs(
                client, index_id, sample
            )
            print(f"  ✓ {sample.filename}")
        except Exception as e:
            print(f"  ✗ {sample.filename}: {e}")

Once indexing completes, we fetch the Marengo 3.0 embeddings for each video. These are dense vectors that encode the full multimodal content of each clip — what is happening visually, what is audible, and the contextual relationship between the two. Unlike embeddings from image-only models (which would treat each frame independently), Marengo's embeddings capture temporal continuity. A "worker removing safety goggles" and a "worker putting on safety goggles" produce meaningfully different vectors, even though any single frame might look identical.


Step 3: Unsupervised Semantic Clustering

This is the core of the pipeline. We have no predefined categories. We do not know how many types of safety violations exist in this footage, or what they look like. Instead, we apply KMeans clustering on the embedding space and let the data self-organize.

def cluster_and_label(client, dataset, embeddings, num_clusters=8):
    print(f"Clustering {len(embeddings)} videos into {num_clusters} clusters...")

    # 1. Cluster embeddings
    kmeans = KMeans(n_clusters=num_clusters, random_state=0)
    cluster_labels = kmeans.fit_predict(embeddings)

    # 2. Map clusters to semantic labels using Pegasus (Generative AI)
    cluster_label_map = {}
    for cluster_idx in np.unique(cluster_labels):
         # We pick a representative video from the cluster and ask Pegasus to label it
         representative_video_id = get_video_id_for_cluster(cluster_idx)
         cluster_label_map[cluster_idx] = generate_label(client, representative_video_id)

    # 3. Batch update FiftyOne samples
    indexed_view = dataset.exists("tl_video_id")
    classifications = [Classification(label=cluster_label_map[c]) for c in cluster_labels]
    indexed_view.set_values("pred_cluster", classifications)

A technical note on set_values(): this updates the entire dataset in a single bulk operation. The naive alternative — iterating sample by sample and calling sample.save() — would be orders of magnitude slower at scale. When you are working with thousands of videos, this distinction matters.

Why does semantic clustering produce meaningful groups here? Because Marengo's embeddings encode video as a compressed multimodal representation, not as a bag of per-frame features. Two clips of a forklift blocking a walkway — shot from different angles, at different times of day, with different ambient noise — will land near each other in embedding space because they share semantic meaning. Traditional approaches based on visual similarity alone would scatter them.


Step 4: Zero-Shot Auto-Labeling with Pegasus

A cluster ID like "Cluster 3" is mathematically useful but operationally meaningless. We need "Forklift Right-of-Way Violation." This is where Pegasus 1.2 comes in.

For each cluster, we select a representative video and send it to Pegasus with a structured prompt. Pegasus reasons over the full video — not a single frame, not a transcript — and returns a safety classification. No fine-tuning required. This is zero-shot generative reasoning: Pegasus has never seen this specific surveillance footage before, yet it produces labels that align with established safety protocols.

CLUSTER_LABEL_PROMPT = """
Analyze this workplace safety video and classify it as exactly ONE of the following labels.
UNSAFE BEHAVIORS:
- Safe Walkway Violation
- Unauthorized Intervention
- Opened Panel Cover
...
Return ONLY the exact label name.
"""

The prompt constrains Pegasus to a predefined taxonomy of safety categories. This is a deliberate design choice: in a production deployment, safety managers need labels that map to their existing incident classification systems, not free-form descriptions. You can adapt this taxonomy to match your organization's specific safety protocols by editing the prompt — no retraining, no new model, just a different set of label strings.


Step 5: Interactive Embedding Visualization with UMAP

Numbers and labels are necessary, but for computer vision work, you need to see the data. FiftyOne's Brain module computes a 2D UMAP projection of the high-dimensional embeddings, producing an interactive scatterplot where each point is a video and colors correspond to auto-generated labels.

What to look for in this visualization: tight, well-separated clusters indicate that the embeddings are producing clean semantic groupings — the model is confidently distinguishing between, say, "blocked fire exit" and "PPE violation." Overlapping clusters suggest categories that may need refinement (perhaps "improper lifting" and "ergonomic violation" are too similar to separate cleanly). Outliers — points sitting far from any cluster center — are worth investigating manually, as they may represent rare incidents or mislabeled data.

This step is not cosmetic. Visualizing embeddings before export catches data quality issues that would otherwise propagate silently into your training pipeline. A five-minute review here can prevent hours of debugging downstream.


Step 6: Exporting for PyTorch

A curated dataset is only valuable if it can train a model. We use FiftyOne's to_torch() method with a custom GetItem class that maps string labels to tensors and retrieves pre-computed embeddings on the fly.

from fiftyone.utils.torch import GetItem
import torch

class WorkerSafetyGetItem(GetItem):
    def __call__(self, d):
        return {
            "embedding": torch.tensor(d.get("tl_embedding"), dtype=torch.float32),
            "label_idx": torch.tensor(
                self.label_to_idx.get(d.get("ground_truth").label, -1),
                dtype=torch.long
            ),
        }

def create_dataloader(dataset, batch_size=4):
    indexed_view = dataset.exists("tl_video_id")
    
    # Create a PyTorch dataset directly from the FiftyOne view
    torch_dataset = indexed_view.to_torch(WorkerSafetyGetItem(LABEL_TO_IDX))
    
    return DataLoader(torch_dataset, batch_size=batch_size, shuffle=True)

The resulting DataLoader yields batches of pre-computed embeddings paired with integer labels. Because the embeddings were already extracted by Marengo during the indexing step, the training loop skips the most expensive part of the typical CV pipeline — feature extraction. You can train a lightweight classifier (an MLP or linear probe) on top of these embeddings in seconds rather than hours, and the labels are semantically consistent because they were generated by Pegasus reasoning over actual video content, not inferred from filenames or folder structures.

An example dataset is available in the GitHub repository.


Why This Matters: From Curated Data to Operational Intelligence

With the technical pipeline complete, it is worth stepping back to consider what this workflow enables at an organizational level.

Accelerating model development. The conventional path from raw footage to trained model involves weeks of annotation, multiple review cycles, and significant cost. This pipeline compresses that into a single script execution. The exported dataset contains pre-computed embeddings and semantically consistent labels — a training loop can begin immediately. For teams iterating on safety classification models, this removes the annotation bottleneck entirely.

Surfacing systemic patterns. For a safety manager, the clustering visualization is not just a data quality tool — it is an operational dashboard. If KMeans produces a disproportionately large cluster for "blocked fire exit," that is not a one-off incident. It is a systemic failure pattern that demands a process-level response, not just a written warning. Running this pipeline on a recurring schedule (daily, weekly) transforms surveillance footage from a passive liability into an active intelligence source. You can track whether specific violation categories are trending upward or downward over time, measure the impact of safety interventions, and identify which shifts or zones have the highest incident density.
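Trend tracking of this kind reduces to counting labels per time window. The following is a minimal sketch, assuming each pipeline run yields (date, label) pairs for the clips it classified; that record format and the function names `weekly_counts` and `trend` are hypothetical and not part of the repository.

```python
from collections import Counter, defaultdict
from datetime import date


def weekly_counts(records):
    """Count incidents per label for each ISO week.

    records: iterable of (date, label) pairs, one per classified clip
    Returns {(iso_year, iso_week): Counter({label: count})}.
    """
    counts = defaultdict(Counter)
    for day, label in records:
        iso = day.isocalendar()  # (ISO year, ISO week number, ISO weekday)
        counts[(iso[0], iso[1])][label] += 1
    return dict(counts)


def trend(counts, label):
    """Return the change in count between consecutive recorded weeks, oldest first."""
    weeks = sorted(counts)
    series = [counts[w][label] for w in weeks]
    return [later - earlier for earlier, later in zip(series, series[1:])]
```

A positive entry means the category grew relative to the previous recorded week. Weeks with no records at all are absent from the result, so a scheduled job would need to fill those gaps with zeros before comparing.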

Reducing annotation costs. Manual video annotation for safety-critical datasets typically costs $25–50 per reviewed hour of footage. This pipeline replaces that cost with API compute time. For organizations operating hundreds of cameras, the difference compounds quickly. More importantly, the labels are reproducible — run the same pipeline on new footage and you get consistent classifications, without the inter-annotator variability that plagues manual labeling at scale.


Conclusion

This tutorial walked through a complete pipeline: raw surveillance footage in, labeled PyTorch DataLoader out. TwelveLabs' Marengo 3.0 handled the hard part — encoding video into multimodal embeddings that capture semantic meaning across visual, audio, and contextual dimensions. Pegasus 1.2 converted those abstract clusters into human-readable safety classifications. FiftyOne provided the visualization and export infrastructure to make the results auditable and training-ready.

The full source code is available on GitHub. Clone it, point it at your own footage, and adapt the Pegasus labeling prompt to match your safety taxonomy.

To explore what else you can build with TwelveLabs' video understanding APIs — from content search to compliance monitoring to highlight generation — visit the TwelveLabs documentation or contact the team to discuss your use case.


Resources
