"""File upload service: validation, image processing, storage and cleanup."""

import logging
import os
import shutil
import uuid
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

from fastapi import HTTPException, UploadFile
from PIL import Image

from app.core.config import settings

logger = logging.getLogger(__name__)


class FileService:
    """Service for handling file uploads with validation, image processing, and storage.

    Uploaded files are written to ``<upload_dir>/<trip_id>/<filename>``.
    Thumbnails are written to
    ``<upload_dir>/thumbnails/<expense_id>_<hex>_thumb.<ext>``. The thumbnail
    name carries only the expense id (not the main file's unique id), so
    thumbnail cleanup is always performed per expense id.
    """

    # Name of the sub-directory (below the upload dir) that holds thumbnails.
    _THUMBNAIL_DIRNAME = "thumbnails"

    # Pillow format name and save keyword arguments, keyed by lowercase
    # file extension. Extensions not listed here are never re-encoded.
    _SAVE_OPTIONS: Dict[str, Tuple[str, Dict[str, Any]]] = {
        "jpg": ("JPEG", {"quality": 85, "optimize": True}),
        "jpeg": ("JPEG", {"quality": 85, "optimize": True}),
        "png": ("PNG", {"optimize": True}),
        "webp": ("WEBP", {"quality": 85, "optimize": True}),
    }

    def __init__(self):
        """Read limits and locations from ``settings`` and create the upload dir."""
        self.upload_dir = Path(settings.UPLOAD_DIR)
        self.max_file_size = settings.MAX_FILE_SIZE
        self.allowed_extensions = settings.ALLOWED_EXTENSIONS
        self.max_width = settings.MAX_IMAGE_WIDTH
        self.max_height = settings.MAX_IMAGE_HEIGHT
        self.thumbnail_size = settings.THUMBNAIL_SIZE

        # Ensure upload directory exists
        self.upload_dir.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _file_too_large_error(self) -> HTTPException:
        """Build the 413 error used whenever the size limit is exceeded."""
        return HTTPException(
            status_code=413,
            detail=f"File too large. Maximum size is {self.max_file_size // (1024*1024)}MB",
        )

    def _resolve_upload_path(self, file_url: str) -> Optional[Path]:
        """Map a file URL to a path, or return None if it escapes the upload dir.

        Guards destructive operations against ``..`` segments and absolute
        paths in ``file_url``. The returned path is the unresolved one so
        callers act on exactly the entry that was named.
        """
        candidate = self.upload_dir / file_url
        root = self.upload_dir.resolve()
        resolved = candidate.resolve()
        if resolved != root and root not in resolved.parents:
            return None
        return candidate

    def _save_image(self, img: Image.Image, path: Path, extension: str) -> bool:
        """Save ``img`` to ``path`` using the encoder settings for ``extension``.

        Returns:
            True if the image was written, False if the extension has no
            configured encoder (in which case nothing is written).
        """
        options = self._SAVE_OPTIONS.get(extension.lower())
        if options is None:
            return False
        image_format, save_kwargs = options
        img.save(path, image_format, **save_kwargs)
        return True

    @staticmethod
    def _flatten_transparency(img: Image.Image) -> Image.Image:
        """Composite RGBA/LA/P images onto a white RGB background.

        Images in any other mode are returned unchanged. Flattening is done
        so the result can be saved as JPEG, which has no alpha channel.
        """
        if img.mode not in ("RGBA", "LA", "P"):
            return img
        if img.mode == "P":
            # Palette images may carry transparency; expose it as alpha.
            img = img.convert("RGBA")
        background = Image.new("RGB", img.size, (255, 255, 255))
        # The last band of RGBA / LA is the alpha channel.
        background.paste(img, mask=img.split()[-1])
        return background

    @staticmethod
    def _discard_file(path: Path) -> None:
        """Best-effort removal of a partially written upload."""
        try:
            if path.exists():
                path.unlink()
        except OSError as cleanup_error:
            # Never let cleanup failure mask the original error.
            logger.error("Error cleaning up file %s: %s", path, cleanup_error)

    @staticmethod
    def _delete_thumbnails(thumbnail_dir: Path, expense_id: Any) -> int:
        """Delete every thumbnail belonging to ``expense_id``.

        Returns:
            Number of thumbnail files removed.
        """
        if not thumbnail_dir.exists():
            return 0
        removed = 0
        for thumb_file in thumbnail_dir.glob(f"{expense_id}_*thumb.*"):
            thumb_file.unlink()
            removed += 1
            logger.info("Deleted expense thumbnail: %s", thumb_file.name)
        return removed

    # ------------------------------------------------------------------
    # Validation and naming
    # ------------------------------------------------------------------

    def validate_file(self, file: UploadFile) -> None:
        """Validate uploaded file.

        Raises:
            HTTPException: 413 if the declared size exceeds the limit,
                400 if the filename is missing or its extension is not allowed.
        """
        # Check file size (only possible when the client declared one).
        if file.size and file.size > self.max_file_size:
            raise self._file_too_large_error()

        # Check file extension
        if not file.filename:
            raise HTTPException(status_code=400, detail="No filename provided")

        # Use the shared helper so a dot-less name such as "jpg" is not
        # mistaken for a file with the extension "jpg".
        file_extension = self.get_file_extension(file.filename)
        if file_extension not in self.allowed_extensions:
            raise HTTPException(
                status_code=400,
                detail=f"File type not allowed. Allowed types: {', '.join(self.allowed_extensions)}",
            )

    def get_file_extension(self, filename: str) -> str:
        """Get file extension in lowercase ('' when the name has no dot)."""
        return filename.lower().split('.')[-1] if '.' in filename else ''

    def generate_unique_filename(self, original_filename: str, expense_id: Optional[int] = None) -> str:
        """Generate unique filename with original extension.

        The name is ``<expense_id>_<8 hex chars>.<ext>`` when an expense id
        is given, otherwise ``<8 hex chars>.<ext>``.
        """
        extension = self.get_file_extension(original_filename)
        unique_id = uuid.uuid4().hex[:8]

        if expense_id:
            return f"{expense_id}_{unique_id}.{extension}"
        return f"{unique_id}.{extension}"

    # ------------------------------------------------------------------
    # Image processing
    # ------------------------------------------------------------------

    async def process_image(
        self,
        image_path: Path,
        max_width: Optional[int] = None,
        max_height: Optional[int] = None,
    ) -> Tuple[Image.Image, Dict[str, Any]]:
        """Process image: resize if needed, return processed image and metadata.

        Args:
            image_path: Path of the image file to open.
            max_width: Maximum width; falls back to the configured default.
            max_height: Maximum height; falls back to the configured default.

        Returns:
            Tuple of (processed image, metadata). Metadata keys:
            ``original_size``, ``format`` (format of the source file),
            ``mode`` (mode after any RGB flattening), ``resized`` and,
            when resized, ``new_size``.

        Raises:
            HTTPException: 400 if the file cannot be read as an image.
        """
        max_width = max_width or self.max_width
        max_height = max_height or self.max_height

        try:
            with Image.open(image_path) as source:
                # Capture the format first: images created by flattening
                # or resizing no longer carry one.
                source_format = source.format

                # Convert to RGB if necessary (for JPEG compatibility)
                img = self._flatten_transparency(source)

                original_width, original_height = img.size
                metadata: Dict[str, Any] = {
                    'original_size': (original_width, original_height),
                    'format': source_format,
                    'mode': img.mode,
                }

                if original_width > max_width or original_height > max_height:
                    # Scale by the tighter constraint to preserve aspect ratio.
                    ratio = min(max_width / original_width, max_height / original_height)
                    # Clamp to 1px so extreme aspect ratios cannot yield a
                    # zero-sized dimension, which Pillow rejects.
                    new_width = max(1, int(original_width * ratio))
                    new_height = max(1, int(original_height * ratio))

                    img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
                    metadata['resized'] = True
                    metadata['new_size'] = (new_width, new_height)
                else:
                    metadata['resized'] = False

                # Copy so the result stays usable after the file is closed.
                return img.copy(), metadata

        except Exception as e:
            logger.error("Error processing image %s: %s", image_path, e)
            raise HTTPException(status_code=400, detail="Invalid image file") from e

    def create_thumbnail(self, img: Image.Image, expense_id: int, original_filename: str) -> str:
        """Create thumbnail for an image.

        Args:
            img: Source image; it is not modified.
            expense_id: Expense the thumbnail belongs to (used as name prefix).
            original_filename: Name whose extension selects the output format.

        Returns:
            Thumbnail URL relative to the upload dir, or ``""`` when the
            extension has no configured encoder and no thumbnail was written.
        """
        extension = self.get_file_extension(original_filename)
        if extension not in self._SAVE_OPTIONS:
            # Do not hand out a URL for a file that would never be written.
            logger.warning("No thumbnail created for unsupported extension: %s", extension)
            return ""

        thumbnail_dir = self.upload_dir / self._THUMBNAIL_DIRNAME
        thumbnail_dir.mkdir(parents=True, exist_ok=True)

        # Image.thumbnail works in place, so operate on a copy.
        thumbnail_img = img.copy()
        thumbnail_img.thumbnail((self.thumbnail_size, self.thumbnail_size), Image.Resampling.LANCZOS)

        thumbnail_filename = f"{expense_id}_{uuid.uuid4().hex[:8]}_thumb.{extension}"
        self._save_image(thumbnail_img, thumbnail_dir / thumbnail_filename, extension)

        return f"thumbnails/{thumbnail_filename}"

    # ------------------------------------------------------------------
    # Upload
    # ------------------------------------------------------------------

    async def save_upload_file(
        self,
        file: UploadFile,
        trip_id: int,
        expense_id: Optional[int] = None
    ) -> Tuple[str, str, Dict[str, Any]]:
        """
        Save uploaded file with validation and processing.

        Args:
            file: The uploaded file.
            trip_id: Trip the file belongs to; selects the storage sub-directory.
            expense_id: Optional expense id; when given, a thumbnail is created.

        Returns:
            Tuple of (file_url, thumbnail_url, metadata)

        Raises:
            HTTPException: 400/413 for invalid input (bad extension, invalid
                image, file too large), 500 for unexpected storage failures.
        """
        self.validate_file(file)

        # Create trip directory if it doesn't exist
        trip_dir = self.upload_dir / str(trip_id)
        trip_dir.mkdir(parents=True, exist_ok=True)

        # Generate unique filename
        filename = self.generate_unique_filename(file.filename, expense_id)
        file_path = trip_dir / filename
        extension = self.get_file_extension(file.filename)

        try:
            content = await file.read()
            # The declared size may be absent, so enforce the limit on the
            # bytes actually received as well.
            if len(content) > self.max_file_size:
                raise self._file_too_large_error()

            # Save file temporarily for processing
            file_path.write_bytes(content)

            processed_img, metadata = await self.process_image(file_path)

            # Save processed image (overwrite original) for known formats;
            # other formats keep the bytes exactly as uploaded.
            self._save_image(processed_img, file_path, extension)

            file_url = f"{trip_id}/{filename}"
            metadata['file_size'] = file_path.stat().st_size
            metadata['filename'] = filename
            metadata['file_url'] = file_url

            # Create the thumbnail last so no earlier failure can orphan it.
            thumbnail_url = ""
            if expense_id:
                thumbnail_url = self.create_thumbnail(processed_img, expense_id, file.filename)

            return file_url, thumbnail_url, metadata

        except HTTPException:
            # Client errors (400/413) must reach the caller unchanged.
            self._discard_file(file_path)
            raise
        except Exception as e:
            # Clean up file if processing fails
            self._discard_file(file_path)
            logger.error("Error saving file %s: %s", file.filename, e)
            raise HTTPException(status_code=500, detail="Failed to save file") from e

    # ------------------------------------------------------------------
    # Deletion and lookup
    # ------------------------------------------------------------------

    def delete_file(self, file_url: str) -> bool:
        """Delete a file and its thumbnail.

        Thumbnails are removed for the expense id encoded as the numeric
        prefix of the filename, if there is one.

        Returns:
            True on success (including when the file did not exist), False
            if the URL points outside the upload directory or deletion failed.
        """
        try:
            file_path = self._resolve_upload_path(file_url)
            if file_path is None:
                logger.error("Refusing to delete file outside upload directory: %s", file_url)
                return False

            # Delete main file
            if file_path.exists():
                file_path.unlink()

            # Delete thumbnail if it exists
            if '/' in file_url:
                _, filename = file_url.split('/', 1)
                thumbnail_dir = self.upload_dir / self._THUMBNAIL_DIRNAME
                self._delete_thumbnail_for_file(thumbnail_dir, filename)

            return True
        except Exception as e:
            logger.error("Error deleting file %s: %s", file_url, e)
            return False

    def _delete_thumbnail_for_file(self, thumbnail_dir: Path, filename: str) -> None:
        """Delete thumbnails for the expense that ``filename`` belongs to.

        Thumbnail names do not contain the main file's unique id, so the only
        available link is the numeric expense id prefix (``<id>_...``). All
        thumbnails with that prefix are removed. Filenames without such a
        prefix have no thumbnails and are ignored.
        """
        expense_id = filename.split('_')[0] if '_' in filename else None
        if expense_id and expense_id.isdigit():
            self._delete_thumbnails(thumbnail_dir, expense_id)

    def get_file_path(self, file_url: str) -> Path:
        """Get full filesystem path for a file URL."""
        return self.upload_dir / file_url

    def file_exists(self, file_url: str) -> bool:
        """Check if a file exists."""
        return self.get_file_path(file_url).exists()

    def cleanup_orphaned_files(self, valid_file_urls: list[str]) -> int:
        """Clean up orphaned files not in the valid_file_urls list.

        Thumbnails are kept when their expense id prefix matches the prefix
        of any valid file URL, and removed otherwise.

        Returns:
            Number of files deleted
        """
        deleted_count = 0
        # Set membership keeps the per-file check O(1).
        valid_urls = set(valid_file_urls)

        try:
            # Walk through all files in upload directory
            for file_path in self.upload_dir.rglob("*"):
                if not file_path.is_file():
                    continue

                relative_path = file_path.relative_to(self.upload_dir)

                # Skip the thumbnails directory itself (handled below); match
                # on the first path component, not on a substring.
                if relative_path.parts[0] == self._THUMBNAIL_DIRNAME:
                    continue

                file_url = relative_path.as_posix()  # Ensure forward slashes
                if file_url not in valid_urls:
                    file_path.unlink()
                    deleted_count += 1
                    logger.info("Deleted orphaned file: %s", file_url)

            # Clean up orphaned thumbnails
            thumbnail_dir = self.upload_dir / self._THUMBNAIL_DIRNAME
            if thumbnail_dir.exists():
                # Expense ids are the numeric prefix before the first "_".
                valid_expense_ids = set()
                for url in valid_urls:
                    filename = url.split('/')[-1]
                    prefix = filename.split('_')[0]
                    if '_' in filename and prefix.isdigit():
                        valid_expense_ids.add(prefix)

                # Delete thumbnails for expense IDs not in valid list
                for thumb_file in thumbnail_dir.glob("*_thumb.*"):
                    if thumb_file.name.split('_')[0] not in valid_expense_ids:
                        thumb_file.unlink()
                        deleted_count += 1
                        logger.info("Deleted orphaned thumbnail: %s", thumb_file.name)

        except Exception as e:
            logger.error("Error during cleanup: %s", e)

        return deleted_count

    def delete_receipt_file(self, receipt_url: str) -> bool:
        """
        Delete a specific receipt file and its associated thumbnail.

        Thumbnails are located through the numeric expense id prefix of the
        receipt's filename, so every thumbnail of that expense is removed.

        Args:
            receipt_url: URL of the receipt file to delete

        Returns:
            bool: True if deletion was successful, False otherwise
        """
        try:
            file_path = self._resolve_upload_path(receipt_url)
            if file_path is None:
                logger.error("Refusing to delete file outside upload directory: %s", receipt_url)
                return False

            # Delete the main receipt file
            if file_path.exists():
                file_path.unlink()
                logger.info("Deleted receipt file: %s", receipt_url)

            # Delete associated thumbnail(s) if any exist
            filename = receipt_url.split('/')[-1]
            self._delete_thumbnail_for_file(self.upload_dir / self._THUMBNAIL_DIRNAME, filename)

            return True
        except Exception as e:
            logger.error("Error deleting receipt file %s: %s", receipt_url, e)
            return False

    def delete_expense_files(self, expense_id: int, receipt_url: Optional[str] = None) -> bool:
        """
        Delete files associated with a specific expense.

        Args:
            expense_id: ID of the expense
            receipt_url: Optional URL of the receipt file to delete

        Returns:
            bool: True if deletion was successful, False otherwise
                (including when the receipt file could not be deleted)
        """
        try:
            # Delete receipt file if provided; remember the outcome but
            # still attempt thumbnail cleanup either way.
            receipt_deleted = True
            if receipt_url:
                receipt_deleted = self.delete_receipt_file(receipt_url)

            # Delete any thumbnails for this expense_id
            self._delete_thumbnails(self.upload_dir / self._THUMBNAIL_DIRNAME, expense_id)

            return receipt_deleted
        except Exception as e:
            logger.error("Error deleting files for expense %s: %s", expense_id, e)
            return False


# Global file service instance
file_service = FileService()