"""Cloud server router with file operations endpoints."""
import logging
from pathlib import Path
from tempfile import NamedTemporaryFile
import aiofiles
from fastapi import HTTPException, Request, UploadFile
from fastapi.responses import FileResponse
from python_template_server.constants import MB_TO_BYTES
from python_template_server.models import ResponseCode
from python_template_server.routers import BaseRouter
from cloud_server.db import FilesMetadataDatabaseManager
from cloud_server.models import (
DatabaseAction,
DeleteFileResponse,
FileMetadata,
ListFilesResponse,
StorageConfig,
UpdateFileMetadataRequest,
UpdateFileMetadataResponse,
UploadFileResponse,
)
from cloud_server.thumbnail_generator import ThumbnailGenerator
logger = logging.getLogger(__name__)
[docs]
class FilesRouter(BaseRouter):
"""Router for the cloud server file operations."""
def _raise_file_size_exceeded(self, file_size: int, max_size: int) -> None:
"""Raise HTTPException for file size exceeded.
:param int file_size: The actual file size
:param int max_size: The maximum allowed size
:raises HTTPException: Always raises with BAD_REQUEST status
"""
error_msg = f"File size ({file_size} bytes) exceeds maximum allowed size ({max_size} bytes)"
logger.error(error_msg)
raise HTTPException(status_code=ResponseCode.BAD_REQUEST, detail=error_msg)
async def _stream_upload_to_temp(
self, file: UploadFile, temp_filepath: Path, max_size_bytes: int, chunk_size: int
) -> int:
"""Stream uploaded file to temporary location while validating size.
:param UploadFile file: The file being uploaded
:param Path temp_filepath: Path to temporary file
:param int max_size_bytes: Maximum allowed file size
:param int chunk_size: Size of chunks to read
:return int: Total file size in bytes
:raises HTTPException: If file size exceeds maximum
"""
file_size = 0
async with aiofiles.open(temp_filepath, "wb") as f:
while chunk := await file.read(chunk_size):
file_size += len(chunk)
if file_size > max_size_bytes:
self._raise_file_size_exceeded(file_size, max_size_bytes)
await f.write(chunk)
return file_size
def _cleanup_temp_file(self, temp_filepath: Path | None) -> None:
"""Clean up temporary file if it exists.
:param Path | None temp_filepath: Path to temporary file or None
"""
if temp_filepath and temp_filepath.exists():
temp_filepath.unlink()
[docs]
def setup_routes(self) -> None:
"""Set up the API routes."""
self.add_route(
endpoint="/",
handler_function=self.list_files,
response_model=ListFilesResponse,
methods=["GET"],
limited=True,
authentication_required=True,
)
self.add_route(
endpoint="/",
handler_function=self.upload_file,
response_model=UploadFileResponse,
methods=["POST"],
limited=True,
authentication_required=True,
)
self.add_route(
endpoint="/{file_id}",
handler_function=self.get_file,
response_model=None,
methods=["GET"],
limited=True,
authentication_required=True,
)
self.add_route(
endpoint="/{file_id}",
handler_function=self.delete_file,
response_model=DeleteFileResponse,
methods=["DELETE"],
limited=True,
authentication_required=True,
)
self.add_route(
endpoint="/{file_id}/metadata",
handler_function=self.update_file_metadata,
response_model=UpdateFileMetadataResponse,
methods=["PATCH"],
limited=True,
authentication_required=True,
)
self.add_route(
endpoint="/{file_id}/thumbnail",
handler_function=self.get_thumbnail,
response_model=None,
methods=["GET"],
limited=True,
authentication_required=True,
)
[docs]
async def list_files(self, request: Request) -> ListFilesResponse:
"""Get metadata for all files.
:param Request request: The incoming HTTP request
:return ListFilesResponse: Files metadata response
"""
files_metadata = self._db.list_files()
if len(files_metadata) == 0:
logger.warning("No files found in database.")
return ListFilesResponse(
message="Files metadata retrieved successfully.",
files_metadata=files_metadata,
)
[docs]
async def upload_file(self, request: Request, file: UploadFile, parent_directory: str) -> UploadFileResponse:
"""Upload a file.
:param Request request: The incoming HTTP request
:param UploadFile file: The file to upload
:param str parent_directory: Parent directory path relative to storage directory
:return UploadFileResponse: File upload response
:raises HTTPException: If file validation fails or storage operation fails
"""
# Validate filename
if not file.filename:
error_msg = "Filename is required for file upload."
logger.error(error_msg)
raise HTTPException(status_code=ResponseCode.BAD_REQUEST, detail=error_msg)
# Stream file to temp location while validating size
max_size_bytes = self._storage_config.max_file_size_mb * MB_TO_BYTES
chunk_size = self._storage_config.upload_chunk_size_kb * 1024
temp_filepath = None
try:
temp_file = NamedTemporaryFile(delete=False, dir=self._storage_directory)
temp_filepath = Path(temp_file.name)
temp_file.close()
file_size = await self._stream_upload_to_temp(
file=file, temp_filepath=temp_filepath, max_size_bytes=max_size_bytes, chunk_size=chunk_size
)
except Exception as e:
self._cleanup_temp_file(temp_filepath=temp_filepath)
error_msg = f"Failed to read uploaded file: {file.filename}"
logger.exception(error_msg)
raise HTTPException(status_code=ResponseCode.INTERNAL_SERVER_ERROR, detail=error_msg) from e
# Create file metadata
file_metadata = FileMetadata(
id=None,
filename=file.filename,
parent_directory=Path(parent_directory),
mime_type=file.content_type or "application/octet-stream",
size=file_size,
)
# Ensure parent directory exists
full_parent_dir = self._storage_directory / file_metadata.parent_directory
full_parent_dir.mkdir(parents=True, exist_ok=True)
if (filepath := self._storage_directory / file_metadata.filepath).exists():
error_msg = f"File already exists in storage: {filepath}"
logger.error(error_msg)
self._cleanup_temp_file(temp_filepath=temp_filepath)
raise HTTPException(status_code=ResponseCode.CONFLICT, detail=error_msg)
# Move temp file to final location
try:
temp_filepath.rename(filepath)
except OSError as e:
error_msg = f"Failed to move file to final location: {filepath}"
logger.exception(error_msg)
self._cleanup_temp_file(temp_filepath=temp_filepath)
raise HTTPException(status_code=ResponseCode.INTERNAL_SERVER_ERROR, detail=error_msg) from e
try:
created_file_metadata = self._db.perform_file_metadata_action(
action=DatabaseAction.CREATE, file_metadata=file_metadata
)
except ValueError as e:
# Cleanup the file if database operation fails
if filepath.exists():
filepath.unlink()
error_msg = f"Failed to create file metadata in database for file: {file_metadata}"
logger.exception(error_msg)
raise HTTPException(status_code=ResponseCode.INTERNAL_SERVER_ERROR, detail=error_msg) from e
if file_metadata.mime_type.startswith(("image/", "video/")):
self._thumbnail_generator.generate_thumbnail(
filepath=filepath,
mime_type=file_metadata.mime_type,
file_id=created_file_metadata.id, # type: ignore[arg-type]
thumbnail_size=self._storage_config.thumbnail_size,
)
return UploadFileResponse(
message="File uploaded successfully.",
file_metadata=created_file_metadata,
)
[docs]
async def get_file(self, request: Request, file_id: int) -> FileResponse:
"""Get a file by its ID.
:param Request request: The incoming HTTP request
:param int file_id: The ID of the file to retrieve
:return FileResponse: The requested file
:raises HTTPException: If the file metadata doesn't exist in the database or if the file is not found in storage
"""
# Retrieve file metadata from database
try:
file_metadata = self._db.perform_file_metadata_action(action=DatabaseAction.READ, file_id=file_id)
except ValueError as e:
error_msg = f"File metadata doesn't exist in database for file {file_id}!"
logger.exception(error_msg)
raise HTTPException(status_code=ResponseCode.NOT_FOUND, detail=error_msg) from e
# Validate file exists in storage
if not (filepath := self._storage_directory / file_metadata.filepath).exists():
error_msg = f"File {file_id} not found in storage: {filepath}"
logger.error(error_msg)
raise HTTPException(status_code=ResponseCode.NOT_FOUND, detail=error_msg)
return FileResponse(
path=filepath,
filename=file_metadata.filename,
media_type=file_metadata.mime_type,
)
[docs]
async def delete_file(self, request: Request, file_id: int) -> DeleteFileResponse:
"""Delete a file by its ID.
:param Request request: The incoming HTTP request
:param int file_id: The ID of the file to delete
:return DeleteFileResponse: File deletion response
:raises HTTPException: If the file metadata doesn't exist in the database or if the file is not found in storage
"""
# First, delete the file metadata from the database to get the file path, then delete the file from storage
try:
file_metadata = self._db.perform_file_metadata_action(action=DatabaseAction.DELETE, file_id=file_id)
except ValueError as e:
error_msg = f"File metadata doesn't exist in database for file {file_id}!"
logger.exception(error_msg)
raise HTTPException(status_code=ResponseCode.NOT_FOUND, detail=error_msg) from e
# Validate file exists in storage before attempting to delete
if not (filepath := self._storage_directory / file_metadata.filepath).exists():
error_msg = f"File {file_id} not found in storage: {filepath}"
logger.error(error_msg)
raise HTTPException(status_code=ResponseCode.NOT_FOUND, detail=error_msg)
filepath.unlink()
logger.info("Deleted file %s from storage: %s", file_id, filepath)
if not any(filepath.parent.iterdir()):
filepath.parent.rmdir()
logger.info("Deleted empty parent directory: %s", filepath.parent)
if (thumbnail_path := self._thumbnail_generator.get_thumbnail_path(file_id=file_id)).exists():
thumbnail_path.unlink()
return DeleteFileResponse(
message="File deleted successfully.",
file_metadata=file_metadata,
)
[docs]
async def get_thumbnail(self, request: Request, file_id: int) -> FileResponse:
"""Get a thumbnail for a file by its ID.
:param Request request: The incoming HTTP request
:param int file_id: The ID of the file to get a thumbnail for
:return FileResponse: The requested thumbnail image
:raises HTTPException: If the file metadata doesn't exist in the database or if the file is not found in storage
"""
# Retrieve file metadata from database
try:
file_metadata = self._db.perform_file_metadata_action(action=DatabaseAction.READ, file_id=file_id)
except ValueError as e:
error_msg = f"File metadata doesn't exist in database for file {file_id}!"
logger.exception(error_msg)
raise HTTPException(status_code=ResponseCode.NOT_FOUND, detail=error_msg) from e
thumbnail_path = self._thumbnail_generator.get_thumbnail_path(file_id=file_metadata.id) # type: ignore[arg-type]
return FileResponse(
path=thumbnail_path,
filename=thumbnail_path.name,
media_type="image/jpeg",
)