パートナーシップ

マルチモーダルビデオ理解のためのTwelveLabs Embed APIとSnowflake Cortexの統合

ジェームズ・リー

開発者は、Twelve LabsのEmbed APIをSnowflake Cortexと統合し、マルチモーダルなビデオ埋め込みをSnowflake本来のVECTORデータ型に保存し、Snowflake Cortex Completeを使用して取得したビデオクリップや書き起こしから要約を生成することで、セマンティックなビデオ検索および要約のワークフローを構築できます。

開発者は、Twelve LabsのEmbed APIをSnowflake Cortexと統合し、マルチモーダルなビデオ埋め込みをSnowflake本来のVECTORデータ型に保存し、Snowflake Cortex Completeを使用して取得したビデオクリップや書き起こしから要約を生成することで、セマンティックなビデオ検索および要約のワークフローを構築できます。

この記事の内容

No headings found on page

ニュースレターに登録する

ニュースレターに登録する

ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします

ビデオ理解に関する最新の技術進歩、チュートリアル、業界の動向をお届けします

AIを活用してビデオを検索、分析、探索します。

2025/03/26

8分

記事へのリンクをコピー

このチュートリアルの基礎となる、SnowflakeとTwelveLabsを使用したAIビデオ検索に関するクイックスタートガイドを作成してくださったSnowflakeチームのDash Desai氏に深く感謝いたします。



1 - はじめに

このチュートリアルでは、Snowflake CortexTwelveLabs Embed APIを使用して、強力なビデオ検索および要約ワークフローを作成する方法を実演します。これら両製品の高度な機能を組み合わせることで、開発者はマルチモーダルなビデオ理解を実現し、セマンティック検索、検索拡張生成(RAG)、コンテンツ推奨などのアプリケーションを開発できるようになります。

クラウドストレージに保存されたビデオは、TwelveLabs Embed APIを使用して処理され、視覚、音声、テキストの情報をカプセル化したマルチモーダルな埋め込み(embeddings)が生成されます。これらの埋め込みはVECTORデータ型を利用してSnowflakeのテーブルに保存され、VECTOR_COSINE_SIMILARITYなどの関数を使用した効率的な類似性検索が可能になります。テキストクエリは埋め込みに変換され、最も関連性の高いビデオクリップが取得されます。さらに、MoviePyによる音声抽出やOpenAIのWhisperモデルを使用した文字起こしなどの追加処理が行われます。最後に、Snowflake Cortex Completeを活用して、ビデオの詳細、タイムスタンプ、文字起こしテキスト、コンテキストの洞察などの結果を要約します。

参考: Snowflake + TwelveLabs: データクラウドにおけるビデオAI

なぜSnowflake Container Runtimeを使用するのか?

Container Runtime上のSnowflake Notebooksは、Snowflake内で直接、高度な機械学習ワークフローを実行するための柔軟な環境を提供します。Snowpark Container Servicesを搭載したこのランタイムは、GPUやCPUなどの最適化されたコンピュートリソースを使用して、Pythonベースのデータサイエンスのタスクをサポートします。これにより、外部インフラの依存関係に悩まされることなく、外部ライブラリやツールをシームレスに統合し、TwelveLabs Embed APIのワークフローを実用化するのに最適です。



なぜTwelveLabs Embed APIを使用するのか?

TwelveLabs Embed APIを使用すると、開発者はビデオ用の最先端のマルチモーダルな埋め込みを作成できます。従来のテキストのみ、または画像ベースのモデルとは異なり、このAPIは視覚的な表情、話し言葉、ジェスチャーなどのモダリティ全体にわたる時空間的な関係性を捉えます。この統合されたアプローチにより、テキストからビデオへの検索、異常検知、ハイブリッドRAGワークフローなどのアプリケーションで正確なビデオ理解が可能になります。

ワークフローの主な特徴

  1. マルチモーダルな埋め込み:視覚、音声、テキストのモダリティ全体にわたってビデオのコンテキストを捉える埋め込みを生成します。

  2. 効率的な検索:SnowflakeのVECTORデータ型を使用して類似性検索を実行し、関連するビデオセグメントを高速に取得します。

  3. 音声の文字起こし:一致したクリップから音声を抽出し、Whisperを使用して文字起こしを行うことで、アクセシビリティを向上させます。

  4. 要約:Snowflake Cortex Completeを活用して、ビデオのメタデータと文字起こしを実用的な洞察に要約します。

このチュートリアルに従うことで、開発者はSnowflakeとTwelveLabsの堅牢なインフラを活用しながら、ビデオ理解のためのスケーラブルなAI搭載アプリケーションを構築できます。



2 - 前提条件

開始する前に、以下が準備されていることを確認してください(詳細はこちらのクイックスタートセットアップを参照)。

  1. Snowflakeアカウント

    • Container Runtimeが有効化されたSnowflakeアカウントへのアクセス。この機能により、Pythonベースの機械学習ワークフローをSnowflake内で直接実行できます。

    • Snowsightでユーザーロールを DASH_CONTAINER_RUNTIME_ROLE に切り替えます。

  2. TwelveLabsのAPIキー

    • TwelveLabsのアカウントを登録し、APIキーを取得します。

  3. Python環境

    • Snowpark Pythonおよび必要なライブラリ(httpxpydanticmoviepy など)の知識。

  4. サンプルビデオデータ

    • テスト用の公開アクセス可能なビデオURLのリスト。このガイドでは3つのサンプルビデオが提供されています。

  5. ノートブックの設定

  6. 環境設定

    • TwelveLabs APIにアクセスするために、Snowflakeに外部アクセス統合とシークレットが設定されていることを確認します。



3 - ステップバイステップガイド



ステップ 1: 必要なライブラリのインストール

ノートブックの最初のセルで、TwelveLabs、Whisper、MoviePyを含むすべての必要なPythonパッケージをインストールします。

!pip install twelvelabs
!pip install git+https://github.com/openai/whisper.git ffmpeg-python moviepy
!DEBIAN_FRONTEND=noninteractive apt-get install -y ffmpeg

これにより、埋め込み生成、音声処理、文字起こしのタスクに必要なすべての依存関係が利用可能になります。



ステップ 2: ライブラリのインポート

2番目のセルで、インストールしたライブラリを環境にインポートします。これには、埋め込み生成用のTwelveLabs、Snowflakeと対話するためのSnowpark、およびフロントエンドアプリケーション用のStreamlitが含まれます。

from twelvelabs import TwelveLabs
from snowflake.snowpark.context import get_active_session
import streamlit as st
import pandas as pd
import snowflake.cortex as cortex

session = get_active_session()

これにより、Snowflakeセッションが初期化され、埋め込みの作成と処理に必要なツールがセットアップされます。



ステップ 3: ビデオURLの提示

3番目のセルで、処理する公開アクセス可能なビデオURLのリストを定義します。これらのビデオは、埋め込みを生成するための入力として機能します。

video_urls = ['http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerJoyrides.mp4',
              'http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerMeltdowns.mp4',
              'https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/WeAreGoingOnBullrun.mp4']



ステップ 4: 埋め込み生成用のSnowpark UDTFを作成する

次に、TwelveLabs Embed APIを使用して埋め込みを生成するために、Snowparkでユーザー定義テーブル関数(UDTF)create_video_embeddingsを定義および登録します。

このステップの主要な構成要素は次のとおりです。

  • 環境に twelvelabs.zip パッケージを追加する。

  • 安全なAPIアクセスのための外部アクセス統合とシークレットを指定する。

  • 埋め込み用にVECTORデータ型を持つ出力スキーマを定義する。

from snowflake.snowpark.functions import udtf
from snowflake.snowpark.types import StructType, StructField, FloatType, StringType, VectorType

session.clear_imports()
session.add_import('@"DASH_DB"."DASH_SCHEMA"."DASH_PKGS"/twelvelabs.zip')

@udtf(name="create_video_embeddings",
     packages=['httpx','pydantic'],
     external_access_integrations=['twelvelabs_access_integration'],
     secrets={'cred': 'twelve_labs_api'},
     if_not_exists=True,
     is_permanent=True,
     stage_location='@DASH_DB.DASH_SCHEMA.DASH_UDFS',
     output_schema=StructType([
        StructField("embedding", VectorType(float,1024)),
        StructField("start_offset_sec", FloatType()),
        StructField("end_offset_sec", FloatType()),
        StructField("embedding_scope", StringType())
    ])
    )

class create_video_embeddings:
    def __init__(self):
        from twelvelabs import TwelveLabs
        from twelvelabs.models.embed import EmbeddingsTask
        import _snowflake
        
        twelve_labs_api_key = _snowflake.get_generic_secret_string('cred') 
        twelvelabs_client = TwelveLabs(api_key=twelve_labs_api_key)
        self.twelvelabs_client = twelvelabs_client

    def process(self, video_url: str) -> Iterable[Tuple[list, float, float, str]]:
        # Create an embeddings task
        task = self.twelvelabs_client.embed.task.create(
            model_name="Marengo-retrieval-2.7",
            video_url=video_url
        )
        
        # Wait for the task to complete
        status = task.wait_for_done(sleep_interval=60)

        # Retrieve and process embeddings
        task = task.retrieve()
        if task.video_embedding is not None and task.video_embedding.segments is not None:
            for segment in task.video_embedding.segments:
                yield (
                    segment.embeddings_float,  # Embedding (list of floats)
                    segment.start_offset_sec,  # Start offset in seconds
                    segment.end_offset_sec,    # End offset in seconds
                    segment.embedding_scope,   # Embedding scope
                )



ステップ 5: 埋め込みを生成してSnowflakeに保存する

SnowparkのDataFrameを使用してビデオURLのリストを並行して処理し、生成された埋め込みを video_embeddings というSnowflakeテーブルに保存します。

df = session.create_dataframe(video_urls, schema=['url'])
df = df.join_table_function(create_video_embeddings(df['url']).over(partition_by="url"))
df.write.mode('overwrite').save_as_table('video_embeddings')
df = session.table('video_embeddings')



ステップ 6: ビデオクリップの文字起こし

OpenAIのWhisperモデルをダウンロードし、ビデオのダウンロード、音声クリップの抽出、およびそれらの文字起こしを行うヘルパー関数を定義します。

import urllib.request
import os
from moviepy import VideoFileClip
import whisper
import warnings
import logging

# Suppress Whisper warnings
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# Set MoviePy logger level to ERROR or CRITICAL to suppress INFO logs
logging.getLogger("moviepy").setLevel(logging.ERROR)

# Load the Whisper model once
whisper_model = whisper.load_model("base")  # or any other model you want to use, e.g., 'small', 'medium', 'large'

def download_video(video_url, output_video_path, status):
    try:
        # status.caption("Downloading video file...")
        urllib.request.urlretrieve(video_url, output_video_path)
        # status.caption(f"Video downloaded to {output_video_path}.")
        return output_video_path
    except Exception as e:
        status.caption(f"An error occurred during video download: {e}")
        return None

def extract_audio_from_video(video_path, output_audio_path, status, start_time=None, end_time=None):
    try:
        # status.caption("Extracting audio from video...")
        video_clip = VideoFileClip(video_path)
        
        # If start and end times are provided, trim the video
        if start_time is not None or end_time is not None:
            video_clip = video_clip.subclipped(start_time, end_time)
        
        video_clip.audio.write_audiofile(output_audio_path)
        # status.caption(f"Audio extracted to {output_audio_path}.")
        return output_audio_path
    except Exception as e:
        status.caption(f"An error occurred during audio extraction: {e}")
        return None

def transcribe_with_whisper(audio_path, status):
    try:
        # status.caption("Transcribing audio with Whisper...")
        result = whisper_model.transcribe(audio_path)
        # status.caption("Transcription complete.")
        return result["text"]
    except Exception as e:
        status.caption(f"An error occurred during transcription: {e}")
        return None

def transcribe_video(video_url, status, temp_video_path="temp_video.mp4", temp_audio_path="temp_audio.mp3"):
    try:
        # Step 1: Download video 
        video_path = download_video(video_url, temp_video_path, status)
        if not video_path:
            return None

        # Step 2: Extract audio from video
        audio_path = extract_audio_from_video(video_path, temp_audio_path, status)
        if not audio_path:
            return None

        # Step 3: Transcribe audio with Whisper
        transcription = transcribe_with_whisper(audio_path, status)
        return transcription
    finally:
        # Clean up temporary files
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)
            # status.caption("Temporary video file removed.")
        if os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
            # status.caption("Temporary audio file removed.")

def transcribe_video_clip(video_url, status, start_time, end_time, temp_video_path="temp_video.mp4", temp_audio_path="temp_audio_clip.mp3"):
    try:
        # Step 1: Download video 
        video_path = download_video(video_url, temp_video_path, status)
        if not video_path:
            return None

        # Step 2: Extract audio from the specified clip
        audio_path = extract_audio_from_video(video_path, temp_audio_path, status, start_time, end_time)
        if not audio_path:
            return None

        # Step 3: Transcribe the extracted audio clip with Whisper
        transcription = transcribe_with_whisper(audio_path, status)
        return transcription
    finally:
        # Clean up temporary files
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)
            # status.caption("Temporary video file removed.")
        if os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
            # status.caption("Temporary audio file removed.")

これらの関数は、後で類似性検索中に取得されたビデオクリップを処理するために使用されます。



ステップ 7: 類似性検索関数の定義

Snowflake内の VECTOR_COSINE_SIMILARITY を使用して、テキストクエリと保存されたビデオ埋め込みの間の類似性スコアを計算する similarity_scores 関数を作成します。

# TODO: Replace tlk_XXXXXXXXXXXXXXXXXX with your Twelve Labs API Key
TWELVE_LABS_API_KEY ="tlk_XXXXXXXXXXXXXXXXXX"
# Initialize the Twelve Labs client
twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)

def truncate_text(text, max_tokens=77):
    # Truncate text to roughly 77 tokens (assuming ~6 chars per token on average)
    return text[:max_tokens * 6]  # Adjust based on actual tokenization behavior

def similarity_scores(search_text,results_limit=5):
    # Twelve Labs Embed API supports text-to-embedding  
    truncated_text = truncate_text(search_text, max_tokens=77)
    
    twelvelabs_response = twelvelabs_client.embed.create(
      model_name="Marengo-retrieval-2.7",
      text=truncated_text,
      text_truncate='start'
    )

    if twelvelabs_response.text_embedding is not None and twelvelabs_response.text_embedding.segments is not None:
        text_query_embeddings = twelvelabs_response.text_embedding.segments[0].embeddings_float
        return session.sql(f"""
            SELECT URL as VIDEO_URL,START_OFFSET_SEC,END_OFFSET_SEC,
            round(VECTOR_COSINE_SIMILARITY(embedding::VECTOR(FLOAT, 1024),{text_query_embeddings}::VECTOR(FLOAT, 1024)),2) as SIMILARITY_SCORE 
            from video_embeddings order by similarity_score desc limit {results_limit}""")
    else:
        return twelvelabs_response



ステップ 8: Streamlitアプリケーションの構築

以下を行うインタラクティブなStreamlitアプリを開発します。

  1. ユーザー入力を受け付ける(検索テキスト、最大結果数)。

  2. similarity_scores を使用して、一致するビデオクリップを取得する。

  3. Whisperを使用して各クリップを文字起こしする。

  4. Snowflake Cortex Completeを使用して結果を要約する。

st.subheader("Search Clips Application")

with st.container():
    with st.expander("Enter search text and select max results", expanded=True):
        left_col,mid_col,right_col = st.columns(3)
        with left_col:
            entered_text = st.text_input('Search Text')
        with mid_col:
            max_results = st.selectbox('Max Results',(1,2,3,4,5))
        with right_col:
            selected_llm = st.selectbox('Select Summary LLM',('llama3.2-3b','llama3.1-405b','mistral-large2', 'snowflake-arctic',))
        
with st.container():
    _,mid_col1,_ = st.columns([.3,.4,.2])
    with mid_col1:
        similarity_scores_btn = st.button('Search and Summarize Matching Video Clips',type="primary")

with st.container():
    if similarity_scores_btn:
        if entered_text:
            with st.status("In progress...") as status:
                df = similarity_scores(entered_text,max_results).to_pandas()
                status.subheader(f"Top {max_results} clip(s) for search query '{entered_text}'")
                for row in df.itertuples():
                    transcribed_clip = transcribe_video_clip(row.VIDEO_URL, status, row.START_OFFSET_SEC, row.END_OFFSET_SEC)
                    prompt = f"""
                    [INST] Summarize the following and include name of the video as well as start and end clip times in seconds with everything in natural language as opposed to attributes: 
                    ###
                    Video URL: {row.VIDEO_URL},
                    Clip Start: {row.START_OFFSET_SEC} | Clip End: {row.END_OFFSET_SEC} | Similarity Score: {row.SIMILARITY_SCORE}
                    Clip Transcript: {transcribed_clip}
                    ###
                    [/INST]
                    """
                    status.write(f"Video URL: {row.VIDEO_URL}")
                    status.caption(f"-- Clip Start: {row.START_OFFSET_SEC} | Clip End: {row.END_OFFSET_SEC} | Similarity Score: {row.SIMILARITY_SCORE}")
                    status.caption(f"-- Clip Transcript: {transcribed_clip}")
                    status.write(f"Summary: {cortex.Complete(selected_llm,prompt)}")
                    status.divider()
                status.update(label="Done!", state="complete", expanded=True)
        else:
            st.caption("User ERROR: Please enter search text!")



検索の例

「Welcome home, Buster」のようなクエリを検索すると、アプリには以下のように表示されます。

  • ビデオURL

  • クリップの開始/終了時間

  • 類似性スコア

  • 文字起こしされたテキスト

  • Cortex Completeによって生成された要約

上記に表示されている最上位の結果は、こちらのビデオのものです。白いバナーに「Welcome home, Buster」という文字がはっきりと強調表示されているのがわかります。

別の検索クエリ「A white car and a black car」(白い車と黒い車)を試して、アプリの表示を確認してみましょう。

上記に表示されている最上位の結果は、こちらのビデオのものです。フレームの中に白い車と黒い車が映っているのがはっきりと確認できます。



4 - ユースケースとメリット

上記のワークフローにより、開発者は革新的なビデオアプリケーションを構築できます。

  1. 文脈に沿ったパーソナライズ広告:TwelveLabsのマルチモーダルな埋め込みとSnowflakeのユーザーデータを組み合わせることで、ブランドセーフティが確保された、文脈に適したビデオ広告を配信できます。ビデオコンテキストを深く理解することで、安全基準を維持しながら、広告をユーザーの好みに合わせることができます。

  2. ビデオ検索エンジン:自然言語クエリをビデオ埋め込みと一致させて、正確な結果を返す高度な検索アプリケーションを作成できます。

  3. コンテンツモデレーション:視覚、音声、テキスト要素を分析することにより、不適切または機密性の高いコンテンツがないかビデオライブラリを審査します。

  4. 推奨システム:文脈理解と視聴パターンに基づくビデオ推奨を通じて、ユーザーのエンゲージメントを高めます。

  5. トレーニングデータのキュレーション:ビデオ埋め込み分析を介して、マルチモーダルなビデオデータセット向けに多様で代表的なサンプルを選択します。

TwelveLabsとSnowflakeの統合は、ビデオ理解機能の強化を通じて極めて大きなビジネス価値をもたらします。TwelveLabsのマルチモーダルな埋め込みは、視覚、音声、テキストを文脈に沿って捉えることで、ビデオコンテンツの包括的な分析を可能にします。この統合ではSnowflakeのContainer Runtimeを活用してスケーラブルなネイティブ環境を構築するため、運用上の複雑さが大幅に軽減されます。また、SnowflakeのVECTORデータ型を使用して埋め込みを保存および処理できるため、効率的な類似性検索や、そこから派生する下流のAIアプリケーションが可能になります。

開発者の観点からは、並行ビデオ処理用のSnowpark Python UDTFを介して簡素化されたワークフローが提供されるため、生産性が向上します。開発者は、低遅延でリアルタイムのセマンティック検索を実装したり、高度な要約タスクにSnowflake Cortex Completeを活用したりすることもできます。

最後に、このソリューションは、低レイテンシで埋め込みを生成するTwelveLabsの最先端のビデオネイティブモデルと、Snowflakeの堅牢なインフラを組み合わせることで、優れたパフォーマンスと信頼性を提供し、セキュアでスループットの高い処理能力を保証します。



5 - 結論

TwelveLabs Embed APIとSnowflake Cortexの統合により、開発者はビデオデータの可能性を最大限に引き出すことができます。高度なマルチモーダルビデオ理解と、SnowflakeのスケーラブルなAIデータクラウドを組み合わせることで、企業は検索、パーソナライズ、モデレーションなどのための革新的なアプリケーションを作成できるようになります。このパートナーシップは、TwelveLabsの最先端技術とSnowflakeのエンタープライズグレードのインフラとの相乗効果を示しており、開発者と組織の双方に大きな価値をもたらします。



行動への呼びかけ

ビデオデータを実用的なインテリジェンスに変換する準備はよろしいですか?

  1. 統合を試すTwelveLabs Embed API オープンベータ版にサインアップして、今すぐSnowflakeで独自のAI搭載ビデオアプリケーションの構築を始めましょう。

  2. さらなるユースケースを探索するSnowflakeクイックスタートガイドにアクセスして、ビジネスニーズに合わせた同様のワークフローを実装する方法を学んでください。

  3. コミュニティに参加する:この統合に関するフィードバックを、TwelveLabsのDiscordSnowflake開発者コミュニティフォーラムで共有してください。

このチュートリアルの基礎となる、SnowflakeとTwelveLabsを使用したAIビデオ検索に関するクイックスタートガイドを作成してくださったSnowflakeチームのDash Desai氏に深く感謝いたします。



1 - はじめに

このチュートリアルでは、Snowflake CortexTwelveLabs Embed APIを使用して、強力なビデオ検索および要約ワークフローを作成する方法を実演します。これら両製品の高度な機能を組み合わせることで、開発者はマルチモーダルなビデオ理解を実現し、セマンティック検索、検索拡張生成(RAG)、コンテンツ推奨などのアプリケーションを開発できるようになります。

クラウドストレージに保存されたビデオは、TwelveLabs Embed APIを使用して処理され、視覚、音声、テキストの情報をカプセル化したマルチモーダルな埋め込み(embeddings)が生成されます。これらの埋め込みはVECTORデータ型を利用してSnowflakeのテーブルに保存され、VECTOR_COSINE_SIMILARITYなどの関数を使用した効率的な類似性検索が可能になります。テキストクエリは埋め込みに変換され、最も関連性の高いビデオクリップが取得されます。さらに、MoviePyによる音声抽出やOpenAIのWhisperモデルを使用した文字起こしなどの追加処理が行われます。最後に、Snowflake Cortex Completeを活用して、ビデオの詳細、タイムスタンプ、文字起こしテキスト、コンテキストの洞察などの結果を要約します。

参考: Snowflake + TwelveLabs: データクラウドにおけるビデオAI

なぜSnowflake Container Runtimeを使用するのか?

Container Runtime上のSnowflake Notebooksは、Snowflake内で直接、高度な機械学習ワークフローを実行するための柔軟な環境を提供します。Snowpark Container Servicesを搭載したこのランタイムは、GPUやCPUなどの最適化されたコンピュートリソースを使用して、Pythonベースのデータサイエンスのタスクをサポートします。これにより、外部インフラの依存関係に悩まされることなく、外部ライブラリやツールをシームレスに統合し、TwelveLabs Embed APIのワークフローを実用化するのに最適です。



なぜTwelveLabs Embed APIを使用するのか?

TwelveLabs Embed APIを使用すると、開発者はビデオ用の最先端のマルチモーダルな埋め込みを作成できます。従来のテキストのみ、または画像ベースのモデルとは異なり、このAPIは視覚的な表情、話し言葉、ジェスチャーなどのモダリティ全体にわたる時空間的な関係性を捉えます。この統合されたアプローチにより、テキストからビデオへの検索、異常検知、ハイブリッドRAGワークフローなどのアプリケーションで正確なビデオ理解が可能になります。

ワークフローの主な特徴

  1. マルチモーダルな埋め込み:視覚、音声、テキストのモダリティ全体にわたってビデオのコンテキストを捉える埋め込みを生成します。

  2. 効率的な検索:SnowflakeのVECTORデータ型を使用して類似性検索を実行し、関連するビデオセグメントを高速に取得します。

  3. 音声の文字起こし:一致したクリップから音声を抽出し、Whisperを使用して文字起こしを行うことで、アクセシビリティを向上させます。

  4. 要約:Snowflake Cortex Completeを活用して、ビデオのメタデータと文字起こしを実用的な洞察に要約します。

このチュートリアルに従うことで、開発者はSnowflakeとTwelveLabsの堅牢なインフラを活用しながら、ビデオ理解のためのスケーラブルなAI搭載アプリケーションを構築できます。



2 - 前提条件

開始する前に、以下が準備されていることを確認してください(詳細はこちらのクイックスタートセットアップを参照)。

  1. Snowflakeアカウント

    • Container Runtimeが有効化されたSnowflakeアカウントへのアクセス。この機能により、Pythonベースの機械学習ワークフローをSnowflake内で直接実行できます。

    • Snowsightでユーザーロールを DASH_CONTAINER_RUNTIME_ROLE に切り替えます。

  2. TwelveLabsのAPIキー

    • TwelveLabsのアカウントを登録し、APIキーを取得します。

  3. Python環境

    • Snowpark Pythonおよび必要なライブラリ(httpxpydanticmoviepy など)の知識。

  4. サンプルビデオデータ

    • テスト用の公開アクセス可能なビデオURLのリスト。このガイドでは3つのサンプルビデオが提供されています。

  5. ノートブックの設定

  6. 環境設定

    • TwelveLabs APIにアクセスするために、Snowflakeに外部アクセス統合とシークレットが設定されていることを確認します。



3 - ステップバイステップガイド



ステップ 1: 必要なライブラリのインストール

ノートブックの最初のセルで、TwelveLabs、Whisper、MoviePyを含むすべての必要なPythonパッケージをインストールします。

!pip install twelvelabs
!pip install git+https://github.com/openai/whisper.git ffmpeg-python moviepy
!DEBIAN_FRONTEND=noninteractive apt-get install -y ffmpeg

これにより、埋め込み生成、音声処理、文字起こしのタスクに必要なすべての依存関係が利用可能になります。



ステップ 2: ライブラリのインポート

2番目のセルで、インストールしたライブラリを環境にインポートします。これには、埋め込み生成用のTwelveLabs、Snowflakeと対話するためのSnowpark、およびフロントエンドアプリケーション用のStreamlitが含まれます。

from twelvelabs import TwelveLabs
from snowflake.snowpark.context import get_active_session
import streamlit as st
import pandas as pd
import snowflake.cortex as cortex

session = get_active_session()

これにより、Snowflakeセッションが初期化され、埋め込みの作成と処理に必要なツールがセットアップされます。



ステップ 3: ビデオURLの提示

3番目のセルで、処理する公開アクセス可能なビデオURLのリストを定義します。これらのビデオは、埋め込みを生成するための入力として機能します。

video_urls = ['http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerJoyrides.mp4',
              'http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerMeltdowns.mp4',
              'https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/WeAreGoingOnBullrun.mp4']



ステップ 4: 埋め込み生成用のSnowpark UDTFを作成する

次に、TwelveLabs Embed APIを使用して埋め込みを生成するために、Snowparkでユーザー定義テーブル関数(UDTF)create_video_embeddingsを定義および登録します。

このステップの主要な構成要素は次のとおりです。

  • 環境に twelvelabs.zip パッケージを追加する。

  • 安全なAPIアクセスのための外部アクセス統合とシークレットを指定する。

  • 埋め込み用にVECTORデータ型を持つ出力スキーマを定義する。

from snowflake.snowpark.functions import udtf
from snowflake.snowpark.types import StructType, StructField, FloatType, StringType, VectorType

session.clear_imports()
session.add_import('@"DASH_DB"."DASH_SCHEMA"."DASH_PKGS"/twelvelabs.zip')

@udtf(name="create_video_embeddings",
     packages=['httpx','pydantic'],
     external_access_integrations=['twelvelabs_access_integration'],
     secrets={'cred': 'twelve_labs_api'},
     if_not_exists=True,
     is_permanent=True,
     stage_location='@DASH_DB.DASH_SCHEMA.DASH_UDFS',
     output_schema=StructType([
        StructField("embedding", VectorType(float,1024)),
        StructField("start_offset_sec", FloatType()),
        StructField("end_offset_sec", FloatType()),
        StructField("embedding_scope", StringType())
    ])
    )

class create_video_embeddings:
    def __init__(self):
        from twelvelabs import TwelveLabs
        from twelvelabs.models.embed import EmbeddingsTask
        import _snowflake
        
        twelve_labs_api_key = _snowflake.get_generic_secret_string('cred') 
        twelvelabs_client = TwelveLabs(api_key=twelve_labs_api_key)
        self.twelvelabs_client = twelvelabs_client

    def process(self, video_url: str) -> Iterable[Tuple[list, float, float, str]]:
        # Create an embeddings task
        task = self.twelvelabs_client.embed.task.create(
            model_name="Marengo-retrieval-2.7",
            video_url=video_url
        )
        
        # Wait for the task to complete
        status = task.wait_for_done(sleep_interval=60)

        # Retrieve and process embeddings
        task = task.retrieve()
        if task.video_embedding is not None and task.video_embedding.segments is not None:
            for segment in task.video_embedding.segments:
                yield (
                    segment.embeddings_float,  # Embedding (list of floats)
                    segment.start_offset_sec,  # Start offset in seconds
                    segment.end_offset_sec,    # End offset in seconds
                    segment.embedding_scope,   # Embedding scope
                )



ステップ 5: 埋め込みを生成してSnowflakeに保存する

SnowparkのDataFrameを使用してビデオURLのリストを並行して処理し、生成された埋め込みを video_embeddings というSnowflakeテーブルに保存します。

df = session.create_dataframe(video_urls, schema=['url'])
df = df.join_table_function(create_video_embeddings(df['url']).over(partition_by="url"))
df.write.mode('overwrite').save_as_table('video_embeddings')
df = session.table('video_embeddings')



ステップ 6: ビデオクリップの文字起こし

OpenAIのWhisperモデルをダウンロードし、ビデオのダウンロード、音声クリップの抽出、およびそれらの文字起こしを行うヘルパー関数を定義します。

import urllib.request
import os
from moviepy import VideoFileClip
import whisper
import warnings
import logging

# Suppress Whisper warnings
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# Set MoviePy logger level to ERROR or CRITICAL to suppress INFO logs
logging.getLogger("moviepy").setLevel(logging.ERROR)

# Load the Whisper model once
whisper_model = whisper.load_model("base")  # or any other model you want to use, e.g., 'small', 'medium', 'large'

def download_video(video_url, output_video_path, status):
    try:
        # status.caption("Downloading video file...")
        urllib.request.urlretrieve(video_url, output_video_path)
        # status.caption(f"Video downloaded to {output_video_path}.")
        return output_video_path
    except Exception as e:
        status.caption(f"An error occurred during video download: {e}")
        return None

def extract_audio_from_video(video_path, output_audio_path, status, start_time=None, end_time=None):
    try:
        # status.caption("Extracting audio from video...")
        video_clip = VideoFileClip(video_path)
        
        # If start and end times are provided, trim the video
        if start_time is not None or end_time is not None:
            video_clip = video_clip.subclipped(start_time, end_time)
        
        video_clip.audio.write_audiofile(output_audio_path)
        # status.caption(f"Audio extracted to {output_audio_path}.")
        return output_audio_path
    except Exception as e:
        status.caption(f"An error occurred during audio extraction: {e}")
        return None

def transcribe_with_whisper(audio_path, status):
    try:
        # status.caption("Transcribing audio with Whisper...")
        result = whisper_model.transcribe(audio_path)
        # status.caption("Transcription complete.")
        return result["text"]
    except Exception as e:
        status.caption(f"An error occurred during transcription: {e}")
        return None

def transcribe_video(video_url, status, temp_video_path="temp_video.mp4", temp_audio_path="temp_audio.mp3"):
    try:
        # Step 1: Download video 
        video_path = download_video(video_url, temp_video_path, status)
        if not video_path:
            return None

        # Step 2: Extract audio from video
        audio_path = extract_audio_from_video(video_path, temp_audio_path, status)
        if not audio_path:
            return None

        # Step 3: Transcribe audio with Whisper
        transcription = transcribe_with_whisper(audio_path, status)
        return transcription
    finally:
        # Clean up temporary files
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)
            # status.caption("Temporary video file removed.")
        if os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
            # status.caption("Temporary audio file removed.")

def transcribe_video_clip(video_url, status, start_time, end_time, temp_video_path="temp_video.mp4", temp_audio_path="temp_audio_clip.mp3"):
    try:
        # Step 1: Download video 
        video_path = download_video(video_url, temp_video_path, status)
        if not video_path:
            return None

        # Step 2: Extract audio from the specified clip
        audio_path = extract_audio_from_video(video_path, temp_audio_path, status, start_time, end_time)
        if not audio_path:
            return None

        # Step 3: Transcribe the extracted audio clip with Whisper
        transcription = transcribe_with_whisper(audio_path, status)
        return transcription
    finally:
        # Clean up temporary files
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)
            # status.caption("Temporary video file removed.")
        if os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
            # status.caption("Temporary audio file removed.")

これらの関数は、後で類似性検索中に取得されたビデオクリップを処理するために使用されます。



ステップ 7: 類似性検索関数の定義

Snowflake内の VECTOR_COSINE_SIMILARITY を使用して、テキストクエリと保存されたビデオ埋め込みの間の類似性スコアを計算する similarity_scores 関数を作成します。

# TODO: Replace tlk_XXXXXXXXXXXXXXXXXX with your Twelve Labs API Key
TWELVE_LABS_API_KEY ="tlk_XXXXXXXXXXXXXXXXXX"
# Initialize the Twelve Labs client
twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)

def truncate_text(text, max_tokens=77):
    # Truncate text to roughly 77 tokens (assuming ~6 chars per token on average)
    return text[:max_tokens * 6]  # Adjust based on actual tokenization behavior

def similarity_scores(search_text,results_limit=5):
    # Twelve Labs Embed API supports text-to-embedding  
    truncated_text = truncate_text(search_text, max_tokens=77)
    
    twelvelabs_response = twelvelabs_client.embed.create(
      model_name="Marengo-retrieval-2.7",
      text=truncated_text,
      text_truncate='start'
    )

    if twelvelabs_response.text_embedding is not None and twelvelabs_response.text_embedding.segments is not None:
        text_query_embeddings = twelvelabs_response.text_embedding.segments[0].embeddings_float
        return session.sql(f"""
            SELECT URL as VIDEO_URL,START_OFFSET_SEC,END_OFFSET_SEC,
            round(VECTOR_COSINE_SIMILARITY(embedding::VECTOR(FLOAT, 1024),{text_query_embeddings}::VECTOR(FLOAT, 1024)),2) as SIMILARITY_SCORE 
            from video_embeddings order by similarity_score desc limit {results_limit}""")
    else:
        return twelvelabs_response



ステップ 8: Streamlitアプリケーションの構築

以下を行うインタラクティブなStreamlitアプリを開発します。

  1. ユーザー入力を受け付ける(検索テキスト、最大結果数)。

  2. similarity_scores を使用して、一致するビデオクリップを取得する。

  3. Whisperを使用して各クリップを文字起こしする。

  4. Snowflake Cortex Completeを使用して結果を要約する。

st.subheader("Search Clips Application")

with st.container():
    with st.expander("Enter search text and select max results", expanded=True):
        left_col,mid_col,right_col = st.columns(3)
        with left_col:
            entered_text = st.text_input('Search Text')
        with mid_col:
            max_results = st.selectbox('Max Results',(1,2,3,4,5))
        with right_col:
            selected_llm = st.selectbox('Select Summary LLM',('llama3.2-3b','llama3.1-405b','mistral-large2', 'snowflake-arctic',))
        
with st.container():
    _,mid_col1,_ = st.columns([.3,.4,.2])
    with mid_col1:
        similarity_scores_btn = st.button('Search and Summarize Matching Video Clips',type="primary")

with st.container():
    if similarity_scores_btn:
        if entered_text:
            with st.status("In progress...") as status:
                df = similarity_scores(entered_text,max_results).to_pandas()
                status.subheader(f"Top {max_results} clip(s) for search query '{entered_text}'")
                for row in df.itertuples():
                    transcribed_clip = transcribe_video_clip(row.VIDEO_URL, status, row.START_OFFSET_SEC, row.END_OFFSET_SEC)
                    prompt = f"""
                    [INST] Summarize the following and include name of the video as well as start and end clip times in seconds with everything in natural language as opposed to attributes: 
                    ###
                    Video URL: {row.VIDEO_URL},
                    Clip Start: {row.START_OFFSET_SEC} | Clip End: {row.END_OFFSET_SEC} | Similarity Score: {row.SIMILARITY_SCORE}
                    Clip Transcript: {transcribed_clip}
                    ###
                    [/INST]
                    """
                    status.write(f"Video URL: {row.VIDEO_URL}")
                    status.caption(f"-- Clip Start: {row.START_OFFSET_SEC} | Clip End: {row.END_OFFSET_SEC} | Similarity Score: {row.SIMILARITY_SCORE}")
                    status.caption(f"-- Clip Transcript: {transcribed_clip}")
                    status.write(f"Summary: {cortex.Complete(selected_llm,prompt)}")
                    status.divider()
                status.update(label="Done!", state="complete", expanded=True)
        else:
            st.caption("User ERROR: Please enter search text!")



検索の例

「Welcome home, Buster」のようなクエリを検索すると、アプリには以下のように表示されます。

  • ビデオURL

  • クリップの開始/終了時間

  • 類似性スコア

  • 文字起こしされたテキスト

  • Cortex Completeによって生成された要約

上記に表示されている最上位の結果は、こちらのビデオのものです。白いバナーに「Welcome home, Buster」という文字がはっきりと強調表示されているのがわかります。

別の検索クエリ「A white car and a black car」(白い車と黒い車)を試して、アプリの表示を確認してみましょう。

上記に表示されている最上位の結果は、こちらのビデオのものです。フレームの中に白い車と黒い車が映っているのがはっきりと確認できます。



4 - ユースケースとメリット

上記のワークフローにより、開発者は革新的なビデオアプリケーションを構築できます。

  1. 文脈に沿ったパーソナライズ広告:TwelveLabsのマルチモーダルな埋め込みとSnowflakeのユーザーデータを組み合わせることで、ブランドセーフティが確保された、文脈に適したビデオ広告を配信できます。ビデオコンテキストを深く理解することで、安全基準を維持しながら、広告をユーザーの好みに合わせることができます。

  2. ビデオ検索エンジン:自然言語クエリをビデオ埋め込みと一致させて、正確な結果を返す高度な検索アプリケーションを作成できます。

  3. コンテンツモデレーション:視覚、音声、テキスト要素を分析することにより、不適切または機密性の高いコンテンツがないかビデオライブラリを審査します。

  4. 推奨システム:文脈理解と視聴パターンに基づくビデオ推奨を通じて、ユーザーのエンゲージメントを高めます。

  5. トレーニングデータのキュレーション:ビデオ埋め込み分析を介して、マルチモーダルなビデオデータセット向けに多様で代表的なサンプルを選択します。

TwelveLabsとSnowflakeの統合は、ビデオ理解機能の強化を通じて極めて大きなビジネス価値をもたらします。TwelveLabsのマルチモーダルな埋め込みは、視覚、音声、テキストを文脈に沿って捉えることで、ビデオコンテンツの包括的な分析を可能にします。この統合ではSnowflakeのContainer Runtimeを活用してスケーラブルなネイティブ環境を構築するため、運用上の複雑さが大幅に軽減されます。また、SnowflakeのVECTORデータ型を使用して埋め込みを保存および処理できるため、効率的な類似性検索や、そこから派生する下流のAIアプリケーションが可能になります。

開発者の観点からは、並行ビデオ処理用のSnowpark Python UDTFを介して簡素化されたワークフローが提供されるため、生産性が向上します。開発者は、低遅延でリアルタイムのセマンティック検索を実装したり、高度な要約タスクにSnowflake Cortex Completeを活用したりすることもできます。

最後に、このソリューションは、低レイテンシで埋め込みを生成するTwelveLabsの最先端のビデオネイティブモデルと、Snowflakeの堅牢なインフラを組み合わせることで、優れたパフォーマンスと信頼性を提供し、セキュアでスループットの高い処理能力を保証します。



5 - 結論

TwelveLabs Embed APIとSnowflake Cortexの統合により、開発者はビデオデータの可能性を最大限に引き出すことができます。高度なマルチモーダルビデオ理解と、SnowflakeのスケーラブルなAIデータクラウドを組み合わせることで、企業は検索、パーソナライズ、モデレーションなどのための革新的なアプリケーションを作成できるようになります。このパートナーシップは、TwelveLabsの最先端技術とSnowflakeのエンタープライズグレードのインフラとの相乗効果を示しており、開発者と組織の双方に大きな価値をもたらします。



行動への呼びかけ

ビデオデータを実用的なインテリジェンスに変換する準備はよろしいですか?

  1. 統合を試すTwelveLabs Embed API オープンベータ版にサインアップして、今すぐSnowflakeで独自のAI搭載ビデオアプリケーションの構築を始めましょう。

  2. さらなるユースケースを探索するSnowflakeクイックスタートガイドにアクセスして、ビジネスニーズに合わせた同様のワークフローを実装する方法を学んでください。

  3. コミュニティに参加する:この統合に関するフィードバックを、TwelveLabsのDiscordSnowflake開発者コミュニティフォーラムで共有してください。

このチュートリアルの基礎となる、SnowflakeとTwelveLabsを使用したAIビデオ検索に関するクイックスタートガイドを作成してくださったSnowflakeチームのDash Desai氏に深く感謝いたします。



1 - はじめに

このチュートリアルでは、Snowflake CortexTwelveLabs Embed APIを使用して、強力なビデオ検索および要約ワークフローを作成する方法を実演します。これら両製品の高度な機能を組み合わせることで、開発者はマルチモーダルなビデオ理解を実現し、セマンティック検索、検索拡張生成(RAG)、コンテンツ推奨などのアプリケーションを開発できるようになります。

クラウドストレージに保存されたビデオは、TwelveLabs Embed APIを使用して処理され、視覚、音声、テキストの情報をカプセル化したマルチモーダルな埋め込み(embeddings)が生成されます。これらの埋め込みはVECTORデータ型を利用してSnowflakeのテーブルに保存され、VECTOR_COSINE_SIMILARITYなどの関数を使用した効率的な類似性検索が可能になります。テキストクエリは埋め込みに変換され、最も関連性の高いビデオクリップが取得されます。さらに、MoviePyによる音声抽出やOpenAIのWhisperモデルを使用した文字起こしなどの追加処理が行われます。最後に、Snowflake Cortex Completeを活用して、ビデオの詳細、タイムスタンプ、文字起こしテキスト、コンテキストの洞察などの結果を要約します。

参考: Snowflake + TwelveLabs: データクラウドにおけるビデオAI

なぜSnowflake Container Runtimeを使用するのか?

Container Runtime上のSnowflake Notebooksは、Snowflake内で直接、高度な機械学習ワークフローを実行するための柔軟な環境を提供します。Snowpark Container Servicesを搭載したこのランタイムは、GPUやCPUなどの最適化されたコンピュートリソースを使用して、Pythonベースのデータサイエンスのタスクをサポートします。これにより、外部インフラの依存関係に悩まされることなく、外部ライブラリやツールをシームレスに統合し、TwelveLabs Embed APIのワークフローを実用化するのに最適です。



なぜTwelveLabs Embed APIを使用するのか?

TwelveLabs Embed APIを使用すると、開発者はビデオ用の最先端のマルチモーダルな埋め込みを作成できます。従来のテキストのみ、または画像ベースのモデルとは異なり、このAPIは視覚的な表情、話し言葉、ジェスチャーなどのモダリティ全体にわたる時空間的な関係性を捉えます。この統合されたアプローチにより、テキストからビデオへの検索、異常検知、ハイブリッドRAGワークフローなどのアプリケーションで正確なビデオ理解が可能になります。

ワークフローの主な特徴

  1. マルチモーダルな埋め込み:視覚、音声、テキストのモダリティ全体にわたってビデオのコンテキストを捉える埋め込みを生成します。

  2. 効率的な検索:SnowflakeのVECTORデータ型を使用して類似性検索を実行し、関連するビデオセグメントを高速に取得します。

  3. 音声の文字起こし:一致したクリップから音声を抽出し、Whisperを使用して文字起こしを行うことで、アクセシビリティを向上させます。

  4. 要約:Snowflake Cortex Completeを活用して、ビデオのメタデータと文字起こしを実用的な洞察に要約します。

このチュートリアルに従うことで、開発者はSnowflakeとTwelveLabsの堅牢なインフラを活用しながら、ビデオ理解のためのスケーラブルなAI搭載アプリケーションを構築できます。



2 - 前提条件

開始する前に、以下が準備されていることを確認してください(詳細はこちらのクイックスタートセットアップを参照)。

  1. Snowflakeアカウント

    • Container Runtimeが有効化されたSnowflakeアカウントへのアクセス。この機能により、Pythonベースの機械学習ワークフローをSnowflake内で直接実行できます。

    • Snowsightでユーザーロールを DASH_CONTAINER_RUNTIME_ROLE に切り替えます。

  2. TwelveLabsのAPIキー

    • TwelveLabsのアカウントを登録し、APIキーを取得します。

  3. Python環境

    • Snowpark Pythonおよび必要なライブラリ(httpxpydanticmoviepy など)の知識。

  4. サンプルビデオデータ

    • テスト用の公開アクセス可能なビデオURLのリスト。このガイドでは3つのサンプルビデオが提供されています。

  5. ノートブックの設定

  6. 環境設定

    • TwelveLabs APIにアクセスするために、Snowflakeに外部アクセス統合とシークレットが設定されていることを確認します。



3 - ステップバイステップガイド



ステップ 1: 必要なライブラリのインストール

ノートブックの最初のセルで、TwelveLabs、Whisper、MoviePyを含むすべての必要なPythonパッケージをインストールします。

!pip install twelvelabs
!pip install git+https://github.com/openai/whisper.git ffmpeg-python moviepy
!DEBIAN_FRONTEND=noninteractive apt-get install -y ffmpeg

これにより、埋め込み生成、音声処理、文字起こしのタスクに必要なすべての依存関係が利用可能になります。



ステップ 2: ライブラリのインポート

2番目のセルで、インストールしたライブラリを環境にインポートします。これには、埋め込み生成用のTwelveLabs、Snowflakeと対話するためのSnowpark、およびフロントエンドアプリケーション用のStreamlitが含まれます。

from twelvelabs import TwelveLabs
from snowflake.snowpark.context import get_active_session
import streamlit as st
import pandas as pd
import snowflake.cortex as cortex

session = get_active_session()

これにより、Snowflakeセッションが初期化され、埋め込みの作成と処理に必要なツールがセットアップされます。



ステップ 3: ビデオURLの提示

3番目のセルで、処理する公開アクセス可能なビデオURLのリストを定義します。これらのビデオは、埋め込みを生成するための入力として機能します。

video_urls = ['http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerJoyrides.mp4',
              'http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerMeltdowns.mp4',
              'https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/WeAreGoingOnBullrun.mp4']



ステップ 4: 埋め込み生成用のSnowpark UDTFを作成する

次に、TwelveLabs Embed APIを使用して埋め込みを生成するために、Snowparkでユーザー定義テーブル関数(UDTF)create_video_embeddingsを定義および登録します。

このステップの主要な構成要素は次のとおりです。

  • 環境に twelvelabs.zip パッケージを追加する。

  • 安全なAPIアクセスのための外部アクセス統合とシークレットを指定する。

  • 埋め込み用にVECTORデータ型を持つ出力スキーマを定義する。

from snowflake.snowpark.functions import udtf
from snowflake.snowpark.types import StructType, StructField, FloatType, StringType, VectorType

session.clear_imports()
session.add_import('@"DASH_DB"."DASH_SCHEMA"."DASH_PKGS"/twelvelabs.zip')

@udtf(name="create_video_embeddings",
     packages=['httpx','pydantic'],
     external_access_integrations=['twelvelabs_access_integration'],
     secrets={'cred': 'twelve_labs_api'},
     if_not_exists=True,
     is_permanent=True,
     stage_location='@DASH_DB.DASH_SCHEMA.DASH_UDFS',
     output_schema=StructType([
        StructField("embedding", VectorType(float,1024)),
        StructField("start_offset_sec", FloatType()),
        StructField("end_offset_sec", FloatType()),
        StructField("embedding_scope", StringType())
    ])
    )

class create_video_embeddings:
    def __init__(self):
        from twelvelabs import TwelveLabs
        from twelvelabs.models.embed import EmbeddingsTask
        import _snowflake
        
        twelve_labs_api_key = _snowflake.get_generic_secret_string('cred') 
        twelvelabs_client = TwelveLabs(api_key=twelve_labs_api_key)
        self.twelvelabs_client = twelvelabs_client

    def process(self, video_url: str) -> Iterable[Tuple[list, float, float, str]]:
        # Create an embeddings task
        task = self.twelvelabs_client.embed.task.create(
            model_name="Marengo-retrieval-2.7",
            video_url=video_url
        )
        
        # Wait for the task to complete
        status = task.wait_for_done(sleep_interval=60)

        # Retrieve and process embeddings
        task = task.retrieve()
        if task.video_embedding is not None and task.video_embedding.segments is not None:
            for segment in task.video_embedding.segments:
                yield (
                    segment.embeddings_float,  # Embedding (list of floats)
                    segment.start_offset_sec,  # Start offset in seconds
                    segment.end_offset_sec,    # End offset in seconds
                    segment.embedding_scope,   # Embedding scope
                )



ステップ 5: 埋め込みを生成してSnowflakeに保存する

SnowparkのDataFrameを使用してビデオURLのリストを並行して処理し、生成された埋め込みを video_embeddings というSnowflakeテーブルに保存します。

df = session.create_dataframe(video_urls, schema=['url'])
df = df.join_table_function(create_video_embeddings(df['url']).over(partition_by="url"))
df.write.mode('overwrite').save_as_table('video_embeddings')
df = session.table('video_embeddings')



ステップ 6: ビデオクリップの文字起こし

OpenAIのWhisperモデルをダウンロードし、ビデオのダウンロード、音声クリップの抽出、およびそれらの文字起こしを行うヘルパー関数を定義します。

import urllib.request
import os
from moviepy import VideoFileClip
import whisper
import warnings
import logging

# Suppress Whisper warnings
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)

# Set MoviePy logger level to ERROR or CRITICAL to suppress INFO logs
logging.getLogger("moviepy").setLevel(logging.ERROR)

# Load the Whisper model once
whisper_model = whisper.load_model("base")  # or any other model you want to use, e.g., 'small', 'medium', 'large'

def download_video(video_url, output_video_path, status):
    try:
        # status.caption("Downloading video file...")
        urllib.request.urlretrieve(video_url, output_video_path)
        # status.caption(f"Video downloaded to {output_video_path}.")
        return output_video_path
    except Exception as e:
        status.caption(f"An error occurred during video download: {e}")
        return None

def extract_audio_from_video(video_path, output_audio_path, status, start_time=None, end_time=None):
    try:
        # status.caption("Extracting audio from video...")
        video_clip = VideoFileClip(video_path)
        
        # If start and end times are provided, trim the video
        if start_time is not None or end_time is not None:
            video_clip = video_clip.subclipped(start_time, end_time)
        
        video_clip.audio.write_audiofile(output_audio_path)
        # status.caption(f"Audio extracted to {output_audio_path}.")
        return output_audio_path
    except Exception as e:
        status.caption(f"An error occurred during audio extraction: {e}")
        return None

def transcribe_with_whisper(audio_path, status):
    try:
        # status.caption("Transcribing audio with Whisper...")
        result = whisper_model.transcribe(audio_path)
        # status.caption("Transcription complete.")
        return result["text"]
    except Exception as e:
        status.caption(f"An error occurred during transcription: {e}")
        return None

def transcribe_video(video_url, status, temp_video_path="temp_video.mp4", temp_audio_path="temp_audio.mp3"):
    try:
        # Step 1: Download video 
        video_path = download_video(video_url, temp_video_path, status)
        if not video_path:
            return None

        # Step 2: Extract audio from video
        audio_path = extract_audio_from_video(video_path, temp_audio_path, status)
        if not audio_path:
            return None

        # Step 3: Transcribe audio with Whisper
        transcription = transcribe_with_whisper(audio_path, status)
        return transcription
    finally:
        # Clean up temporary files
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)
            # status.caption("Temporary video file removed.")
        if os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
            # status.caption("Temporary audio file removed.")

def transcribe_video_clip(video_url, status, start_time, end_time, temp_video_path="temp_video.mp4", temp_audio_path="temp_audio_clip.mp3"):
    try:
        # Step 1: Download video 
        video_path = download_video(video_url, temp_video_path, status)
        if not video_path:
            return None

        # Step 2: Extract audio from the specified clip
        audio_path = extract_audio_from_video(video_path, temp_audio_path, status, start_time, end_time)
        if not audio_path:
            return None

        # Step 3: Transcribe the extracted audio clip with Whisper
        transcription = transcribe_with_whisper(audio_path, status)
        return transcription
    finally:
        # Clean up temporary files
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)
            # status.caption("Temporary video file removed.")
        if os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
            # status.caption("Temporary audio file removed.")

これらの関数は、後で類似性検索中に取得されたビデオクリップを処理するために使用されます。



ステップ 7: 類似性検索関数の定義

Snowflake内の VECTOR_COSINE_SIMILARITY を使用して、テキストクエリと保存されたビデオ埋め込みの間の類似性スコアを計算する similarity_scores 関数を作成します。

# TODO: Replace tlk_XXXXXXXXXXXXXXXXXX with your Twelve Labs API Key
TWELVE_LABS_API_KEY ="tlk_XXXXXXXXXXXXXXXXXX"
# Initialize the Twelve Labs client
twelvelabs_client = TwelveLabs(api_key=TWELVE_LABS_API_KEY)

def truncate_text(text, max_tokens=77):
    # Truncate text to roughly 77 tokens (assuming ~6 chars per token on average)
    return text[:max_tokens * 6]  # Adjust based on actual tokenization behavior

def similarity_scores(search_text,results_limit=5):
    # Twelve Labs Embed API supports text-to-embedding  
    truncated_text = truncate_text(search_text, max_tokens=77)
    
    twelvelabs_response = twelvelabs_client.embed.create(
      model_name="Marengo-retrieval-2.7",
      text=truncated_text,
      text_truncate='start'
    )

    if twelvelabs_response.text_embedding is not None and twelvelabs_response.text_embedding.segments is not None:
        text_query_embeddings = twelvelabs_response.text_embedding.segments[0].embeddings_float
        return session.sql(f"""
            SELECT URL as VIDEO_URL,START_OFFSET_SEC,END_OFFSET_SEC,
            round(VECTOR_COSINE_SIMILARITY(embedding::VECTOR(FLOAT, 1024),{text_query_embeddings}::VECTOR(FLOAT, 1024)),2) as SIMILARITY_SCORE 
            from video_embeddings order by similarity_score desc limit {results_limit}""")
    else:
        return twelvelabs_response



ステップ 8: Streamlitアプリケーションの構築

以下を行うインタラクティブなStreamlitアプリを開発します。

  1. ユーザー入力を受け付ける(検索テキスト、最大結果数)。

  2. similarity_scores を使用して、一致するビデオクリップを取得する。

  3. Whisperを使用して各クリップを文字起こしする。

  4. Snowflake Cortex Completeを使用して結果を要約する。

st.subheader("Search Clips Application")

with st.container():
    with st.expander("Enter search text and select max results", expanded=True):
        left_col,mid_col,right_col = st.columns(3)
        with left_col:
            entered_text = st.text_input('Search Text')
        with mid_col:
            max_results = st.selectbox('Max Results',(1,2,3,4,5))
        with right_col:
            selected_llm = st.selectbox('Select Summary LLM',('llama3.2-3b','llama3.1-405b','mistral-large2', 'snowflake-arctic',))
        
with st.container():
    _,mid_col1,_ = st.columns([.3,.4,.2])
    with mid_col1:
        similarity_scores_btn = st.button('Search and Summarize Matching Video Clips',type="primary")

with st.container():
    if similarity_scores_btn:
        if entered_text:
            with st.status("In progress...") as status:
                df = similarity_scores(entered_text,max_results).to_pandas()
                status.subheader(f"Top {max_results} clip(s) for search query '{entered_text}'")
                for row in df.itertuples():
                    transcribed_clip = transcribe_video_clip(row.VIDEO_URL, status, row.START_OFFSET_SEC, row.END_OFFSET_SEC)
                    prompt = f"""
                    [INST] Summarize the following and include name of the video as well as start and end clip times in seconds with everything in natural language as opposed to attributes: 
                    ###
                    Video URL: {row.VIDEO_URL},
                    Clip Start: {row.START_OFFSET_SEC} | Clip End: {row.END_OFFSET_SEC} | Similarity Score: {row.SIMILARITY_SCORE}
                    Clip Transcript: {transcribed_clip}
                    ###
                    [/INST]
                    """
                    status.write(f"Video URL: {row.VIDEO_URL}")
                    status.caption(f"-- Clip Start: {row.START_OFFSET_SEC} | Clip End: {row.END_OFFSET_SEC} | Similarity Score: {row.SIMILARITY_SCORE}")
                    status.caption(f"-- Clip Transcript: {transcribed_clip}")
                    status.write(f"Summary: {cortex.Complete(selected_llm,prompt)}")
                    status.divider()
                status.update(label="Done!", state="complete", expanded=True)
        else:
            st.caption("User ERROR: Please enter search text!")



検索の例

「Welcome home, Buster」のようなクエリを検索すると、アプリには以下のように表示されます。

  • ビデオURL

  • クリップの開始/終了時間

  • 類似性スコア

  • 文字起こしされたテキスト

  • Cortex Completeによって生成された要約

上記に表示されている最上位の結果は、こちらのビデオのものです。白いバナーに「Welcome home, Buster」という文字がはっきりと強調表示されているのがわかります。

別の検索クエリ「A white car and a black car」(白い車と黒い車)を試して、アプリの表示を確認してみましょう。

上記に表示されている最上位の結果は、こちらのビデオのものです。フレームの中に白い車と黒い車が映っているのがはっきりと確認できます。



4 - ユースケースとメリット

上記のワークフローにより、開発者は革新的なビデオアプリケーションを構築できます。

  1. 文脈に沿ったパーソナライズ広告:TwelveLabsのマルチモーダルな埋め込みとSnowflakeのユーザーデータを組み合わせることで、ブランドセーフティが確保された、文脈に適したビデオ広告を配信できます。ビデオコンテキストを深く理解することで、安全基準を維持しながら、広告をユーザーの好みに合わせることができます。

  2. ビデオ検索エンジン:自然言語クエリをビデオ埋め込みと一致させて、正確な結果を返す高度な検索アプリケーションを作成できます。

  3. コンテンツモデレーション:視覚、音声、テキスト要素を分析することにより、不適切または機密性の高いコンテンツがないかビデオライブラリを審査します。

  4. 推奨システム:文脈理解と視聴パターンに基づくビデオ推奨を通じて、ユーザーのエンゲージメントを高めます。

  5. トレーニングデータのキュレーション:ビデオ埋め込み分析を介して、マルチモーダルなビデオデータセット向けに多様で代表的なサンプルを選択します。

TwelveLabsとSnowflakeの統合は、ビデオ理解機能の強化を通じて極めて大きなビジネス価値をもたらします。TwelveLabsのマルチモーダルな埋め込みは、視覚、音声、テキストを文脈に沿って捉えることで、ビデオコンテンツの包括的な分析を可能にします。この統合ではSnowflakeのContainer Runtimeを活用してスケーラブルなネイティブ環境を構築するため、運用上の複雑さが大幅に軽減されます。また、SnowflakeのVECTORデータ型を使用して埋め込みを保存および処理できるため、効率的な類似性検索や、そこから派生する下流のAIアプリケーションが可能になります。

開発者の観点からは、並行ビデオ処理用のSnowpark Python UDTFを介して簡素化されたワークフローが提供されるため、生産性が向上します。開発者は、低遅延でリアルタイムのセマンティック検索を実装したり、高度な要約タスクにSnowflake Cortex Completeを活用したりすることもできます。

最後に、このソリューションは、低レイテンシで埋め込みを生成するTwelveLabsの最先端のビデオネイティブモデルと、Snowflakeの堅牢なインフラを組み合わせることで、優れたパフォーマンスと信頼性を提供し、セキュアでスループットの高い処理能力を保証します。



5 - 結論

TwelveLabs Embed APIとSnowflake Cortexの統合により、開発者はビデオデータの可能性を最大限に引き出すことができます。高度なマルチモーダルビデオ理解と、SnowflakeのスケーラブルなAIデータクラウドを組み合わせることで、企業は検索、パーソナライズ、モデレーションなどのための革新的なアプリケーションを作成できるようになります。このパートナーシップは、TwelveLabsの最先端技術とSnowflakeのエンタープライズグレードのインフラとの相乗効果を示しており、開発者と組織の双方に大きな価値をもたらします。



行動への呼びかけ

ビデオデータを実用的なインテリジェンスに変換する準備はよろしいですか?

  1. 統合を試すTwelveLabs Embed API オープンベータ版にサインアップして、今すぐSnowflakeで独自のAI搭載ビデオアプリケーションの構築を始めましょう。

  2. さらなるユースケースを探索するSnowflakeクイックスタートガイドにアクセスして、ビジネスニーズに合わせた同様のワークフローを実装する方法を学んでください。

  3. コミュニティに参加する:この統合に関するフィードバックを、TwelveLabsのDiscordSnowflake開発者コミュニティフォーラムで共有してください。