Media Processing Guide

Understanding Media Processing

Media processing is the foundation of the Wickson API, transforming your documents, images, videos, audio files, and 3D models into searchable vector embeddings. This guide covers both individual and batch processing workflows, helping you efficiently manage your media library.

Key Concepts

Vector Embeddings

When you process media, the API converts content into vector embeddings: numerical representations that capture semantic meaning. These embeddings enable:

  • Semantic Search: Find content based on meaning, not just keywords
  • Content Relations: Discover connections between related media
  • Cross-Modal Search: Link content across different media types
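
The idea of "meaning as numbers" can be made concrete with a toy example: embeddings that point in similar directions represent similar content, and similarity is typically measured with the cosine of the angle between vectors. A minimal sketch using made-up 4-dimensional vectors (real embeddings have hundreds of dimensions):

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors: 1.0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Toy "embeddings" for illustration only
report_q1 = [0.9, 0.1, 0.0, 0.2]
report_q2 = [0.8, 0.2, 0.1, 0.3]
cat_photo = [0.1, 0.9, 0.8, 0.0]

# The two reports score far more similar to each other than to the photo
print(cosine_similarity(report_q1, report_q2))  # close to 1.0
print(cosine_similarity(report_q1, cat_photo))  # much lower
```

Semantic search is essentially this comparison performed at scale: a query is embedded the same way, and the stored vectors closest to it are returned.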

Processing Pipeline

Every file undergoes a multi-stage pipeline:

  1. File Validation: Format and size verification
  2. Content Extraction: Text, visual elements, audio transcript extraction
  3. Analysis: AI-powered semantic understanding
  4. Vector Generation: Creation of embedding vectors
  5. Metadata Extraction: Topics, entities, quality metrics
  6. Storage: Indexing for efficient retrieval

Data Storage

What happens to my original files after I upload them to the Wickson API?

Your original files are used only for the processing step and are not stored permanently on Wickson API servers.
Once the vector embeddings and metadata (descriptions, entities, summaries, and other extracted information) are generated, the original file data is securely deleted.

What data is stored and searchable by the Wickson API?

  • Vector Embeddings: Numerical representations of your content's meaning.
  • Metadata: Information extracted from your files, such as:
    • Descriptions
    • Identified Entities (people, places, organizations)
    • Content Summaries
    • Other relevant data extracted during processing

Single vs. Batch Processing

  • Single Processing: Upload and process one file at a time
  • Batch Processing: Submit multiple files in a single operation for asynchronous processing

The same underlying technology powers both methods, with batch processing optimized for larger volumes.
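
In client code this choice often reduces to a simple dispatch on file count. A hypothetical helper (the endpoint paths follow the examples below; treating a single file as the natural cutoff is a convention, not an API rule):

```python
def choose_endpoint(file_paths):
    """Pick the single-file or batch endpoint based on file count.

    Returns the endpoint path and whether the caller must poll for results
    (batch processing is asynchronous; single uploads return synchronously).
    """
    if len(file_paths) == 1:
        return "/v1/media", False   # synchronous single-file processing
    return "/v1/batches", True      # asynchronous batch processing

endpoint, must_poll = choose_endpoint(["a.pdf", "b.pdf"])
print(endpoint, must_poll)  # /v1/batches True
```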

Working with Media Processing

Individual File Processing

import requests

def process_media_file(file_path, collection_id=None, force_overwrite=False):
    url = "https://api.wickson.ai/v1/media"
    headers = {"X-Api-Key": "YOUR_API_KEY"}

    data = {}
    if collection_id:
        data["collection_id"] = collection_id
    if force_overwrite:
        data["force_overwrite"] = "true"

    # Use context manager to ensure file is properly closed
    with open(file_path, "rb") as file_obj:
        files = {"file": file_obj}
        response = requests.post(url, headers=headers, data=data, files=files)

    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Process a document
result = process_media_file("quarterly_report.pdf", collection_id="financial_reports")

if result:
    # Get the media ID for future reference
    media_id = result["data"]["media_id"]
    print(f"Processed file with ID: {media_id}")

    # Display content summary if available
    if "content_summary" in result["data"]:
        print(f"Content summary: {result['data']['content_summary']['summary']}")

    # Display cost information
    balance_info = result["data"]["balance_info"]
    print(f"Cost: ${balance_info['cost']} (New balance: ${balance_info['new_balance']})")

Batch Processing Workflow

import requests
import time

def submit_batch(file_paths, collection_id=None, force_overwrite=False):
    url = "https://api.wickson.ai/v1/batches"
    headers = {"X-Api-Key": "YOUR_API_KEY"}

    data = {}
    if collection_id:
        data["collection_id"] = collection_id
    if force_overwrite:
        data["force_overwrite"] = "true"

    # Prepare files using context managers
    files = []
    file_objects = []  # Keep track of file objects to close them

    try:
        for path in file_paths:
            file_obj = open(path, "rb")
            file_objects.append(file_obj)
            files.append(('file', (path.split('/')[-1], file_obj)))

        response = requests.post(url, headers=headers, data=data, files=files)

        if response.status_code == 200:
            return response.json()["data"]
        else:
            print(f"Error {response.status_code}: {response.text}")
            return None
    finally:
        # Close all opened files
        for file_obj in file_objects:
            file_obj.close()

def check_batch_status(batch_id):
    url = f"https://api.wickson.ai/v1/batches/{batch_id}"
    headers = {"X-Api-Key": "YOUR_API_KEY"}

    response = requests.get(
        url, 
        headers=headers,
        params={"include_job_details": True}  # Include per-file job details in the response
    )

    if response.status_code == 200:
        return response.json()["data"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

def get_batch_results(batch_id):
    url = f"https://api.wickson.ai/v1/batches/{batch_id}/results"
    headers = {"X-Api-Key": "YOUR_API_KEY"}

    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        return response.json()["data"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Complete workflow example
try:
    # Step 1: Submit a batch
    files = ["report1.pdf", "report2.pdf", "image1.jpg", "video1.mp4"]
    batch = submit_batch(files, collection_id="mixed_media")

    if not batch:
        print("Batch submission failed")
        exit(1)

    batch_id = batch["batch_id"]
    print(f"Submitted batch: {batch_id}")
    print(f"Estimated completion: {batch['estimated_completion']}")
    print(f"Estimated cost: ${batch['cost_estimate']['total']}")

    # Step 2: Poll for completion
    while True:
        status = check_batch_status(batch_id)

        if not status:
            print("Failed to get batch status")
            break

        progress = status["progress"]
        print(f"Status: {status['status']} - {progress['percentage']:.1f}% complete")
        print(f"Completed: {progress['completed']}/{progress['total']}, Failed: {progress['failed']}")

        if status["status"] in ["completed", "failed", "cancelled"]:
            break

        time.sleep(10)

    # Step 3: Get results when complete
    if status and status["status"] == "completed":
        results = get_batch_results(batch_id)

        if results:
            print(f"\nBatch processing complete:")
            print(f"Successfully processed {results['batch_summary']['successful']} of {results['batch_summary']['total_items']} items")

            # Process results
            for item in results["results"]:
                if item["status"] == "completed":
                    print(f"\nItem: {item['file_info']['filename']}")
                    print(f"Media ID: {item['media_id']}")
                    print(f"Media Type: {item['media_type']}")

                    # Display summary if available in the metadata
                    if "metadata" in item and "search" in item["metadata"]:
                        print(f"Summary: {item['metadata']['search']['summary']}")

                    print("-" * 40)

            # Handle failures if any
            if results["batch_summary"]["failed"] > 0:
                print("\nFailed items:")
                for item in results["results"]:
                    if item["status"] == "failed":
                        print(f"File: {item.get('file_path', 'Unknown')}")
                        if "error" in item:
                            print(f"Error: {item['error'].get('message', 'Unknown error')}")
                        print("-" * 40)
except Exception as e:
    print(f"Error in batch processing workflow: {str(e)}")
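
The polling loop above runs until the batch reaches a terminal state; in production you may also want a timeout so a stuck batch cannot hang the caller indefinitely. A sketch of a bounded poller, written against a status-fetching callable (for example, a wrapper around check_batch_status) so it can be exercised without network access:

```python
import time

TERMINAL_STATES = {"completed", "failed", "cancelled"}

def poll_until_done(fetch_status, interval=10, timeout=600, sleep=time.sleep):
    """Poll fetch_status() until a terminal state or until timeout elapses.

    fetch_status is any callable returning a status dict. Returns the final
    status dict, or None on timeout or if status fetching fails.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = fetch_status()
        if status is None:
            return None
        if status["status"] in TERMINAL_STATES:
            return status
        sleep(interval)
    return None

# Example with a stubbed status sequence instead of real API calls
states = iter([{"status": "processing"}, {"status": "processing"}, {"status": "completed"}])
final = poll_until_done(lambda: next(states), interval=0, sleep=lambda _: None)
print(final)  # {'status': 'completed'}
```

Injecting the sleep function keeps the helper testable; real callers simply omit it and get time.sleep.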

Listing Processed Media

import requests

def list_media(collection=None, media_type=None, page=1, page_size=20):
    url = "https://api.wickson.ai/v1/media"
    headers = {"X-Api-Key": "YOUR_API_KEY"}

    params = {
        "page": page,
        "page_size": page_size
    }

    if collection:
        params["collection"] = collection
    if media_type:
        params["media_type"] = media_type

    response = requests.get(url, headers=headers, params=params)

    if response.status_code == 200:
        return response.json()["data"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# List documents in a specific collection
documents = list_media(collection="financial_reports", media_type="document")

if documents:
    pagination = documents["pagination"]
    print(f"Found {pagination['total_items']} documents (page {pagination['page']} of {pagination['total_pages']})")

    for item in documents["items"]:
        print(f"\nID: {item['id']}")
        print(f"Media Type: {item['media_type']}")

        if "file_info" in item:
            print(f"Filename: {item['file_info']['filename']}")

        if "storage" in item and "created_at" in item["storage"]:
            print(f"Created: {item['storage']['created_at']}")

        if "collection" in item:
            print(f"Collection: {item['collection']}")

        # Display summary if available
        if "content_summary" in item and "summary" in item["content_summary"]:
            summary = item["content_summary"]["summary"]
            print(f"Summary: {summary[:100]}..." if len(summary) > 100 else f"Summary: {summary}")

        print("-" * 40)
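
When a collection spans many pages, it is convenient to wrap the pagination in a generator. A sketch built on the same page/page_size scheme as list_media above, parameterized by a fetch callable so it runs without network access (real usage would pass something like lambda p, s: list_media(page=p, page_size=s)):

```python
def iter_all_items(fetch_page, page_size=20):
    """Yield every item across all pages of a paginated listing.

    fetch_page(page, page_size) must return a dict shaped like the
    list_media response data:
    {"items": [...], "pagination": {"total_pages": N, ...}}.
    """
    page = 1
    while True:
        data = fetch_page(page, page_size)
        if not data:
            return  # request failed; stop iterating
        yield from data["items"]
        if page >= data["pagination"]["total_pages"]:
            return
        page += 1

# Stubbed two-page result set for demonstration
fake_pages = {
    1: {"items": [{"id": "vec-1"}, {"id": "vec-2"}], "pagination": {"total_pages": 2}},
    2: {"items": [{"id": "vec-3"}], "pagination": {"total_pages": 2}},
}
ids = [item["id"] for item in iter_all_items(lambda p, s: fake_pages[p], page_size=2)]
print(ids)  # ['vec-1', 'vec-2', 'vec-3']
```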

Retrieving Media Details

import requests

def get_media_details(media_id, include_vectors=False, include_chunks=False):
    url = f"https://api.wickson.ai/v1/media/{media_id}"
    headers = {"X-Api-Key": "YOUR_API_KEY"}

    params = {
        "include_vectors": include_vectors,
        "include_chunks": include_chunks,
        "include_metadata": True
    }

    response = requests.get(url, headers=headers, params=params)

    if response.status_code == 200:
        return response.json()["data"]
    elif response.status_code == 404:
        print(f"Media item {media_id} not found")
        return None
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Get detailed information with content chunks
details = get_media_details("vec-abc123", include_chunks=True)

if details:
    print(f"Media ID: {details['id']}")
    print(f"Media Type: {details['media_type']}")

    if "collection" in details:
        print(f"Collection: {details['collection']}")

    if "file_info" in details:
        file_info = details["file_info"]
        print(f"Filename: {file_info.get('filename', 'Unknown')}")
        print(f"Size: {file_info.get('size_bytes', 0)} bytes")

        # Display media-specific information
        if details["media_type"] == "document" and "page_count" in file_info:
            print(f"Pages: {file_info['page_count']}")
        elif details["media_type"] in ["video", "audio"] and "duration" in file_info:
            print(f"Duration: {file_info['duration']} seconds")
        elif details["media_type"] == "image" and "dimensions" in file_info:
            print(f"Dimensions: {file_info['dimensions']}")

    # Display summary from metadata
    if "metadata" in details and "search" in details["metadata"]:
        search_metadata = details["metadata"]["search"]
        if "summary" in search_metadata:
            print(f"\nSummary: {search_metadata['summary']}")

        # Display entities if available
        if "entities" in search_metadata:
            entities = search_metadata["entities"]
            print("\nEntities:")
            for entity_type, items in entities.items():
                if items:
                    print(f"  {entity_type.capitalize()}: {', '.join(items)}")

    # Display content chunks if available
    if "content" in details and "chunks" in details["content"]:
        print("\nContent Chunks:")
        for i, chunk in enumerate(details["content"]["chunks"]):
            print(f"Chunk {i+1}: {chunk[:100]}..." if len(chunk) > 100 else f"Chunk {i+1}: {chunk}")

Deleting Media

import requests

def delete_media(media_id):
    url = f"https://api.wickson.ai/v1/media/{media_id}"
    headers = {"X-Api-Key": "YOUR_API_KEY"}

    response = requests.delete(url, headers=headers)

    if response.status_code == 200:
        return response.json()["data"]
    elif response.status_code == 404:
        print(f"Media item {media_id} not found")
        return None
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Delete a media item
result = delete_media("vec-abc123")

if result:
    print(f"Media ID: {result['media_id']}")
    print(f"Deletion status: {result['deletion_status']}")
    print(f"Successful: {result['successful']}")
    print(f"Deletion time: {result.get('deletion_time', 'Not provided')}")

Organizing Media with Collections

Creating and Using Collections

Collections are created implicitly when you upload media with a collection_id parameter:

# Create a new collection by uploading media to it
with open("product.jpg", "rb") as file:
    response = requests.post(
        "https://api.wickson.ai/v1/media",
        headers={"X-Api-Key": "YOUR_API_KEY"},
        files={"file": file},
        data={"collection_id": "product_catalog"}
    )

    if response.status_code == 200:
        result = response.json()["data"]
        print(f"Created collection 'product_catalog' with first item: {result['media_id']}")

Managing Collections

import requests

# List all collections
def list_collections():
    url = "https://api.wickson.ai/v1/media/collections"
    headers = {"X-Api-Key": "YOUR_API_KEY"}

    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        return response.json()["data"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Get collection details
def get_collection_details(collection_id):
    url = f"https://api.wickson.ai/v1/media/collections/{collection_id}"
    headers = {"X-Api-Key": "YOUR_API_KEY"}

    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        return response.json()["data"]
    elif response.status_code == 404:
        print(f"Collection '{collection_id}' not found")
        return None
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Move items to a collection
def move_items_to_collection(collection_id, media_ids):
    url = f"https://api.wickson.ai/v1/media/collections/{collection_id}/items"
    headers = {
        "X-Api-Key": "YOUR_API_KEY",
        "Content-Type": "application/json"
    }

    response = requests.post(
        url,
        headers=headers,
        json={"items": media_ids}
    )

    if response.status_code == 200:
        return response.json()["data"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Delete a collection
def delete_collection(collection_id):
    url = f"https://api.wickson.ai/v1/media/collections/{collection_id}"
    headers = {"X-Api-Key": "YOUR_API_KEY"}

    response = requests.delete(url, headers=headers)

    if response.status_code == 200:
        return response.json()["data"]
    elif response.status_code == 404:
        print(f"Collection '{collection_id}' not found")
        return None
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Usage examples
collections_data = list_collections()

if collections_data:
    collections = collections_data.get("collections", [])
    print(f"Available collections: {', '.join(collections)}")

    # Get details for a specific collection
    if collections:
        collection_to_check = collections[0]
        details = get_collection_details(collection_to_check)

        if details:
            stats = details.get("stats", {})
            print(f"\nCollection '{collection_to_check}' details:")
            print(f"Item count: {stats.get('item_count', 0)}")

            if "media_type_distribution" in stats:
                print("Media types:")
                for media_type, count in stats["media_type_distribution"].items():
                    print(f"  {media_type}: {count}")

# Move items to a collection
media_ids = ["vec-abc123", "vec-def456"]
collection_id = "archive"
move_result = move_items_to_collection(collection_id, media_ids)

if move_result:
    summary = move_result.get("summary", {})
    print(f"\nMoved {summary.get('moved', 0)} items to '{collection_id}' collection")
    print(f"Already in collection: {summary.get('already_in_collection', 0)}")

    # Show media type distribution if available
    if "items_by_type" in move_result:
        print("Item types:")
        for media_type, count in move_result["items_by_type"].items():
            print(f"  {media_type}: {count}")

# Delete a collection (warning: this deletes all media in the collection!)
collection_to_delete = "old_collection"
delete_result = delete_collection(collection_to_delete)

if delete_result:
    print(f"\nDeleted collection '{collection_to_delete}' with {delete_result.get('deleted_items', 0)} items")

    # Show detailed vector deletion information if available
    if "storage_info" in delete_result and "deleted_vectors" in delete_result["storage_info"]:
        vectors = delete_result["storage_info"]["deleted_vectors"]
        print(f"Deleted vectors: {vectors.get('hd', 0)} HD, {vectors.get('ld', 0)} LD")

Batch Deletion of Media Items

import requests

def batch_delete_media(media_ids, reason=None):
    url = "https://api.wickson.ai/v1/media/batch"
    headers = {
        "X-Api-Key": "YOUR_API_KEY",
        "Content-Type": "application/json"
    }

    data = {"media_ids": media_ids}
    if reason:
        data["reason"] = reason

    response = requests.delete(url, headers=headers, json=data)

    if response.status_code == 200:
        return response.json()["data"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Batch delete multiple media items
media_ids = ["vec-123abc", "vec-456def", "vec-789ghi"]
result = batch_delete_media(media_ids, reason="outdated_content")

if result:
    deleted = result.get("deleted", {})
    failed = result.get("failed", {})

    print(f"Deletion results:")
    print(f"Successfully deleted: {deleted.get('count', 0)} items")
    print(f"Failed: {failed.get('count', 0)} items")

    # Show reasons for failures if any
    if failed.get("count", 0) > 0 and "reasons" in failed:
        print("\nFailure reasons:")
        for media_id, reason in failed["reasons"].items():
            print(f"  {media_id}: {reason}")
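
Very large deletions may exceed request-size limits, so it can help to split the ID list into fixed-size chunks and issue one request per chunk. A sketch (the chunk size of 100 is an assumption, not a documented limit), parameterized by a delete callable so it is easy to test:

```python
def batch_delete_in_chunks(media_ids, delete_fn, chunk_size=100):
    """Delete media IDs in chunks, aggregating per-chunk counts.

    delete_fn(ids) should behave like batch_delete_media: return a dict with
    "deleted" and "failed" count structures, or None on request failure.
    """
    total_deleted = total_failed = 0
    for start in range(0, len(media_ids), chunk_size):
        chunk = media_ids[start:start + chunk_size]
        result = delete_fn(chunk)
        if result is None:
            total_failed += len(chunk)  # count the whole chunk as failed
            continue
        total_deleted += result.get("deleted", {}).get("count", 0)
        total_failed += result.get("failed", {}).get("count", 0)
    return {"deleted": total_deleted, "failed": total_failed}

# Stubbed delete function: every ID "succeeds"
stub = lambda ids: {"deleted": {"count": len(ids)}, "failed": {"count": 0}}
print(batch_delete_in_chunks([f"vec-{i}" for i in range(250)], stub))  # {'deleted': 250, 'failed': 0}
```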

Best Practices

Optimizing File Preparation

Clean Source Files: Remove unnecessary content, noise, or artifacts

Appropriate Formats:

  • Documents: PDF for consistent rendering, or plain text for simplicity
  • Images: PNG or JPG with appropriate resolution (higher for detailed analysis)
  • Video: MP4 with good audio quality for better transcription
  • Audio: MP3 or WAV with clear audio and minimal background noise
  • 3D Models: GLB, 3MF, or OBJ with optimized meshes

File Size Optimization:

  • Documents: Compress PDFs, remove unnecessary embedded resources
  • Images: Optimize resolution and compression for content type
  • Video: Compress to appropriate bitrate for content
  • Audio: Use appropriate bitrate for content type
  • 3D Models: Reduce polygon count to necessary detail level
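
A lightweight client-side pre-check can catch obvious problems before spending an upload. The extension list below mirrors the formats recommended above; the 100 MB cap is a placeholder, not a documented limit, so check the API reference for the real value:

```python
import os

SUPPORTED_EXTENSIONS = {
    ".pdf", ".txt",           # documents
    ".png", ".jpg", ".jpeg",  # images
    ".mp4",                   # video
    ".mp3", ".wav",           # audio
    ".glb", ".3mf", ".obj",   # 3D models
}
MAX_SIZE_BYTES = 100 * 1024 * 1024  # placeholder limit

def precheck_file(path):
    """Return a list of problems found, or an empty list if the file looks OK."""
    problems = []
    ext = os.path.splitext(path)[1].lower()
    if ext not in SUPPORTED_EXTENSIONS:
        problems.append(f"unsupported extension: {ext or '(none)'}")
    if not os.path.isfile(path):
        problems.append("file does not exist")
    elif os.path.getsize(path) > MAX_SIZE_BYTES:
        problems.append("file exceeds size limit")
    return problems
```

Running this before submit_batch lets you skip files that would fail validation instead of paying for a round trip.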

Collection Organization Strategies

  1. By Project: Group related content in project-specific collections
  2. By Media Type: Separate collections for documents, images, videos
  3. By Topic Domain: Organize by subject matter
  4. By Timeline: Organize by time period (quarterly, annual)
  5. By Workflow Status: Draft, review, approved, archived
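
Whichever strategy you pick, consistent collection IDs make filtering and cleanup easier. A small hypothetical helper that normalizes name parts into lowercase, underscore-separated IDs (the normalization rules here are a convention, not an API requirement):

```python
import re

def make_collection_id(*parts):
    """Join name parts into a lowercase, underscore-separated collection ID."""
    cleaned = []
    for part in parts:
        # Replace any run of non-alphanumeric characters with a single underscore
        slug = re.sub(r"[^a-z0-9]+", "_", str(part).lower()).strip("_")
        if slug:
            cleaned.append(slug)
    return "_".join(cleaned)

print(make_collection_id("Financial Reports", 2024, "Q1"))  # financial_reports_2024_q1
```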

Processing Optimization

  1. Batch Similar Items: Process similar media types together
  2. Prioritize Critical Content: Process most important items first
  3. Use Force Overwrite Carefully: Only use when you need to replace existing content
  4. Monitor Processing Status: Regularly check batch status for large operations
  5. Handle Errors Appropriately: Have strategies for retrying failed items

Understanding Media Analysis Results

Document Processing

Documents generate rich metadata including:

  • Content summary and description
  • Topics and keywords
  • Named entities (people, organizations, locations)
  • Document structure (sections, headings)
  • Quality metrics (clarity, completeness)

Image Processing

Images generate visual understanding data:

  • Scene descriptions
  • Object detection
  • Visual attributes
  • Composition analysis
  • Text extraction (if present)

Video Processing

Videos produce temporal understanding:

  • Transcription of spoken content
  • Scene descriptions
  • Key frame analysis
  • Timeline understanding
  • Speaker identification

Audio Processing

Audio files yield:

  • Transcription of spoken content
  • Speaker identification
  • Non-speech audio understanding
  • Tone and sentiment analysis
  • Temporal structure

3D Model Processing

3D models generate spatial understanding:

  • Component identification
  • Structural analysis
  • Material properties
  • Spatial relationships
  • Mesh quality assessment

Cost Considerations

  Operation          Cost                  Notes
  Media Processing   $0.03 per file        For vector generation
  Database I/O       $0.01 per operation   One-time storage fee
  Total per file     $0.04                 No recurring charges

There are no ongoing storage costs. You only pay once when processing media.
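
Because both charges are one-time and per-file, cost estimation is a simple multiplication using the rates from the table above:

```python
PROCESSING_COST = 0.03  # per file, vector generation
STORAGE_IO_COST = 0.01  # per file, one-time database I/O

def estimate_batch_cost(file_count):
    """Estimated one-time cost in dollars for processing file_count files."""
    return round(file_count * (PROCESSING_COST + STORAGE_IO_COST), 2)

print(estimate_batch_cost(250))  # 10.0  (250 files at $0.04 each)
```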

Troubleshooting

Common issues and recommended solutions:
File validation failure
  • Verify file format is supported
  • Check file is within size limits
  • Ensure file is not corrupted
Duplicate file error
  • Use force_overwrite=true to replace existing file
  • Use a different file if replacement not needed
Batch processing errors
  • Check individual file errors in batch results
  • Ensure consistent file formats
  • Verify sufficient account balance
Missing or poor metadata
  • Check file quality
  • Ensure content is clear and well-structured
  • For scanned documents, verify OCR quality
Collection management issues
  • Check collection naming for consistency
  • Verify permissions for collection operations
  • Ensure media IDs exist when moving items
HTTP 400 errors
  • Verify request parameters are formatted correctly
  • Check file format compatibility
  • Ensure file size is within limits
HTTP 401/403 errors
  • Verify API key is valid
  • Check that your account has sufficient permissions
HTTP 404 errors
  • Verify media IDs and collection names exist
  • Check for typos in resource identifiers
HTTP 409 errors
  • Handle duplicate file conflicts
  • Use force_overwrite when appropriate
HTTP 429 errors
  • Implement request throttling
  • Add exponential backoff retry logic
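
A common pattern for handling 429s (and transient 5xx errors) is exponential backoff: retry the request with a delay that doubles on each attempt. A sketch parameterized by the request callable, so the retry logic can be tested without a live endpoint:

```python
import time

def with_backoff(send_request, max_retries=5, base_delay=1.0, sleep=time.sleep):
    """Retry send_request() on 429/5xx, doubling the delay each attempt.

    send_request() must return an object with a .status_code attribute
    (e.g. a requests.Response). Returns the last response received.
    """
    response = send_request()
    for attempt in range(max_retries):
        if response.status_code != 429 and response.status_code < 500:
            break  # success or a non-retryable client error
        sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
        response = send_request()
    return response

# Stubbed responses for demonstration: two 429s, then success
class FakeResponse:
    def __init__(self, code):
        self.status_code = code

replies = iter([FakeResponse(429), FakeResponse(429), FakeResponse(200)])
final = with_backoff(lambda: next(replies), sleep=lambda _: None)
print(final.status_code)  # 200
```

In real usage, send_request would wrap one of the requests calls from the examples above, e.g. lambda: requests.post(url, headers=headers, data=data, files=files).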