From 3111a1748d4b646175e6a7c5be26ae92e941d284 Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 8 Oct 2025 22:48:47 +0530 Subject: [PATCH 1/3] UN-2860 [FIX] Fixed active file cache not preventing duplicate file processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixed critical bug where ActiveFileFilter cache checks were failing to detect files already being processed, causing duplicate file processing in concurrent workflow executions. Key fixes: - Fixed cache data access: Extract execution_id from nested cache structure (cached_data["data"]["execution_id"] instead of cached_data["execution_id"]) - Changed cache status from "EXECUTING" to "PENDING" for queued files - Increased MAX_ACTIVE_FILE_CACHE_TTL from 1hr to 2hrs for resource-constrained environments - Added cache cleanup in finally blocks to prevent stale entries - Fixed cache key format consistency (hash-based) between backend and workers - Optimized DB queries to skip files already found in cache - Removed ~370 lines of dead code (file_management_utils.py and unused methods) Root cause: RedisCacheBackend wraps data in {data: {...}, cached_at, ttl} but filter_pipeline was accessing execution_id directly instead of from nested data key. 
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- backend/workflow_manager/internal_views.py | 50 +- workers/file_processing/tasks.py | 43 +- workers/sample.env | 3 + workers/shared/clients/file_client.py | 8 +- workers/shared/processing/filter_pipeline.py | 113 +++-- workers/shared/workflow/execution/__init__.py | 2 - .../workflow/execution/active_file_manager.py | 215 +-------- .../execution/file_management_utils.py | 426 ------------------ 8 files changed, 126 insertions(+), 734 deletions(-) delete mode 100644 workers/shared/workflow/execution/file_management_utils.py diff --git a/backend/workflow_manager/internal_views.py b/backend/workflow_manager/internal_views.py index 4923e71002..af6431b22e 100644 --- a/backend/workflow_manager/internal_views.py +++ b/backend/workflow_manager/internal_views.py @@ -5,7 +5,6 @@ import logging import uuid -from django.core.cache import cache from django.db import transaction from django.shortcuts import get_object_or_404 from django.utils import timezone @@ -731,10 +730,10 @@ def post(self, request): # Check for files in PENDING or EXECUTING state in other workflow executions active_files = {} # {uuid: [execution_data]} - legacy format active_identifiers = set() # Composite identifiers for new format - cache_hits = 0 db_queries = 0 - # Step 1: Check cache for all files and separate files that need database queries + # Prepare all files for database check + # Note: Workers already check cache before calling this API, so no need to check again files_needing_db_check = [] for file_data in files: @@ -744,42 +743,7 @@ def post(self, request): f"{provider_uuid}:{file_path}" if file_path else provider_uuid ) - # 1. 
Check completion cache first (highest priority) - completion_key = f"file_completed:{workflow_id}:{provider_uuid}" - completion_data = cache.get(completion_key) - - if completion_data: - logger.debug( - f"File {provider_uuid} found in completion cache, skipping" - ) - continue # Skip - recently completed - - # 2. Check active processing cache (path-aware) - cached_active = None - - if file_path is not None: - # Use precise path-aware cache key - active_key = f"file_active:{workflow_id}:{provider_uuid}:{file_path}" - cached_active = cache.get(active_key) - if cached_active: - logger.debug( - f"File {provider_uuid}:{file_path} found in path-aware cache" - ) - else: - # No file path available, skip cache check for files without path - cached_active = None - - if cached_active: - # Verify it's not the current execution - if cached_active.get("execution_id") != current_execution_id: - # Track in both formats - active_files[provider_uuid] = [cached_active] - active_identifiers.add(composite_id) - cache_hits += 1 - logger.debug(f"File {composite_id} found in active cache") - continue - - # File needs database check - add to batch + # All files need database check files_needing_db_check.append( { "uuid": provider_uuid, @@ -788,7 +752,7 @@ def post(self, request): } ) - # Step 2: Bulk database queries for all files that need database check + # Bulk database query for all files if files_needing_db_check: logger.info( f"[ActiveCheck] Performing bulk database check for {len(files_needing_db_check)} files" @@ -804,7 +768,7 @@ def post(self, request): logger.info( f"[ActiveCheck] Active check complete: {len(active_files)}/{len(files)} files active " - f"(cache_hits: {cache_hits}, db_queries: {db_queries})" + f"(db_queries: {db_queries})" ) # Log final active identifiers for debugging @@ -827,11 +791,7 @@ def post(self, request): "total_checked": len(files), "total_active": len(active_files), "cache_stats": { - "cache_hits": cache_hits, "db_queries": db_queries, - 
"cache_hit_rate": f"{(cache_hits / len(files) * 100):.1f}%" - if files - else "0.0%", }, } ) diff --git a/workers/file_processing/tasks.py b/workers/file_processing/tasks.py index 2a0fc15ed0..7587c38736 100644 --- a/workers/file_processing/tasks.py +++ b/workers/file_processing/tasks.py @@ -837,6 +837,43 @@ def _compile_batch_result(context: WorkflowContextData) -> dict[str, Any]: # These functions support the refactored file processing workflow +def _cleanup_file_cache_entry( + file_hash: FileHashData, + workflow_id: str, + file_name: str, +) -> None: + """Helper to cleanup active file cache entry after DB record creation attempt. + + This should be called after attempting to create WorkflowFileExecution, + regardless of success or failure, to prevent stale cache entries from + blocking future executions. + + Args: + file_hash: File hash data containing provider_file_uuid + workflow_id: Workflow ID for cache key + file_name: File name for logging + """ + if not file_hash.provider_file_uuid: + return + + try: + from shared.workflow.execution.active_file_manager import ( + cleanup_active_file_cache, + ) + + cleanup_active_file_cache( + provider_file_uuids=[file_hash.provider_file_uuid], + workflow_id=workflow_id, + logger_instance=logger, + ) + logger.debug( + f"Cleaned cache for {file_name} (UUID: {file_hash.provider_file_uuid})" + ) + except Exception as cleanup_error: + logger.warning(f"Cache cleanup failed for {file_name}: {cleanup_error}") + # Don't raise - cache will expire anyway + + def _pre_create_file_executions( file_data: WorkerFileData, files: list[Any], @@ -867,7 +904,7 @@ def _pre_create_file_executions( # Use the file history flag passed from execution parameters logger.info( - f"Using file history parameter for workflow {workflow_id} (type: {workflow_type}): use_file_history = {use_file_history}" + f"Using file history parameter for workflow {workflow_id} execution {execution_id} (type: {workflow_type}): use_file_history = {use_file_history}" ) for 
file_item in files: @@ -936,6 +973,10 @@ def _pre_create_file_executions( ) # Continue with other files even if one fails + finally: + # Always cleanup cache (success or failure) to prevent stale entries + _cleanup_file_cache_entry(file_hash, workflow_id, file_name) + # File history deduplication now handled during individual file processing return pre_created_data diff --git a/workers/sample.env b/workers/sample.env index d5a5e8d765..7ee8c6beda 100644 --- a/workers/sample.env +++ b/workers/sample.env @@ -257,6 +257,9 @@ EXECUTION_RESULT_TTL_SECONDS=86400 EXECUTION_CACHE_TTL_SECONDS=86400 INSTANT_WF_POLLING_TIMEOUT=300 +# Active File Execution cache +ACTIVE_FILE_CACHE_TTL=300 + # ============================================================================= # Development Settings # ============================================================================= diff --git a/workers/shared/clients/file_client.py b/workers/shared/clients/file_client.py index a0f8592bf5..92d0f350af 100644 --- a/workers/shared/clients/file_client.py +++ b/workers/shared/clients/file_client.py @@ -127,7 +127,8 @@ def get_or_create_workflow_file_execution( logger.info( f"FileHashData debug: file_name='{file_hash_data.file_name}', " f"has_hash={file_hash_data.has_hash()}, " - f"source_connection_type='{getattr(file_hash_data, 'source_connection_type', None)}'" + f"source_connection_type='{getattr(file_hash_data, 'source_connection_type', None)}', " + f"execution_id='{execution_id}'" ) # CRITICAL FIX: For API files with pre-calculated hash, skip hash computation @@ -219,11 +220,12 @@ def get_or_create_workflow_file_execution( data = create_request.to_dict() logger.info( - f"Creating workflow file execution with file_hash: {file_hash_data.file_name}" + f"Creating workflow file execution: '{execution_id}' with file_name: {file_hash_data.file_name}" ) logger.info( f"FileHashData key identifiers: provider_file_uuid='{file_hash_data.provider_file_uuid}', " - 
f"file_path='{file_hash_data.file_path}', file_hash='{file_hash_data.file_hash}'" + f"file_path='{file_hash_data.file_path}', file_hash='{file_hash_data.file_hash}', " + f"execution='{execution_id}'." ) logger.debug(f"FileHashData: {file_hash_data.to_dict()}") diff --git a/workers/shared/processing/filter_pipeline.py b/workers/shared/processing/filter_pipeline.py index a02b955dfc..8287e48937 100644 --- a/workers/shared/processing/filter_pipeline.py +++ b/workers/shared/processing/filter_pipeline.py @@ -12,6 +12,7 @@ from ..api.internal_client import InternalAPIClient from ..cache.cache_backends import RedisCacheBackend from ..infrastructure.logging import WorkerLogger +from ..workflow.execution.active_file_manager import ActiveFileManager logger = WorkerLogger.get_logger(__name__) @@ -594,7 +595,7 @@ def apply( logger.info( f"[ActiveFileFilter] {len(files)} โ†’ {len(filtered)} files " - f"({len(active_identifiers)} currently active)" + f"({len(active_identifiers)} currently active) for execution_id {execution_id}" ) return filtered @@ -634,12 +635,14 @@ def _check_active_files_batch( uuid = data["uuid"] file_path = data["path"] - # Create precise cache key instead of pattern matching - cache_key = f"file_active:{workflow_id}:{uuid}:{file_path}" + # Create precise cache key using same hashing logic as ActiveFileManager + cache_key = ActiveFileManager._create_cache_key( + workflow_id, uuid, file_path + ) cached_data = cache.get(cache_key) if cached_data and isinstance(cached_data, dict): - cached_exec_id = cached_data.get("execution_id") + cached_exec_id = cached_data.get("data", {}).get("execution_id") if cached_exec_id and cached_exec_id != execution_id: active_identifiers.add(identifier) logger.debug( @@ -648,52 +651,72 @@ def _check_active_files_batch( except Exception as e: logger.warning(f"[ActiveFileFilter] Cache check failed: {e}") - # 2. 
Check database for active files (single batch API call) + logger.info( + f"[ActiveFileFilter] found {len(active_identifiers)} from cache for execution {execution_id}" + ) + # 2. Check database for active files (only files not found in cache) try: - # Prepare composite file information for the API call - files_for_api = [] - for identifier in identifiers_to_check: - data = file_identifiers[identifier] - files_for_api.append({"uuid": data["uuid"], "path": data["path"]}) + # Filter out files already found active in cache to reduce DB query size + remaining_identifiers = [ + identifier + for identifier in identifiers_to_check + if identifier not in active_identifiers + ] + + if not remaining_identifiers: + logger.info( + f"[ActiveFileFilter] All files already checked via cache, skipping database check for execution_id {execution_id}" + ) + else: + logger.info( + f"[ActiveFileFilter] Checking {len(remaining_identifiers)} remaining files in database " + f"({len(active_identifiers)} already found in cache) for execution_id {execution_id}" + ) - response = api_client.check_files_active_processing( - workflow_id=workflow_id, - files=files_for_api, - current_execution_id=execution_id, - ) + # Prepare composite file information for the API call + files_for_api = [] + for identifier in remaining_identifiers: + data = file_identifiers[identifier] + files_for_api.append({"uuid": data["uuid"], "path": data["path"]}) - if response.success and response.data: - # Backend returns: {"active_files": {uuid: [exec_data]}, "active_uuids": [uuid1, uuid2], "active_identifiers": ["uuid:path"]} - # Use the new composite identifiers if available, fallback to legacy format - active_composite_ids = response.data.get("active_identifiers", []) - if active_composite_ids: - # New path-aware format - logger.debug( - f"[ActiveFileFilter] Backend reported {len(active_composite_ids)} active identifiers: {active_composite_ids}" - ) - for composite_id in active_composite_ids: - if composite_id in 
identifiers_to_check: - active_identifiers.add(composite_id) - logger.debug( - f"[ActiveFileFilter] File {composite_id} active in database" - ) - else: - # Fallback to legacy format - active_uuids = response.data.get("active_uuids", []) - logger.debug( - f"[ActiveFileFilter] Backend reported {len(active_uuids)} active UUIDs (legacy): {active_uuids}" - ) + response = api_client.check_files_active_processing( + workflow_id=workflow_id, + files=files_for_api, + current_execution_id=execution_id, + ) + + if response.success and response.data: + # Backend returns: {"active_files": {uuid: [exec_data]}, "active_uuids": [uuid1, uuid2], "active_identifiers": ["uuid:path"]} + # Use the new composite identifiers if available, fallback to legacy format + active_composite_ids = response.data.get("active_identifiers", []) + if active_composite_ids: + # New path-aware format + logger.debug( + f"[ActiveFileFilter] Backend reported {len(active_composite_ids)} active identifiers: {active_composite_ids}" + ) + for composite_id in active_composite_ids: + if composite_id in remaining_identifiers: + active_identifiers.add(composite_id) + logger.debug( + f"[ActiveFileFilter] File {composite_id} active in database" + ) + else: + # Fallback to legacy format + active_uuids = response.data.get("active_uuids", []) + logger.debug( + f"[ActiveFileFilter] Backend reported {len(active_uuids)} active UUIDs (legacy): {active_uuids}" + ) - # Map back to identifiers - for identifier in identifiers_to_check: - data = file_identifiers[identifier] - uuid = data["uuid"] + # Map back to identifiers + for identifier in remaining_identifiers: + data = file_identifiers[identifier] + uuid = data["uuid"] - if uuid in active_uuids: - active_identifiers.add(identifier) - logger.debug( - f"[ActiveFileFilter] File {identifier} active in database (legacy mapping)" - ) + if uuid in active_uuids: + active_identifiers.add(identifier) + logger.debug( + f"[ActiveFileFilter] File {identifier} active in database (legacy 
mapping)" + ) except Exception as e: logger.warning(f"[ActiveFileFilter] Database check failed: {e}") diff --git a/workers/shared/workflow/execution/__init__.py b/workers/shared/workflow/execution/__init__.py index 62c0e91a41..fc20f8995d 100644 --- a/workers/shared/workflow/execution/__init__.py +++ b/workers/shared/workflow/execution/__init__.py @@ -6,7 +6,6 @@ from .active_file_manager import ActiveFileManager from .context import WorkerExecutionContext -from .file_management_utils import FileManagementUtils from .orchestration_utils import WorkflowOrchestrationUtils from .service import WorkerWorkflowExecutionService @@ -15,5 +14,4 @@ "WorkflowOrchestrationUtils", "WorkerWorkflowExecutionService", "ActiveFileManager", - "FileManagementUtils", ] diff --git a/workers/shared/workflow/execution/active_file_manager.py b/workers/shared/workflow/execution/active_file_manager.py index 3ff8065287..546d4ddcf6 100644 --- a/workers/shared/workflow/execution/active_file_manager.py +++ b/workers/shared/workflow/execution/active_file_manager.py @@ -15,13 +15,12 @@ import time from typing import Any, Protocol -from ...api.internal_client import InternalAPIClient from ...cache.cache_backends import RedisCacheBackend from ...infrastructure.logging import WorkerLogger # Constants for cache configuration DEFAULT_ACTIVE_FILE_CACHE_TTL = 300 # 5 minutes -MAX_ACTIVE_FILE_CACHE_TTL = 3600 # 1 hour maximum +MAX_ACTIVE_FILE_CACHE_TTL = 7200 # 2 hours maximum def get_active_file_cache_ttl() -> int: @@ -61,190 +60,6 @@ class ActiveFileManager: cache pattern. They handle duplicate processing through their own mechanisms. 
""" - @staticmethod - def filter_and_cache_files( - source_files: dict[str, Any], - workflow_id: str, - execution_id: str, - api_client: Any, - logger_instance: LoggerProtocol | None = None, - final_files_to_process: dict[str, Any] | None = None, - ) -> tuple[dict[str, Any], int, dict[str, Any]]: - """Filter out active files and create cache entries for files to be processed. - - This method performs three key operations: - 1. Checks cache and database for files already being processed - 2. Creates cache entries ONLY for files that will actually be processed (race condition prevention) - 3. Filters the source_files dict to remove active files - - Args: - source_files: Dictionary of source files to process - workflow_id: Workflow identifier - execution_id: Current execution identifier - api_client: API client for database checks - logger_instance: Optional logger override (uses module logger if None) - final_files_to_process: Optional dict of files that will actually be processed (after limits) - If provided, cache entries are created only for these files - - Returns: - Tuple of (filtered_source_files, new_file_count, filtering_stats) - - Example: - >>> files = {"file1": {"provider_file_uuid": "uuid1"}} - >>> filtered, count, stats = ActiveFileManager.filter_and_cache_files( - ... source_files=files, - ... workflow_id="workflow-123", - ... execution_id="exec-456", - ... api_client=client, - ... 
) - >>> print(f"Processing {count} files, stats: {stats}") - """ - log = logger_instance or logger - - if not source_files: - return ( - source_files, - 0, - {"original_count": 0, "filtered_count": 0, "skipped_files": []}, - ) - - original_count = len(source_files) - filtering_stats = { - "original_count": original_count, - "cache_active": [], # Files found active in cache - "db_active": [], # Files found active in database - "processing_files": [], # Files that will be processed - "cache_created": 0, # Successfully created cache entries - "cache_errors": 0, # Failed cache operations - "filtered_count": original_count, # Will be updated if filtering occurs - } - - try: - # Extract provider_file_uuids and file paths from source files for checking - provider_file_map = {} # provider_uuid -> file_key mapping (for backward compatibility) - file_tracking_data = {} # file_key -> {provider_uuid, file_path, file_data} mapping - - for file_key, file_data in source_files.items(): - provider_uuid = ActiveFileManager._extract_provider_uuid(file_data) - file_path = ActiveFileManager._extract_file_path(file_data) - - if provider_uuid: - # For backward compatibility with database checks - provider_file_map[provider_uuid] = file_key - # New tracking structure includes both provider_uuid and file_path - file_tracking_data[file_key] = { - "provider_uuid": provider_uuid, - "file_path": file_path - or file_key, # fallback to file_key if no file_path - "file_data": file_data, - } - - if not provider_file_map: - log.warning( - "No provider_file_uuid found in source files, proceeding without filtering" - ) - return source_files, original_count, filtering_stats - - log.info( - f"Checking {len(provider_file_map)} files for active processing conflicts" - ) - log.debug(f"Current execution_id: {execution_id}, workflow_id: {workflow_id}") - - active_files_to_skip = set() - - # STEP 1: Check active_file cache for OTHER executions - try: - cache_stats = ActiveFileManager._handle_cache_check( - 
file_tracking_data=file_tracking_data, - workflow_id=workflow_id, - execution_id=execution_id, - log=log, - ) - active_files_to_skip.update(cache_stats["active_files"]) - filtering_stats.update(cache_stats["stats"]) - - except Exception as cache_error: - log.warning(f"Active file cache operations failed: {cache_error}") - - # STEP 2: Database check for files in PENDING/EXECUTING state (backend only reads cache, doesn't create) - try: - db_active_provider_uuids = ActiveFileManager._check_database_active_files( - api_client=api_client, - workflow_id=workflow_id, - execution_id=execution_id, - provider_file_map=provider_file_map, - log=log, - ) - - if db_active_provider_uuids: - # Convert provider UUIDs back to file keys for filtering - db_active_file_keys = { - provider_file_map[provider_uuid] - for provider_uuid in db_active_provider_uuids - if provider_uuid in provider_file_map - } - - new_db_active = db_active_file_keys - active_files_to_skip - active_files_to_skip.update(db_active_file_keys) - filtering_stats["db_active"].extend(list(new_db_active)) - log.info( - f"๐Ÿ“Š Found {len(new_db_active)} additional files active in database" - ) - - except Exception as db_error: - log.warning(f"Database file check failed: {db_error}") - - # STEP 3: Filter source_files to remove active ones (now using file_keys directly) - if active_files_to_skip: - filtered_files, new_count = ( - ActiveFileManager._filter_source_files_by_keys( - source_files=source_files, - active_file_keys_to_skip=active_files_to_skip, - log=log, - ) - ) - - filtering_stats["filtered_count"] = new_count - filtering_stats["skipped_files"] = list(active_files_to_skip) - - log.info( - f"๐Ÿ”„ Filtered files: {original_count} โ†’ {new_count} " - f"(removed {len(active_files_to_skip)} active files)" - ) - - # STEP 4: Create cache entries only for files that will actually be processed - if final_files_to_process: - ActiveFileManager._create_cache_entries_for_selected_files( - 
final_files_to_process=final_files_to_process, - file_tracking_data=file_tracking_data, - workflow_id=workflow_id, - execution_id=execution_id, - log=log, - filtering_stats=filtering_stats, - ) - - return filtered_files, new_count, filtering_stats - else: - log.info("โœ… No active files found - processing all files") - - # Create cache entries for files that will actually be processed - if final_files_to_process: - ActiveFileManager._create_cache_entries_for_selected_files( - final_files_to_process=final_files_to_process, - file_tracking_data=file_tracking_data, - workflow_id=workflow_id, - execution_id=execution_id, - log=log, - filtering_stats=filtering_stats, - ) - - return source_files, original_count, filtering_stats - - except Exception as filter_error: - log.warning(f"File filtering failed: {filter_error}") - filtering_stats["error"] = str(filter_error) - return source_files, original_count, filtering_stats - @staticmethod def create_cache_entries( source_files: dict[str, Any], @@ -603,7 +418,7 @@ def _create_cache_entries_for_selected_files( "workflow_id": workflow_id, "provider_file_uuid": provider_uuid, "file_path": file_path, - "status": "EXECUTING", + "status": "PENDING", "created_at": time.time(), } @@ -719,7 +534,7 @@ def _create_cache_entry( "workflow_id": workflow_id, "provider_file_uuid": provider_uuid, "file_path": file_path, # Include file path in cache data - "status": "EXECUTING", + "status": "PENDING", "created_at": time.time(), } @@ -733,30 +548,6 @@ def _create_cache_entry( ) return False - @staticmethod - def _check_database_active_files( - api_client: InternalAPIClient, - workflow_id: str, - execution_id: str, - provider_file_map: dict[str, str], - log: LoggerProtocol, - ) -> set[str]: - """Check database for active files and return set of active provider UUIDs.""" - active_files_response = api_client.check_files_active_processing( - workflow_id=workflow_id, - provider_file_uuids=list(provider_file_map.keys()), - 
current_execution_id=execution_id, - ) - - if active_files_response.success: - active_files_data = active_files_response.data - return {uuid for uuid, is_active in active_files_data.items() if is_active} - else: - log.warning( - f"Database active file check failed: {active_files_response.error}" - ) - return set() - @staticmethod def _filter_source_files_by_keys( source_files: dict[str, Any], diff --git a/workers/shared/workflow/execution/file_management_utils.py b/workers/shared/workflow/execution/file_management_utils.py deleted file mode 100644 index 3dc0ba17a7..0000000000 --- a/workers/shared/workflow/execution/file_management_utils.py +++ /dev/null @@ -1,426 +0,0 @@ -"""File Management Utilities - -This module provides utility methods for file processing, filtering, and management -that can be used across different worker types (general, api-deployment, etc.). - -Each utility method has a single responsibility and can be composed as needed. -""" - -from typing import Any, Protocol - -from ...infrastructure.logging import WorkerLogger -from .active_file_manager import ActiveFileManager - -logger = WorkerLogger.get_logger(__name__) - - -class LoggerProtocol(Protocol): - """Protocol for logger objects to provide proper type hints.""" - - def debug(self, msg: str) -> None: ... - def info(self, msg: str) -> None: ... - def warning(self, msg: str) -> None: ... - def error(self, msg: str) -> None: ... 
- - -class FileFilterResult: - """Result of file filtering operations.""" - - def __init__( - self, - filtered_files: dict[str, Any], - filtered_count: int, - filtering_stats: dict[str, Any], - ): - self.filtered_files = filtered_files - self.filtered_count = filtered_count - self.filtering_stats = filtering_stats - self.original_count = filtering_stats.get("original_count", 0) - self.skipped_files = filtering_stats.get("skipped_files", []) - self.cache_active_count = len(filtering_stats.get("cache_active", [])) - self.db_active_count = len(filtering_stats.get("db_active", [])) - - def has_files(self) -> bool: - """Check if any files remain after filtering.""" - return self.filtered_count > 0 - - def all_files_active(self) -> bool: - """Check if all files were filtered out due to being active.""" - return self.original_count > 0 and self.filtered_count == 0 - - -class FileLimitResult: - """Result of applying file limits.""" - - def __init__( - self, limited_files: dict[str, Any], final_count: int, limit_applied: bool - ): - self.limited_files = limited_files - self.final_count = final_count - self.limit_applied = limit_applied - - -class FileManagementUtils: - """Utility methods for file processing and management across workers.""" - - @staticmethod - def apply_active_file_filtering( - source_files: dict[str, Any], - workflow_id: str, - execution_id: str, - api_client: Any, - logger_instance: LoggerProtocol | None = None, - final_files_to_process: dict[str, Any] | None = None, - ) -> FileFilterResult: - """Apply active file filtering to remove files being processed by other executions. - - Use this for ETL/TASK workflows that need to avoid duplicate processing. - API workflows typically skip this filtering. 
- - Args: - source_files: Dictionary of source files to filter - workflow_id: Workflow ID - execution_id: Current execution ID - api_client: API client for database checks - logger_instance: Optional logger instance - final_files_to_process: Optional dict of files that will actually be processed - If provided, cache entries are created only for these files - - Returns: - FileFilterResult with filtered files and statistics - """ - log = logger_instance or logger - - if not source_files: - return FileFilterResult( - filtered_files={}, - filtered_count=0, - filtering_stats={ - "original_count": 0, - "filtered_count": 0, - "skipped_files": [], - }, - ) - - log.info(f"๐Ÿ” Applying active file filtering for {len(source_files)} files") - - filtered_files, filtered_count, filtering_stats = ( - ActiveFileManager.filter_and_cache_files( - source_files=source_files, - workflow_id=workflow_id, - execution_id=execution_id, - api_client=api_client, - logger_instance=log, - final_files_to_process=final_files_to_process, - ) - ) - - result = FileFilterResult(filtered_files, filtered_count, filtering_stats) - - if result.all_files_active(): - log.warning( - "โš ๏ธ All discovered files are currently being processed by other executions" - ) - log.info( - "๐Ÿ’ก Tip: Wait for current executions to complete or discover more files" - ) - elif result.has_files(): - log.info( - f"โœ… {result.filtered_count} files available for processing after filtering" - ) - - return result - - @staticmethod - def apply_file_limit( - files: dict[str, Any], - max_limit: int, - logger_instance: LoggerProtocol | None = None, - ) -> FileLimitResult: - """Apply maximum file limit to a collection of files. 
- - Args: - files: Dictionary of files to limit - max_limit: Maximum number of files to allow - logger_instance: Optional logger instance - - Returns: - FileLimitResult with limited files - """ - log = logger_instance or logger - - if len(files) <= max_limit: - return FileLimitResult( - limited_files=files, final_count=len(files), limit_applied=False - ) - - log.info( - f"๐Ÿ“ Applying max files limit: taking {max_limit} files from {len(files)} available" - ) - - # Convert to list, take first N files, convert back to dict - limited_files = dict(list(files.items())[:max_limit]) - - return FileLimitResult( - limited_files=limited_files, final_count=max_limit, limit_applied=True - ) - - @staticmethod - def cleanup_active_file_cache( - provider_file_uuids: list[str], - workflow_id: str, - logger_instance: LoggerProtocol | None = None, - ) -> int: - """Clean up active file cache entries for completed/failed processing. - - Args: - provider_file_uuids: List of provider file UUIDs to clean up - workflow_id: Workflow ID - logger_instance: Optional logger instance - - Returns: - Number of cache entries cleaned up - """ - log = logger_instance or logger - - if not provider_file_uuids: - return 0 - - log.debug(f"๐Ÿงน Cleaning up cache entries for {len(provider_file_uuids)} files") - - return ActiveFileManager.cleanup_cache_entries( - provider_file_uuids=provider_file_uuids, workflow_id=workflow_id, log=log - ) - - @staticmethod - def create_file_cache_entries( - source_files: dict[str, Any], - files_to_cache: dict[str, Any], - workflow_id: str, - execution_id: str, - logger_instance: LoggerProtocol | None = None, - ) -> dict[str, Any]: - """Create cache entries for files to prevent race conditions (cache-only, no filtering). - - This is a utility wrapper around ActiveFileManager.create_cache_entries() that provides - a consistent interface for cache creation across different workflow types. Use this - after FilterPipeline has already applied all necessary filtering. 
- - Args: - source_files: Dictionary of all source files (used for tracking data) - files_to_cache: Dictionary of specific files to create cache entries for - workflow_id: Workflow ID - execution_id: Current execution ID - logger_instance: Optional logger instance - - Returns: - Cache statistics dictionary with creation results - - Example: - >>> # After FilterPipeline has processed files - >>> cache_stats = FileManagementUtils.create_file_cache_entries( - ... source_files=all_discovered_files, - ... files_to_cache=final_filtered_files, - ... workflow_id="workflow-123", - ... execution_id="exec-456", - ... ) - >>> print(f"Created {cache_stats['cache_created']} cache entries") - """ - log = logger_instance or logger - - if not files_to_cache: - log.debug("No files provided for cache creation") - return {"cache_created": 0, "cache_errors": 0, "processing_files": []} - - log.debug(f"Creating cache entries for {len(files_to_cache)} files") - - return ActiveFileManager.create_cache_entries( - source_files=source_files, - files_to_cache=files_to_cache, - workflow_id=workflow_id, - execution_id=execution_id, - logger_instance=log, - ) - - @staticmethod - def extract_provider_uuids(hash_values_of_files: dict[str, Any]) -> list[str]: - """Extract provider file UUIDs from file hash data. - - Args: - hash_values_of_files: Dictionary of file hash data - - Returns: - List of provider file UUIDs - """ - provider_uuids = [] - for hash_data in hash_values_of_files.values(): - if hasattr(hash_data, "provider_file_uuid") and hash_data.provider_file_uuid: - provider_uuids.append(hash_data.provider_file_uuid) - return provider_uuids - - @staticmethod - def log_filtering_stats( - filtering_stats: dict[str, Any], logger_instance: LoggerProtocol | None = None - ) -> None: - """Log detailed file filtering statistics. 
- - Args: - filtering_stats: Statistics from file filtering operations - logger_instance: Optional logger instance - """ - log = logger_instance or logger - - original_count = filtering_stats.get("original_count", 0) - filtered_count = filtering_stats.get("filtered_count", 0) - - if original_count > 0: - log.info( - f"๐Ÿ“Š File filtering results: {original_count} โ†’ {filtered_count} files" - ) - - cache_created = filtering_stats.get("cache_created", 0) - if cache_created > 0: - log.info(f"๐Ÿ”’ Created {cache_created} active_file cache entries") - - cache_active = filtering_stats.get("cache_active", []) - db_active = filtering_stats.get("db_active", []) - if cache_active or db_active: - cache_count = len(cache_active) - db_count = len(db_active) - log.info( - f"โšก Skipped {cache_count} cache-active + {db_count} db-active files" - ) - - @staticmethod - def process_files_with_active_filtering( - source_files: dict[str, Any], - workflow_id: str, - execution_id: str, - max_limit: int, - api_client: Any, - logger_instance: LoggerProtocol | None = None, - ) -> tuple[dict[str, Any], int]: - """Complete file processing pipeline with active filtering and limit. - - Processing order: - 1. Apply all filters (file history is already done, now cache + database) - 2. Take up to max_limit files from the filtered results - 3. Create cache entries ONLY for the final selected files - - **IMPORTANT**: Use ONLY for ETL/TASK workflows in @workers/general/ - Do NOT use for API deployments (@workers/api-deployment/) - they have their own logic. 
- - Args: - source_files: Dictionary of source files (already filtered by file history) - workflow_id: Workflow ID - execution_id: Current execution ID - max_limit: Maximum number of files to process after all filtering - api_client: API client for database checks - logger_instance: Optional logger instance - - Returns: - Tuple of (final_files, final_count) - """ - log = logger_instance or logger - - # Step 1: Filter out active files (no cache creation yet) - filter_result = FileManagementUtils.apply_active_file_filtering( - source_files=source_files, - workflow_id=workflow_id, - execution_id=execution_id, - api_client=api_client, - logger_instance=log, - final_files_to_process=None, # No cache creation at this step - ) - - # Step 2: Apply limit to the filtered results (max files after all filtering) - limit_result = FileManagementUtils.apply_file_limit( - files=filter_result.filtered_files, max_limit=max_limit, logger_instance=log - ) - - # Step 3: Create cache entries ONLY for the final selected files - if limit_result.limited_files: - log.info( - f"Creating cache entries for {limit_result.final_count} final selected files" - ) - # Use cache-only method for race condition prevention - FileManagementUtils.create_file_cache_entries( - source_files=source_files, # Need original for file_tracking_data - files_to_cache=limit_result.limited_files, # Create cache for these files only - workflow_id=workflow_id, - execution_id=execution_id, - logger_instance=log, - ) - - # Step 4: Log statistics - FileManagementUtils.log_filtering_stats( - filtering_stats=filter_result.filtering_stats, logger_instance=log - ) - - return limit_result.limited_files, limit_result.final_count - - # IMPORTANT: Maximum file limit behavior - # The max_limit parameter specifies the maximum number of files to process - # AFTER all filtering has been applied. 
For example: - # - # Source: 10 files โ†’ File History: 7 files โ†’ Cache Filter: 5 files โ†’ DB Filter: 3 files - # If max_limit=4: Process 3 files (less than limit) - # If max_limit=2: Process 2 files (limited by max_limit) - # - # Example usage for different worker types: - # - # # โœ… ETL/TASK workflows (@workers/general/): - # final_files, count = FileManagementUtils.process_files_with_active_filtering( - # source_files=files, workflow_id=wf_id, execution_id=exec_id, - # max_limit=10, api_client=client - # ) - # - # # โœ… API deployments (@workers/api-deployment/): - # final_files, count = FileManagementUtils.process_files_without_active_filtering( - # source_files=files, max_limit=10 - # ) - # - # # โœ… Cleanup (ONLY for ETL/TASK workflows - API deployments don't use cache): - # uuids = FileManagementUtils.extract_provider_uuids(hash_values_of_files) - # cleaned = FileManagementUtils.cleanup_active_file_cache(uuids, workflow_id) - # - # # Custom filtering only: - # filter_result = FileManagementUtils.apply_active_file_filtering( - # source_files=files, workflow_id=wf_id, execution_id=exec_id, api_client=client - # ) - # if not filter_result.has_files(): - # # Handle case where all files are active - # - # limit_result = FileManagementUtils.apply_file_limit(filter_result.filtered_files, 5) - # final_files = limit_result.limited_files - - @staticmethod - def process_files_without_active_filtering( - source_files: dict[str, Any], - max_limit: int, - logger_instance: LoggerProtocol | None = None, - ) -> tuple[dict[str, Any], int]: - """Process files with only limit application, no active filtering. - - **IMPORTANT**: Use for API workflows (@workers/api-deployment/) that don't need - duplicate processing prevention. API deployments handle concurrency differently - and should NOT use the file_active cache pattern. 
- - Args: - source_files: Dictionary of source files - max_limit: Maximum number of files to process - logger_instance: Optional logger instance - - Returns: - Tuple of (final_files, final_count) - """ - log = logger_instance or logger - - log.info(f"๐Ÿ“‹ Processing {len(source_files)} files without active filtering") - - # Apply file limit only - limit_result = FileManagementUtils.apply_file_limit( - files=source_files, max_limit=max_limit, logger_instance=log - ) - - return limit_result.limited_files, limit_result.final_count From eb72752af8ff853410f4ac906855854b566cf12f Mon Sep 17 00:00:00 2001 From: ali Date: Wed, 8 Oct 2025 23:11:59 +0530 Subject: [PATCH 2/3] addressed code rabbit comments --- workers/sample.env | 2 +- workers/shared/workflow/execution/active_file_manager.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/workers/sample.env b/workers/sample.env index 7ee8c6beda..87f7fbd748 100644 --- a/workers/sample.env +++ b/workers/sample.env @@ -257,7 +257,7 @@ EXECUTION_RESULT_TTL_SECONDS=86400 EXECUTION_CACHE_TTL_SECONDS=86400 INSTANT_WF_POLLING_TIMEOUT=300 -# Active File Execution cache +# Active File Execution cache in seconds ACTIVE_FILE_CACHE_TTL=300 # ============================================================================= diff --git a/workers/shared/workflow/execution/active_file_manager.py b/workers/shared/workflow/execution/active_file_manager.py index 546d4ddcf6..1814a91106 100644 --- a/workers/shared/workflow/execution/active_file_manager.py +++ b/workers/shared/workflow/execution/active_file_manager.py @@ -32,7 +32,9 @@ def get_active_file_cache_ttl() -> int: try: ttl = int(os.environ.get("ACTIVE_FILE_CACHE_TTL", DEFAULT_ACTIVE_FILE_CACHE_TTL)) # Ensure TTL is within reasonable bounds - return min(max(ttl, 60), MAX_ACTIVE_FILE_CACHE_TTL) # Between 1 minute and 1 hour + return min( + max(ttl, 60), MAX_ACTIVE_FILE_CACHE_TTL + ) # Between 1 minute and 2 hours except (ValueError, TypeError): return 
DEFAULT_ACTIVE_FILE_CACHE_TTL From bb96a541d94f147abf1299fd663c6aec94250cb4 Mon Sep 17 00:00:00 2001 From: ali Date: Thu, 9 Oct 2025 08:14:43 +0530 Subject: [PATCH 3/3] optional db param for clear cache --- backend/backend/settings/base.py | 4 +++ backend/sample.env | 3 +++ backend/utils/cache_service.py | 26 ++++++++++++++++--- .../workflow_v2/file_history_helper.py | 7 +++-- workers/sample.env | 2 +- 5 files changed, 36 insertions(+), 6 deletions(-) diff --git a/backend/backend/settings/base.py b/backend/backend/settings/base.py index 307c61ee13..4dc3c36725 100644 --- a/backend/backend/settings/base.py +++ b/backend/backend/settings/base.py @@ -160,6 +160,10 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str | os.environ.get("FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND", 60 * 10) ) # 10 minutes +FILE_ACTIVE_CACHE_REDIS_DB = int( + os.environ.get("FILE_ACTIVE_CACHE_REDIS_DB", 0) +) # Redis DB for active file cache tracking + INSTANT_WF_POLLING_TIMEOUT = int( os.environ.get("INSTANT_WF_POLLING_TIMEOUT", "300") ) # 5 minutes diff --git a/backend/sample.env b/backend/sample.env index eaabb0c1be..de46da6ee6 100644 --- a/backend/sample.env +++ b/backend/sample.env @@ -200,3 +200,6 @@ RUNNER_POLLING_INTERVAL_SECONDS=2 # Default: 1800 seconds (30 minutes) # Examples: 900 (15 min), 1800 (30 min), 3600 (60 min) MIN_SCHEDULE_INTERVAL_SECONDS=1800 + +# File active cache redis db +FILE_ACTIVE_CACHE_REDIS_DB=0 diff --git a/backend/utils/cache_service.py b/backend/utils/cache_service.py index 55cbb758da..a649eca5ee 100644 --- a/backend/utils/cache_service.py +++ b/backend/utils/cache_service.py @@ -43,12 +43,18 @@ def clear_cache(key_pattern: str) -> Any: cache.delete_pattern(key_pattern) @staticmethod - def clear_cache_optimized(key_pattern: str) -> Any: + def clear_cache_optimized(key_pattern: str, db: int | None = None) -> Any: """Delete keys in bulk using optimized SCAN approach for large datasets. 
Uses Redis SCAN instead of KEYS to avoid blocking Redis during deletion. Safe for production with large key sets. Use this for heavy operations like workflow history clearing. + + Args: + key_pattern: Pattern to match keys for deletion (e.g., "file_active:*") + db: Optional Redis database number. If provided, clears from that specific DB. + If None, uses the default Django cache connection (typically DB 0). + Use db=1 for worker cache entries (file_active:*, etc.) """ TIMEOUT_SECONDS = 90 # Generous but bounded timeout BATCH_SIZE = 1000 @@ -58,6 +64,20 @@ def clear_cache_optimized(key_pattern: str) -> Any: cursor = 0 completed_naturally = False + # Use specified DB or default connection + if db is not None: + import redis + + redis_connection = redis.Redis( + host=settings.REDIS_HOST, + port=settings.REDIS_PORT, + db=db, + username=getattr(settings, "REDIS_USER", None), + password=getattr(settings, "REDIS_PASSWORD", None), + ) + else: + redis_connection = redis_cache + try: while True: # Check timeout first @@ -69,13 +89,13 @@ def clear_cache_optimized(key_pattern: str) -> Any: break # SCAN returns (next_cursor, keys_list) - cursor, keys = redis_cache.scan( + cursor, keys = redis_connection.scan( cursor=cursor, match=key_pattern, count=BATCH_SIZE ) if keys: # Delete keys in pipeline for efficiency - pipe = redis_cache.pipeline() + pipe = redis_connection.pipeline() for key in keys: pipe.delete(key) pipe.execute() diff --git a/backend/workflow_manager/workflow_v2/file_history_helper.py b/backend/workflow_manager/workflow_v2/file_history_helper.py index df203b8b7c..0929f1557b 100644 --- a/backend/workflow_manager/workflow_v2/file_history_helper.py +++ b/backend/workflow_manager/workflow_v2/file_history_helper.py @@ -2,6 +2,7 @@ from datetime import timedelta from typing import Any +from django.conf import settings from django.db.models import Q from django.db.utils import IntegrityError from django.utils import timezone @@ -335,9 +336,11 @@ def 
clear_history_for_workflow( pattern = f"file_active:{workflow.id}:*" try: - CacheService.clear_cache_optimized(pattern) + # Workers store file_active:* cache in Redis DB (FILE_ACTIVE_CACHE_REDIS_DB) + DB = settings.FILE_ACTIVE_CACHE_REDIS_DB + CacheService.clear_cache_optimized(pattern, db=DB) logger.info( - f"Cleared Redis cache entries for workflow {workflow.id} with pattern: {pattern}" + f"Cleared Redis cache entries (DB {DB}) for workflow {workflow.id} with pattern: {pattern}" ) except Exception as e: logger.warning( diff --git a/workers/sample.env b/workers/sample.env index 87f7fbd748..f84bb5e6ef 100644 --- a/workers/sample.env +++ b/workers/sample.env @@ -76,7 +76,7 @@ REDIS_DB=0 CACHE_REDIS_ENABLED=true CACHE_REDIS_HOST=unstract-redis CACHE_REDIS_PORT=6379 -CACHE_REDIS_DB=1 +CACHE_REDIS_DB=0 CACHE_REDIS_PASSWORD= CACHE_REDIS_USERNAME= CACHE_REDIS_SSL=false