Media Processing Guide¶
Understanding Media Processing¶
Media processing is the foundation of the Wickson API, transforming your documents, images, videos, audio files, and 3D models into searchable vector embeddings. This guide covers both individual and batch processing workflows, helping you efficiently manage your media library.
Key Concepts¶
Vector Embeddings¶
When you process media, the API converts content into vector embeddings - numerical representations that capture semantic meaning. These embeddings enable:
- Semantic Search: Find content based on meaning, not just keywords (sketched below)
- Content Relations: Discover connections between related media
- Cross-Modal Search: Link content across different media types
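Conceptually, semantic search works by comparing embedding vectors: items whose vectors point in similar directions have similar meaning. A minimal sketch with toy 4-dimensional vectors (real embeddings have hundreds of dimensions and are produced and compared for you by the API):

import math

def cosine_similarity(a, b):
    # 1.0 means identical direction (same meaning); values near 0 mean unrelated
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Toy vectors for illustration only
quarterly_report = [0.8, 0.1, 0.3, 0.5]
annual_report = [0.7, 0.2, 0.4, 0.5]
vacation_photo = [0.1, 0.9, 0.0, 0.2]

print(cosine_similarity(quarterly_report, annual_report))   # high: related content
print(cosine_similarity(quarterly_report, vacation_photo))  # low: unrelated content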
Processing Pipeline¶
Every file undergoes a multi-stage pipeline:
1. File Validation: Format and size verification
2. Content Extraction: Text, visual elements, and audio transcripts
3. Analysis: AI-powered semantic understanding
4. Vector Generation: Creation of embedding vectors
5. Metadata Extraction: Topics, entities, quality metrics
6. Storage: Indexing for efficient retrieval
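The stages above are reflected in the response you get back from a processing call. The shape below is assembled from the fields used in the examples later in this guide and is abridged for illustration; treat it as a sketch rather than the authoritative schema:

# Illustrative (abridged) shape of a successful processing response
example_response = {
    "data": {
        "media_id": "vec-abc123",  # stable ID for later retrieval
        "media_type": "document",
        "content_summary": {"summary": "Quarterly revenue overview..."},
        "metadata": {
            "search": {
                "summary": "Quarterly revenue overview...",
                "entities": {"organizations": ["Acme Corp"]},  # hypothetical entity
            }
        },
        "balance_info": {"cost": 0.04, "new_balance": 9.96},
    }
}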
Data Storage¶
What happens to my original files after I upload them to the Wickson API?
Your original files are used only for the processing step and are not stored permanently on Wickson API servers.
Once the vector embeddings and metadata (descriptions, entities, summaries, and other extracted information) are generated, the original file data is securely deleted.
What data is stored and searchable by the Wickson API?
- Vector Embeddings: Numerical representations of your content's meaning.
- Metadata: Information extracted from your files, such as:
  - Descriptions
  - Identified entities (people, places, organizations)
  - Content summaries
  - Other relevant data extracted during processing
Single vs. Batch Processing¶
- Single Processing: Upload and process one file at a time
- Batch Processing: Submit multiple files in a single operation for asynchronous processing
The same underlying technology powers both methods, with batch processing optimized for larger volumes.
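In practice you can route all uploads through one code path. A hypothetical convenience wrapper (the helpers it calls, process_media_file and submit_batch, are defined in the sections below):

def process(file_paths, collection_id=None):
    # Single files go through the /v1/media endpoint, multiple files
    # through /v1/batches; both helpers are shown later in this guide
    if len(file_paths) == 1:
        return process_media_file(file_paths[0], collection_id=collection_id)
    return submit_batch(file_paths, collection_id=collection_id)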
Working with Media Processing¶
Individual File Processing¶
import requests

def process_media_file(file_path, collection_id=None, force_overwrite=False):
    url = "https://api.wickson.ai/v1/media"
    headers = {"X-Api-Key": "YOUR_API_KEY"}
    data = {}
    if collection_id:
        data["collection_id"] = collection_id
    if force_overwrite:
        data["force_overwrite"] = "true"
    # Use a context manager to ensure the file is properly closed
    with open(file_path, "rb") as file_obj:
        files = {"file": file_obj}
        response = requests.post(url, headers=headers, data=data, files=files)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Process a document
result = process_media_file("quarterly_report.pdf", collection_id="financial_reports")
if result:
    # Get the media ID for future reference
    media_id = result["data"]["media_id"]
    print(f"Processed file with ID: {media_id}")
    # Display the content summary if available
    if "content_summary" in result["data"]:
        print(f"Content summary: {result['data']['content_summary']['summary']}")
    # Display cost information if available
    balance_info = result["data"].get("balance_info")
    if balance_info:
        print(f"Cost: ${balance_info['cost']} (New balance: ${balance_info['new_balance']})")
Batch Processing Workflow¶
import os
import time

import requests

def submit_batch(file_paths, collection_id=None, force_overwrite=False):
    url = "https://api.wickson.ai/v1/batches"
    headers = {"X-Api-Key": "YOUR_API_KEY"}
    data = {}
    if collection_id:
        data["collection_id"] = collection_id
    if force_overwrite:
        data["force_overwrite"] = "true"
    # Open all files, keeping track of the handles so they can be closed afterwards
    files = []
    file_objects = []
    try:
        for path in file_paths:
            file_obj = open(path, "rb")
            file_objects.append(file_obj)
            files.append(("file", (os.path.basename(path), file_obj)))
        response = requests.post(url, headers=headers, data=data, files=files)
        if response.status_code == 200:
            return response.json()["data"]
        else:
            print(f"Error {response.status_code}: {response.text}")
            return None
    finally:
        # Close all opened files
        for file_obj in file_objects:
            file_obj.close()

def check_batch_status(batch_id):
    url = f"https://api.wickson.ai/v1/batches/{batch_id}"
    headers = {"X-Api-Key": "YOUR_API_KEY"}
    response = requests.get(
        url,
        headers=headers,
        params={"include_job_details": True},  # request per-job details
    )
    if response.status_code == 200:
        return response.json()["data"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

def get_batch_results(batch_id):
    url = f"https://api.wickson.ai/v1/batches/{batch_id}/results"
    headers = {"X-Api-Key": "YOUR_API_KEY"}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.json()["data"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Complete workflow example
try:
    # Step 1: Submit a batch
    files = ["report1.pdf", "report2.pdf", "image1.jpg", "video1.mp4"]
    batch = submit_batch(files, collection_id="mixed_media")
    if not batch:
        raise SystemExit("Batch submission failed")
    batch_id = batch["batch_id"]
    print(f"Submitted batch: {batch_id}")
    print(f"Estimated completion: {batch['estimated_completion']}")
    print(f"Estimated cost: ${batch['cost_estimate']['total']}")

    # Step 2: Poll for completion
    while True:
        status = check_batch_status(batch_id)
        if not status:
            print("Failed to get batch status")
            break
        progress = status["progress"]
        print(f"Status: {status['status']} - {progress['percentage']:.1f}% complete")
        print(f"Completed: {progress['completed']}/{progress['total']}, Failed: {progress['failed']}")
        if status["status"] in ["completed", "failed", "cancelled"]:
            break
        time.sleep(10)

    # Step 3: Get results when complete
    if status and status["status"] == "completed":
        results = get_batch_results(batch_id)
        if results:
            print("\nBatch processing complete:")
            print(f"Successfully processed {results['batch_summary']['successful']} of {results['batch_summary']['total_items']} items")
            # Per-item results
            for item in results["results"]:
                if item["status"] == "completed":
                    print(f"\nItem: {item['file_info']['filename']}")
                    print(f"Media ID: {item['media_id']}")
                    print(f"Media Type: {item['media_type']}")
                    # Display the summary if present in the metadata
                    if "metadata" in item and "search" in item["metadata"]:
                        print(f"Summary: {item['metadata']['search']['summary']}")
                    print("-" * 40)
            # Handle failures, if any
            if results["batch_summary"]["failed"] > 0:
                print("\nFailed items:")
                for item in results["results"]:
                    if item["status"] == "failed":
                        print(f"File: {item.get('file_path', 'Unknown')}")
                        if "error" in item:
                            print(f"Error: {item['error'].get('message', 'Unknown error')}")
                        print("-" * 40)
except Exception as e:
    print(f"Error in batch processing workflow: {e}")
Listing Processed Media¶
import requests

def list_media(collection=None, media_type=None, page=1, page_size=20):
    url = "https://api.wickson.ai/v1/media"
    headers = {"X-Api-Key": "YOUR_API_KEY"}
    params = {
        "page": page,
        "page_size": page_size,
    }
    if collection:
        params["collection"] = collection
    if media_type:
        params["media_type"] = media_type
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        return response.json()["data"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# List documents in a specific collection
documents = list_media(collection="financial_reports", media_type="document")
if documents:
    pagination = documents["pagination"]
    print(f"Found {pagination['total_items']} documents (page {pagination['page']} of {pagination['total_pages']})")
    for item in documents["items"]:
        print(f"\nID: {item['id']}")
        print(f"Media Type: {item['media_type']}")
        if "file_info" in item:
            print(f"Filename: {item['file_info']['filename']}")
        if "storage" in item and "created_at" in item["storage"]:
            print(f"Created: {item['storage']['created_at']}")
        if "collection" in item:
            print(f"Collection: {item['collection']}")
        # Display the summary if available, truncated to 100 characters
        if "content_summary" in item and "summary" in item["content_summary"]:
            summary = item["content_summary"]["summary"]
            print(f"Summary: {summary[:100]}..." if len(summary) > 100 else f"Summary: {summary}")
        print("-" * 40)
Retrieving Media Details¶
import requests

def get_media_details(media_id, include_vectors=False, include_chunks=False):
    url = f"https://api.wickson.ai/v1/media/{media_id}"
    headers = {"X-Api-Key": "YOUR_API_KEY"}
    params = {
        "include_vectors": include_vectors,
        "include_chunks": include_chunks,
        "include_metadata": True,
    }
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        return response.json()["data"]
    elif response.status_code == 404:
        print(f"Media item {media_id} not found")
        return None
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Get detailed information with content chunks
details = get_media_details("vec-abc123", include_chunks=True)
if details:
    print(f"Media ID: {details['id']}")
    print(f"Media Type: {details['media_type']}")
    if "collection" in details:
        print(f"Collection: {details['collection']}")
    if "file_info" in details:
        file_info = details["file_info"]
        print(f"Filename: {file_info.get('filename', 'Unknown')}")
        print(f"Size: {file_info.get('size_bytes', 0)} bytes")
        # Display media-specific information
        if details["media_type"] == "document" and "page_count" in file_info:
            print(f"Pages: {file_info['page_count']}")
        elif details["media_type"] in ["video", "audio"] and "duration" in file_info:
            print(f"Duration: {file_info['duration']} seconds")
        elif details["media_type"] == "image" and "dimensions" in file_info:
            print(f"Dimensions: {file_info['dimensions']}")
    # Display the summary from metadata
    if "metadata" in details and "search" in details["metadata"]:
        search_metadata = details["metadata"]["search"]
        if "summary" in search_metadata:
            print(f"\nSummary: {search_metadata['summary']}")
        # Display entities if available
        if "entities" in search_metadata:
            entities = search_metadata["entities"]
            print("\nEntities:")
            for entity_type, items in entities.items():
                if items:
                    print(f"  {entity_type.capitalize()}: {', '.join(items)}")
    # Display content chunks if available, truncated to 100 characters each
    if "content" in details and "chunks" in details["content"]:
        print("\nContent Chunks:")
        for i, chunk in enumerate(details["content"]["chunks"]):
            print(f"Chunk {i+1}: {chunk[:100]}..." if len(chunk) > 100 else f"Chunk {i+1}: {chunk}")
Deleting Media¶
import requests

def delete_media(media_id):
    url = f"https://api.wickson.ai/v1/media/{media_id}"
    headers = {"X-Api-Key": "YOUR_API_KEY"}
    response = requests.delete(url, headers=headers)
    if response.status_code == 200:
        return response.json()["data"]
    elif response.status_code == 404:
        print(f"Media item {media_id} not found")
        return None
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Delete a media item
result = delete_media("vec-abc123")
if result:
    print(f"Media ID: {result['media_id']}")
    print(f"Deletion status: {result['deletion_status']}")
    print(f"Successful: {result['successful']}")
    print(f"Deletion time: {result.get('deletion_time', 'Not provided')}")
Organizing Media with Collections¶
Creating and Using Collections¶
Collections are created implicitly when you upload media with a collection_id parameter:
import requests

# Create a new collection by uploading media to it
with open("product.jpg", "rb") as file:
    response = requests.post(
        "https://api.wickson.ai/v1/media",
        headers={"X-Api-Key": "YOUR_API_KEY"},
        files={"file": file},
        data={"collection_id": "product_catalog"},
    )

if response.status_code == 200:
    result = response.json()["data"]
    print(f"Created collection 'product_catalog' with first item: {result['media_id']}")
Managing Collections¶
import requests

# List all collections
def list_collections():
    url = "https://api.wickson.ai/v1/media/collections"
    headers = {"X-Api-Key": "YOUR_API_KEY"}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.json()["data"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Get collection details
def get_collection_details(collection_id):
    url = f"https://api.wickson.ai/v1/media/collections/{collection_id}"
    headers = {"X-Api-Key": "YOUR_API_KEY"}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return response.json()["data"]
    elif response.status_code == 404:
        print(f"Collection '{collection_id}' not found")
        return None
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Move items to a collection
def move_items_to_collection(collection_id, media_ids):
    url = f"https://api.wickson.ai/v1/media/collections/{collection_id}/items"
    headers = {
        "X-Api-Key": "YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    response = requests.post(
        url,
        headers=headers,
        json={"items": media_ids},
    )
    if response.status_code == 200:
        return response.json()["data"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Delete a collection
def delete_collection(collection_id):
    url = f"https://api.wickson.ai/v1/media/collections/{collection_id}"
    headers = {"X-Api-Key": "YOUR_API_KEY"}
    response = requests.delete(url, headers=headers)
    if response.status_code == 200:
        return response.json()["data"]
    elif response.status_code == 404:
        print(f"Collection '{collection_id}' not found")
        return None
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Usage examples
collections_data = list_collections()
if collections_data:
    collections = collections_data.get("collections", [])
    print(f"Available collections: {', '.join(collections)}")

    # Get details for a specific collection
    if collections:
        collection_to_check = collections[0]
        details = get_collection_details(collection_to_check)
        if details:
            stats = details.get("stats", {})
            print(f"\nCollection '{collection_to_check}' details:")
            print(f"Item count: {stats.get('item_count', 0)}")
            if "media_type_distribution" in stats:
                print("Media types:")
                for media_type, count in stats["media_type_distribution"].items():
                    print(f"  {media_type}: {count}")

# Move items to a collection
media_ids = ["vec-abc123", "vec-def456"]
collection_id = "archive"
move_result = move_items_to_collection(collection_id, media_ids)
if move_result:
    summary = move_result.get("summary", {})
    print(f"\nMoved {summary.get('moved', 0)} items to '{collection_id}' collection")
    print(f"Already in collection: {summary.get('already_in_collection', 0)}")
    # Show the media type distribution if available
    if "items_by_type" in move_result:
        print("Item types:")
        for media_type, count in move_result["items_by_type"].items():
            print(f"  {media_type}: {count}")

# Delete a collection (warning: this deletes all media in the collection!)
collection_to_delete = "old_collection"
delete_result = delete_collection(collection_to_delete)
if delete_result:
    print(f"\nDeleted collection '{collection_to_delete}' with {delete_result.get('deleted_items', 0)} items")
    # Show detailed vector deletion information if available
    if "storage_info" in delete_result and "deleted_vectors" in delete_result["storage_info"]:
        vectors = delete_result["storage_info"]["deleted_vectors"]
        print(f"Deleted vectors: {vectors.get('hd', 0)} HD, {vectors.get('ld', 0)} LD")
Batch Deletion of Media Items¶
import requests

def batch_delete_media(media_ids, reason=None):
    url = "https://api.wickson.ai/v1/media/batch"
    headers = {
        "X-Api-Key": "YOUR_API_KEY",
        "Content-Type": "application/json",
    }
    data = {"media_ids": media_ids}
    if reason:
        data["reason"] = reason
    response = requests.delete(url, headers=headers, json=data)
    if response.status_code == 200:
        return response.json()["data"]
    else:
        print(f"Error {response.status_code}: {response.text}")
        return None

# Batch delete multiple media items
media_ids = ["vec-123abc", "vec-456def", "vec-789ghi"]
result = batch_delete_media(media_ids, reason="outdated_content")
if result:
    deleted = result.get("deleted", {})
    failed = result.get("failed", {})
    print("Deletion results:")
    print(f"Successfully deleted: {deleted.get('count', 0)} items")
    print(f"Failed: {failed.get('count', 0)} items")
    # Show reasons for failures, if any
    if failed.get("count", 0) > 0 and "reasons" in failed:
        print("\nFailure reasons:")
        for media_id, reason in failed["reasons"].items():
            print(f"  {media_id}: {reason}")
Best Practices¶
Optimizing File Preparation¶
- Clean Source Files: Remove unnecessary content, noise, or artifacts
- Appropriate Formats:
  - Documents: PDF for consistent rendering, or plain text for simplicity
  - Images: PNG or JPG with appropriate resolution (higher for detailed analysis)
  - Video: MP4 with good audio quality for better transcription
  - Audio: MP3 or WAV with clear audio and minimal background noise
  - 3D Models: GLB, 3MF, or OBJ with optimized meshes
- File Size Optimization (see the pre-check sketch after this list):
  - Documents: Compress PDFs, remove unnecessary embedded resources
  - Images: Optimize resolution and compression for the content type
  - Video: Compress to an appropriate bitrate for the content
  - Audio: Use an appropriate bitrate for the content type
  - 3D Models: Reduce polygon count to the necessary detail level
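A cheap client-side pre-check can catch obviously unsuitable files before you spend an upload. A minimal sketch; the extension list and the 100 MB cap are illustrative assumptions, not documented limits:

import os

# Illustrative values only; consult the API reference for the real limits
SUPPORTED_EXTENSIONS = {".pdf", ".txt", ".png", ".jpg", ".mp4", ".mp3", ".wav", ".glb", ".3mf", ".obj"}
MAX_SIZE_BYTES = 100 * 1024 * 1024  # assumed 100 MB cap

def precheck_file(path):
    """Return (ok, reason) before attempting an upload."""
    ext = os.path.splitext(path)[1].lower()
    if ext not in SUPPORTED_EXTENSIONS:
        return False, f"unsupported extension: {ext}"
    size = os.path.getsize(path)
    if size == 0:
        return False, "empty file"
    if size > MAX_SIZE_BYTES:
        return False, f"file too large: {size} bytes"
    return True, "ok"

for path in ["quarterly_report.pdf", "huge_scan.tiff"]:
    ok, reason = precheck_file(path)
    print(f"{path}: {'upload' if ok else 'skip (' + reason + ')'}")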
Collection Organization Strategies¶
- By Project: Group related content in project-specific collections
- By Media Type: Separate collections for documents, images, videos
- By Topic Domain: Organize by subject matter
- By Timeline: Organize by time period (quarterly, annual)
- By Workflow Status: Draft, review, approved, archived
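Whichever strategy you choose, consistent, machine-generatable collection IDs make later filtering and cleanup easier. For example (a naming convention, not an API requirement):

def collection_id_for(project, year, quarter):
    # Combines the project and timeline strategies, e.g. "financial_reports_2024_q1"
    return f"{project}_{year}_q{quarter}"

result = process_media_file("quarterly_report.pdf",
                            collection_id=collection_id_for("financial_reports", 2024, 1))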
Processing Optimization¶
- Batch Similar Items: Process similar media types together
- Prioritize Critical Content: Process most important items first
- Use Force Overwrite Carefully: Only use when you need to replace existing content
- Monitor Processing Status: Regularly check batch status for large operations
- Handle Errors Appropriately: Have strategies for retrying failed items (see the sketch below)
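For the last point, one simple strategy is to collect the files that failed in a batch and resubmit only those. A sketch building on submit_batch and get_batch_results from earlier; it assumes failed result items carry the file_path field shown in the workflow example above:

def resubmit_failed_items(batch_id):
    """Resubmit the failed items of a batch as a new batch, if any."""
    results = get_batch_results(batch_id)
    if not results or results["batch_summary"]["failed"] == 0:
        return None
    failed_paths = [
        item["file_path"]
        for item in results["results"]
        if item["status"] == "failed" and "file_path" in item
    ]
    if not failed_paths:
        return None
    new_batch = submit_batch(failed_paths)
    if new_batch:
        print(f"Resubmitted {len(failed_paths)} failed items as batch {new_batch['batch_id']}")
        # Poll check_batch_status(new_batch["batch_id"]) as in the workflow above
    return new_batch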
Understanding Media Analysis Results¶
Document Processing¶
Documents generate rich metadata including:
- Content summary and description
- Topics and keywords
- Named entities (people, organizations, locations)
- Document structure (sections, headings)
- Quality metrics (clarity, completeness)
Image Processing¶
Images generate visual understanding data:
- Scene descriptions
- Object detection
- Visual attributes
- Composition analysis
- Text extraction (if present)
Video Processing¶
Videos produce temporal understanding:
- Transcription of spoken content
- Scene descriptions
- Key frame analysis
- Timeline understanding
- Speaker identification
Audio Processing¶
Audio files yield:
- Transcription of spoken content
- Speaker identification
- Non-speech audio understanding
- Tone and sentiment analysis
- Temporal structure
3D Model Processing¶
3D models generate spatial understanding:
- Component identification
- Structural analysis
- Material properties
- Spatial relationships
- Mesh quality assessment
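Since each media type yields a different metadata profile, downstream code typically dispatches on media_type. A small sketch using the details structure returned by get_media_details above (the "3d_model" type identifier is an assumption):

def one_line_description(details):
    media_type = details["media_type"]
    file_info = details.get("file_info", {})
    if media_type == "document":
        return f"Document, {file_info.get('page_count', '?')} pages"
    if media_type in ("video", "audio"):
        return f"{media_type.capitalize()}, {file_info.get('duration', '?')} seconds"
    if media_type == "image":
        return f"Image, {file_info.get('dimensions', '?')}"
    if media_type == "3d_model":  # assumed type identifier for 3D models
        return "3D model"
    return media_type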
Cost Considerations¶
| Operation | Cost | Notes |
|---|---|---|
| Media Processing | $0.03 per file | For vector generation |
| Database I/O | $0.01 per operation | One-time storage fee |
| Total per file | $0.04 | No recurring charges |
There are no ongoing storage costs. You only pay once when processing media.
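Since the rates are flat, budgeting is a simple multiplication. For example:

COST_PER_FILE = 0.03  # media processing (vector generation)
COST_PER_IO = 0.01    # one-time database storage

def estimate_cost(num_files):
    return num_files * (COST_PER_FILE + COST_PER_IO)

print(f"500 files: ${estimate_cost(500):.2f}")  # 500 files: $20.00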
Troubleshooting¶
| Issue | Solutions |
|---|---|
| File validation failure | Confirm the format is supported for the media type and the file is not empty, corrupt, or over the size limit; re-export or compress if needed |
| Duplicate file error | The file was already processed; pass force_overwrite to replace the existing item, or reuse the existing media_id |
| Batch processing errors | Check the batch status with include_job_details to identify the failed jobs, then resubmit only those files |
| Missing or poor metadata | Improve source quality (clear audio, adequate resolution, clean text) per the file preparation guidelines above, then reprocess with force_overwrite |
| Collection management issues | Verify the collection_id exists (see list_collections) and that the media IDs are valid before moving or deleting items |
| HTTP 400 errors | Malformed request; check required parameters, field names, and the request body format |
| HTTP 401/403 errors | Invalid or missing API key; verify the X-Api-Key header and your account permissions |
| HTTP 404 errors | The media item, batch, or collection does not exist; double-check the ID |
| HTTP 409 errors | Conflict, typically an already-processed file; use force_overwrite if you intend to replace it |
| HTTP 429 errors | Rate limit exceeded; reduce request frequency and retry with backoff (see the sketch below) |
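Many of the HTTP errors above are transient, especially 429. A generic retry wrapper you can put in front of any of the helpers in this guide (a sketch; it assumes the API sends a standard Retry-After header, which this guide does not confirm):

import time

import requests

def request_with_backoff(method, url, max_retries=3, **kwargs):
    for attempt in range(max_retries):
        response = requests.request(method, url, **kwargs)
        if response.status_code != 429:
            return response
        # Honor Retry-After if present (an assumption), else back off exponentially
        wait = int(response.headers.get("Retry-After", 2 ** attempt))
        print(f"Rate limited; retrying in {wait}s")
        time.sleep(wait)
    return response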