"""
Service layer for Cloudflare Images Toolkit.
This module contains the business logic for interacting with the
Cloudflare Images API, managing image uploads, and transformations.
"""
import json
import logging
import threading
from datetime import timedelta
from typing import Any
import requests
from django.utils import timezone
from .exceptions import (
CloudflareImagesError,
)
from .models import CloudflareImage, ImageUploadLog, ImageUploadStatus
from .settings import cloudflare_settings
logger = logging.getLogger(__name__)
[docs]
class CloudflareImagesService:
    """Service class for Cloudflare Images API operations.

    Wraps the Cloudflare Images REST endpoints used by this package (direct
    creator upload, status check, list, get, update, delete) and the webhook
    helpers, recording results in ``CloudflareImage`` and ``ImageUploadLog``.
    The account ID, API token and base URL are read from
    ``cloudflare_settings`` on every access rather than cached on the instance.
    """
[docs]
def __init__(self):
# Each thread gets its own Session so there is no shared mutable state
# (e.g. cookies, adapters) between concurrent callers.
self._local: threading.local = threading.local()
@property
def account_id(self) -> str:
    """Cloudflare account ID, read from ``cloudflare_settings`` on each access."""
    current_account_id = cloudflare_settings.account_id
    return current_account_id
@property
def api_token(self) -> str:
    """Cloudflare API token, read from ``cloudflare_settings`` on each access."""
    current_token = cloudflare_settings.api_token
    return current_token
@property
def base_url(self) -> str:
    """Cloudflare API base URL, read from ``cloudflare_settings`` on each access."""
    current_base_url = cloudflare_settings.base_url
    return current_base_url
@property
def session(self) -> requests.Session:
# Return the Session for the current thread, creating it on first use.
# Using threading.local() means each thread has its own independent
# Session so concurrent API calls cannot share cookies or other mutable
# session state. Auth headers are passed per-request via _auth_headers()
# so override_settings changes are reflected immediately.
if not hasattr(self._local, "session"):
self._local.session = requests.Session()
return self._local.session
def _auth_headers(self) -> dict[str, str]:
"""Return per-request Authorization headers using the current API token.
Reading the token on each call keeps the header in sync with
override_settings changes and avoids mutating shared session state.
"""
return {"Authorization": f"Bearer {self.api_token}"}
[docs]
def get_direct_upload_url(
self,
user=None,
custom_id: str | None = None,
metadata: dict[str, Any] | None = None,
require_signed_urls: bool | None = None,
expiry_minutes: int | None = None,
) -> dict[str, str]:
"""
Get a one-time upload URL for direct creator upload.
This is an alias for create_direct_upload_url that returns a dict
to match the documentation examples.
"""
image = self.create_direct_upload_url(
user=user,
custom_id=custom_id,
metadata=metadata,
require_signed_urls=require_signed_urls,
expiry_minutes=expiry_minutes,
)
return {"id": image.cloudflare_id, "uploadURL": image.upload_url}
[docs]
def create_direct_upload_url(
    self,
    user=None,
    custom_id: str | None = None,
    metadata: dict[str, Any] | None = None,
    require_signed_urls: bool | None = None,
    expiry_minutes: int | None = None,
) -> CloudflareImage:
    """
    Request a one-time direct creator upload URL and record it locally.

    Args:
        user: Django user instance to associate with the image (optional)
        custom_id: Custom ID for the image; only sent when truthy
        metadata: Additional metadata to store with the image
        require_signed_urls: Whether to require signed URLs; defaults to
            the configured ``require_signed_urls`` setting
        expiry_minutes: Minutes until the upload URL expires; defaults to
            the configured ``default_expiry_minutes`` and is clamped to
            the 2..360 range
    Returns:
        The newly created CloudflareImage (status PENDING) holding the
        upload URL
    Raises:
        CloudflareImagesError: If the HTTP request fails or Cloudflare
            reports the call as unsuccessful
    """
    # Fill in configured defaults for anything the caller left unset.
    signed = (
        cloudflare_settings.require_signed_urls
        if require_signed_urls is None
        else require_signed_urls
    )
    minutes = (
        cloudflare_settings.default_expiry_minutes
        if expiry_minutes is None
        else expiry_minutes
    )
    image_metadata = {} if metadata is None else metadata

    # The expiry must be 2 minutes to 6 hours in the future per API docs.
    minutes = max(2, min(minutes, 360))
    expires_at = timezone.now() + timedelta(minutes=minutes)

    fields = {
        "requireSignedURLs": str(signed).lower(),
        "metadata": json.dumps(image_metadata),
        "expiry": expires_at.isoformat(),
    }
    if custom_id:
        fields["id"] = custom_id

    endpoint = f"{self.base_url}/accounts/{self.account_id}/images/v2/direct_upload"
    try:
        # The endpoint wants multipart/form-data. A (None, value) tuple makes
        # requests emit a plain form field without a filename, i.e. the
        # equivalent of curl's -F key=value.
        multipart = {name: (None, value) for name, value in fields.items()}
        reply = self.session.post(
            endpoint, files=multipart, headers=self._auth_headers()
        )
        reply.raise_for_status()
        body = reply.json()

        if not body.get("success"):
            details = ", ".join(
                err.get("message", "Unknown error")
                for err in body.get("errors", [])
            )
            raise CloudflareImagesError(f"Cloudflare API error: {details}")

        result = body["result"]

        # Persist the pending upload so later status checks/webhooks can find it.
        image = CloudflareImage.objects.create(
            cloudflare_id=result["id"],
            user=user,
            upload_url=result["uploadURL"],
            status=ImageUploadStatus.PENDING,
            require_signed_urls=signed,
            metadata=image_metadata,
            expires_at=expires_at,
        )

        # Audit trail entry for the creation.
        ImageUploadLog.objects.create(
            image=image,
            event_type="upload_url_created",
            message="Direct upload URL created successfully",
            data={"response": result},
        )
        logger.info(f"Created direct upload URL for image {image.cloudflare_id}")
        return image
    except requests.RequestException as e:
        reason = str(e)
        logger.error(f"Failed to create direct upload URL: {reason}")
        raise CloudflareImagesError(f"Failed to create upload URL: {reason}") from e
[docs]
def check_image_status(self, image: CloudflareImage) -> dict[str, Any]:
    """
    Fetch an image's current state from Cloudflare and sync the local record.

    Args:
        image: CloudflareImage instance to refresh
    Returns:
        The ``result`` object from Cloudflare's response
    Raises:
        CloudflareImagesError: If the HTTP request fails or Cloudflare
            reports the call as unsuccessful
    """
    endpoint = f"{self.base_url}/accounts/{self.account_id}/images/v1/{image.cloudflare_id}"
    try:
        reply = self.session.get(endpoint, headers=self._auth_headers())
        reply.raise_for_status()
        body = reply.json()

        if not body.get("success"):
            details = ", ".join(
                err.get("message", "Unknown error")
                for err in body.get("errors", [])
            )
            raise CloudflareImagesError(f"Cloudflare API error: {details}")

        result = body["result"]

        # Bring the local record in line with what Cloudflare reported.
        image.update_from_cloudflare_response(result)

        # Audit trail entry for the status check.
        ImageUploadLog.objects.create(
            image=image,
            event_type="status_checked",
            message=f"Image status checked: {image.status}",
            data={"response": result},
        )
        logger.info(
            f"Checked status for image {image.cloudflare_id}: {image.status}"
        )
        return result
    except requests.RequestException as e:
        reason = str(e)
        logger.error(
            f"Failed to check image status for {image.cloudflare_id}: {reason}"
        )
        raise CloudflareImagesError(
            f"Failed to check image status: {reason}"
        ) from e
[docs]
def list_images(self, page: int = 1, per_page: int = 1000) -> dict[str, Any]:
    """
    Retrieve one page of the account's images from Cloudflare Images.

    Args:
        page: Page number for pagination (default: 1)
        per_page: Number of images per page (default: 1000); values above
            10000 are capped at 10000 before being sent
    Returns:
        The full decoded API response (pagination info and image list)
    Raises:
        CloudflareImagesError: If the HTTP request fails or Cloudflare
            reports the call as unsuccessful
    """
    endpoint = f"{self.base_url}/accounts/{self.account_id}/images/v1"
    # Cloudflare max is 10000 per page.
    page_size = min(per_page, 10000)
    query = {"page": page, "per_page": page_size}
    try:
        reply = self.session.get(
            endpoint, params=query, headers=self._auth_headers()
        )
        reply.raise_for_status()
        body = reply.json()

        if not body.get("success"):
            details = ", ".join(
                err.get("message", "Unknown error")
                for err in body.get("errors", [])
            )
            raise CloudflareImagesError(f"Cloudflare API error: {details}")

        logger.info(f"Listed images: page {page}, per_page {per_page}")
        return body
    except requests.RequestException as e:
        reason = str(e)
        logger.error(f"Failed to list images: {reason}")
        raise CloudflareImagesError(f"Failed to list images: {reason}") from e
[docs]
def get_image(self, image_id: str) -> dict[str, Any]:
    """
    Fetch the details of a single image from Cloudflare.

    Args:
        image_id: Cloudflare image ID
    Returns:
        The full decoded API response for the image
    Raises:
        CloudflareImagesError: If the HTTP request fails or Cloudflare
            reports the call as unsuccessful
    """
    endpoint = f"{self.base_url}/accounts/{self.account_id}/images/v1/{image_id}"
    try:
        reply = self.session.get(endpoint, headers=self._auth_headers())
        reply.raise_for_status()
        body = reply.json()

        if not body.get("success"):
            details = ", ".join(
                err.get("message", "Unknown error")
                for err in body.get("errors", [])
            )
            raise CloudflareImagesError(f"Cloudflare API error: {details}")

        logger.info(f"Retrieved image details for {image_id}")
        return body
    except requests.RequestException as e:
        reason = str(e)
        logger.error(f"Failed to get image {image_id}: {reason}")
        raise CloudflareImagesError(f"Failed to get image: {reason}") from e
[docs]
def update_image(
    self,
    image_id: str,
    metadata: dict[str, Any] | None = None,
    require_signed_urls: bool | None = None,
) -> dict[str, Any]:
    """
    Update image metadata and settings on Cloudflare and mirror them locally.

    Only the arguments that are not None are sent in the PATCH body. When a
    local ``CloudflareImage`` with the same ID exists, the new metadata keys
    are merged into its stored metadata and ``require_signed_urls`` is
    overwritten; a missing local record is not an error.

    Args:
        image_id: Cloudflare image ID
        metadata: New metadata for the image
        require_signed_urls: Whether to require signed URLs
    Returns:
        Dictionary with the full decoded API response
    Raises:
        CloudflareImagesError: If the HTTP request fails or Cloudflare
            reports the call as unsuccessful
    """
    url = f"{self.base_url}/accounts/{self.account_id}/images/v1/{image_id}"
    update_data: dict[str, Any] = {}
    if metadata is not None:
        update_data["metadata"] = metadata
    if require_signed_urls is not None:
        update_data["requireSignedURLs"] = require_signed_urls
    try:
        response = self.session.patch(
            url, json=update_data, headers=self._auth_headers()
        )
        response.raise_for_status()
        data = response.json()
        if not data.get("success"):
            error_msg = ", ".join(
                err.get("message", "Unknown error")
                for err in data.get("errors", [])
            )
            raise CloudflareImagesError(f"Cloudflare API error: {error_msg}")

        # Mirror the change on the local record, if there is one. Only the
        # lookup is guarded so that a failure while saving is not mistaken
        # for "no local record".
        try:
            image = CloudflareImage.objects.get(cloudflare_id=image_id)
        except CloudflareImage.DoesNotExist:
            image = None
        if image is not None:
            if metadata is not None:
                # The stored metadata may be empty/None; merge into a fresh
                # dict instead of calling .update() on it, which would raise
                # after the remote update has already succeeded.
                image.metadata = {**(image.metadata or {}), **metadata}
            if require_signed_urls is not None:
                image.require_signed_urls = require_signed_urls
            image.save()

        logger.info(f"Updated image {image_id}")
        return data
    except requests.RequestException as e:
        logger.error(f"Failed to update image {image_id}: {str(e)}")
        raise CloudflareImagesError(f"Failed to update image: {str(e)}") from e
[docs]
def delete_image(self, image: CloudflareImage) -> bool:
    """
    Remove an image from Cloudflare Images and log the deletion.

    Args:
        image: CloudflareImage instance whose remote image should be deleted
    Returns:
        True if deletion was successful
    Raises:
        CloudflareImagesError: If the HTTP request fails or Cloudflare
            reports the call as unsuccessful
    """
    endpoint = f"{self.base_url}/accounts/{self.account_id}/images/v1/{image.cloudflare_id}"
    try:
        reply = self.session.delete(endpoint, headers=self._auth_headers())
        reply.raise_for_status()
        body = reply.json()

        if not body.get("success"):
            details = ", ".join(
                err.get("message", "Unknown error")
                for err in body.get("errors", [])
            )
            raise CloudflareImagesError(f"Cloudflare API error: {details}")

        # Audit trail entry for the deletion; stores the whole API response.
        ImageUploadLog.objects.create(
            image=image,
            event_type="image_deleted",
            message="Image deleted from Cloudflare",
            data={"response": body},
        )
        logger.info(f"Deleted image {image.cloudflare_id}")
        return True
    except requests.RequestException as e:
        reason = str(e)
        logger.error(f"Failed to delete image {image.cloudflare_id}: {reason}")
        raise CloudflareImagesError(f"Failed to delete image: {reason}") from e
[docs]
def validate_webhook_signature(self, payload: bytes, signature: str) -> bool:
    """
    Validate webhook signature from Cloudflare.

    Computes the HMAC-SHA256 of ``payload`` keyed with the configured
    webhook secret and compares its hex digest to ``signature`` in
    constant time.

    Args:
        payload: Raw webhook payload
        signature: Signature from webhook headers (should be in format 'sha256=...')
    Returns:
        True if signature is valid. Also True when no webhook secret is
        configured (validation is skipped with a warning). False for a
        missing, empty, non-string or mismatching signature.
    """
    secret = cloudflare_settings.webhook_secret
    if not secret:
        # NOTE(review): this is fail-open -- without a configured secret every
        # webhook is accepted. Kept for backward compatibility; confirm this
        # is acceptable for production deployments.
        logger.warning(
            "Webhook secret not configured, skipping signature validation"
        )
        return True

    import hashlib
    import hmac

    # The header value is attacker-controlled; a missing or non-string value
    # can never match, so reject it instead of raising.
    if not isinstance(signature, str) or not signature:
        return False

    # Remove 'sha256=' prefix if present
    signature = signature.removeprefix("sha256=")

    expected_signature = hmac.new(
        secret.encode(), payload, hashlib.sha256
    ).hexdigest()

    # compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters, so compare bytes: a malformed header is then
    # simply a mismatch rather than an unhandled exception.
    return hmac.compare_digest(
        signature.encode("utf-8"), expected_signature.encode("ascii")
    )
[docs]
def process_webhook(self, payload: dict[str, Any]) -> CloudflareImage | None:
    """
    Process webhook payload from Cloudflare.

    Looks up the local image by the payload's ``id``, updates it from the
    payload and records a ``webhook_received`` log entry. This method is
    best-effort: it never raises, and any failure is logged and reported
    as None.

    Args:
        payload: Webhook payload data
    Returns:
        Updated CloudflareImage instance if found; None when the payload
        has no ID, the image is unknown locally, or processing failed
    """
    try:
        image_id = payload.get("id")
        if not image_id:
            logger.warning("Webhook payload missing image ID")
            return None

        try:
            image = CloudflareImage.objects.get(cloudflare_id=image_id)
        except CloudflareImage.DoesNotExist:
            logger.warning(f"Received webhook for unknown image: {image_id}")
            return None

        # Update image from webhook data
        image.update_from_cloudflare_response(payload)

        # Log the webhook
        ImageUploadLog.objects.create(
            image=image,
            event_type="webhook_received",
            message="Webhook processed successfully",
            data={"payload": payload},
        )
        logger.info(f"Processed webhook for image {image.cloudflare_id}")
        return image
    except Exception as e:
        # Deliberate catch-all so a bad webhook cannot crash the caller.
        # logger.exception logs at ERROR level like before but also attaches
        # the traceback, which the plain message used to discard.
        logger.exception(f"Failed to process webhook: {str(e)}")
        return None
# Global service instance, created once when this module is imported.
# Construction only allocates the thread-local session store; credentials and
# the base URL are read from cloudflare_settings each time they are used.
cloudflare_service = CloudflareImagesService()