Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions backend/backend/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ def get_required_setting(setting_key: str, default: str | None = None) -> str |
os.environ.get("FILE_EXECUTION_TRACKER_COMPLETED_TTL_IN_SECOND", 60 * 10)
) # 10 minutes

FILE_ACTIVE_CACHE_REDIS_DB = int(
os.environ.get("FILE_ACTIVE_CACHE_REDIS_DB", 0)
) # Redis DB for active file cache tracking

INSTANT_WF_POLLING_TIMEOUT = int(
os.environ.get("INSTANT_WF_POLLING_TIMEOUT", "300")
) # 5 minutes
Expand Down
3 changes: 3 additions & 0 deletions backend/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,6 @@ RUNNER_POLLING_INTERVAL_SECONDS=2
# Default: 1800 seconds (30 minutes)
# Examples: 900 (15 min), 1800 (30 min), 3600 (60 min)
MIN_SCHEDULE_INTERVAL_SECONDS=1800

# File active cache redis db
FILE_ACTIVE_CACHE_REDIS_DB=0
26 changes: 23 additions & 3 deletions backend/utils/cache_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,18 @@ def clear_cache(key_pattern: str) -> Any:
cache.delete_pattern(key_pattern)

@staticmethod
def clear_cache_optimized(key_pattern: str) -> Any:
def clear_cache_optimized(key_pattern: str, db: int | None = None) -> Any:
"""Delete keys in bulk using optimized SCAN approach for large datasets.

Uses Redis SCAN instead of KEYS to avoid blocking Redis during deletion.
Safe for production with large key sets. Use this for heavy operations
like workflow history clearing.

Args:
key_pattern: Pattern to match keys for deletion (e.g., "file_active:*")
db: Optional Redis database number. If provided, clears from that specific DB.
If None, uses the default Django cache connection (typically DB 0).
Use db=1 for worker cache entries (file_active:*, etc.)
"""
TIMEOUT_SECONDS = 90 # Generous but bounded timeout
BATCH_SIZE = 1000
Expand All @@ -58,6 +64,20 @@ def clear_cache_optimized(key_pattern: str) -> Any:
cursor = 0
completed_naturally = False

# Use specified DB or default connection
if db is not None:
import redis

redis_connection = redis.Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=db,
username=getattr(settings, "REDIS_USER", None),
password=getattr(settings, "REDIS_PASSWORD", None),
)
else:
redis_connection = redis_cache

try:
while True:
# Check timeout first
Expand All @@ -69,13 +89,13 @@ def clear_cache_optimized(key_pattern: str) -> Any:
break

# SCAN returns (next_cursor, keys_list)
cursor, keys = redis_cache.scan(
cursor, keys = redis_connection.scan(
cursor=cursor, match=key_pattern, count=BATCH_SIZE
)

if keys:
# Delete keys in pipeline for efficiency
pipe = redis_cache.pipeline()
pipe = redis_connection.pipeline()
for key in keys:
pipe.delete(key)
pipe.execute()
Expand Down
50 changes: 5 additions & 45 deletions backend/workflow_manager/internal_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import logging
import uuid

from django.core.cache import cache
from django.db import transaction
from django.shortcuts import get_object_or_404
from django.utils import timezone
Expand Down Expand Up @@ -731,10 +730,10 @@ def post(self, request):
# Check for files in PENDING or EXECUTING state in other workflow executions
active_files = {} # {uuid: [execution_data]} - legacy format
active_identifiers = set() # Composite identifiers for new format
cache_hits = 0
db_queries = 0

# Step 1: Check cache for all files and separate files that need database queries
# Prepare all files for database check
# Note: Workers already check cache before calling this API, so no need to check again
files_needing_db_check = []

for file_data in files:
Expand All @@ -744,42 +743,7 @@ def post(self, request):
f"{provider_uuid}:{file_path}" if file_path else provider_uuid
)

# 1. Check completion cache first (highest priority)
completion_key = f"file_completed:{workflow_id}:{provider_uuid}"
completion_data = cache.get(completion_key)

if completion_data:
logger.debug(
f"File {provider_uuid} found in completion cache, skipping"
)
continue # Skip - recently completed

# 2. Check active processing cache (path-aware)
cached_active = None

if file_path is not None:
# Use precise path-aware cache key
active_key = f"file_active:{workflow_id}:{provider_uuid}:{file_path}"
cached_active = cache.get(active_key)
if cached_active:
logger.debug(
f"File {provider_uuid}:{file_path} found in path-aware cache"
)
else:
# No file path available, skip cache check for files without path
cached_active = None

if cached_active:
# Verify it's not the current execution
if cached_active.get("execution_id") != current_execution_id:
# Track in both formats
active_files[provider_uuid] = [cached_active]
active_identifiers.add(composite_id)
cache_hits += 1
logger.debug(f"File {composite_id} found in active cache")
continue
Comment thread
ritwik-g marked this conversation as resolved.

# File needs database check - add to batch
# All files need database check
files_needing_db_check.append(
{
"uuid": provider_uuid,
Expand All @@ -788,7 +752,7 @@ def post(self, request):
}
)

# Step 2: Bulk database queries for all files that need database check
# Bulk database query for all files
if files_needing_db_check:
logger.info(
f"[ActiveCheck] Performing bulk database check for {len(files_needing_db_check)} files"
Expand All @@ -804,7 +768,7 @@ def post(self, request):

logger.info(
f"[ActiveCheck] Active check complete: {len(active_files)}/{len(files)} files active "
f"(cache_hits: {cache_hits}, db_queries: {db_queries})"
f"(db_queries: {db_queries})"
)

# Log final active identifiers for debugging
Expand All @@ -827,11 +791,7 @@ def post(self, request):
"total_checked": len(files),
"total_active": len(active_files),
"cache_stats": {
"cache_hits": cache_hits,
"db_queries": db_queries,
"cache_hit_rate": f"{(cache_hits / len(files) * 100):.1f}%"
if files
else "0.0%",
},
}
)
Expand Down
7 changes: 5 additions & 2 deletions backend/workflow_manager/workflow_v2/file_history_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import timedelta
from typing import Any

from django.conf import settings
from django.db.models import Q
from django.db.utils import IntegrityError
from django.utils import timezone
Expand Down Expand Up @@ -335,9 +336,11 @@ def clear_history_for_workflow(
pattern = f"file_active:{workflow.id}:*"

try:
CacheService.clear_cache_optimized(pattern)
# Workers store file_active:* cache in Redis DB (FILE_ACTIVE_CACHE_REDIS_DB)
DB = settings.FILE_ACTIVE_CACHE_REDIS_DB
CacheService.clear_cache_optimized(pattern, db=DB)
logger.info(
f"Cleared Redis cache entries for workflow {workflow.id} with pattern: {pattern}"
f"Cleared Redis cache entries (DB {DB}) for workflow {workflow.id} with pattern: {pattern}"
)
except Exception as e:
logger.warning(
Expand Down
43 changes: 42 additions & 1 deletion workers/file_processing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,43 @@ def _compile_batch_result(context: WorkflowContextData) -> dict[str, Any]:
# These functions support the refactored file processing workflow


def _cleanup_file_cache_entry(
file_hash: FileHashData,
workflow_id: str,
file_name: str,
) -> None:
"""Helper to cleanup active file cache entry after DB record creation attempt.

This should be called after attempting to create WorkflowFileExecution,
regardless of success or failure, to prevent stale cache entries from
blocking future executions.

Args:
file_hash: File hash data containing provider_file_uuid
workflow_id: Workflow ID for cache key
file_name: File name for logging
"""
if not file_hash.provider_file_uuid:
return

try:
from shared.workflow.execution.active_file_manager import (
cleanup_active_file_cache,
)

cleanup_active_file_cache(
provider_file_uuids=[file_hash.provider_file_uuid],
workflow_id=workflow_id,
logger_instance=logger,
)
logger.debug(
f"Cleaned cache for {file_name} (UUID: {file_hash.provider_file_uuid})"
)
except Exception as cleanup_error:
logger.warning(f"Cache cleanup failed for {file_name}: {cleanup_error}")
# Don't raise - cache will expire anyway


def _pre_create_file_executions(
file_data: WorkerFileData,
files: list[Any],
Expand Down Expand Up @@ -867,7 +904,7 @@ def _pre_create_file_executions(

# Use the file history flag passed from execution parameters
logger.info(
f"Using file history parameter for workflow {workflow_id} (type: {workflow_type}): use_file_history = {use_file_history}"
f"Using file history parameter for workflow {workflow_id} execution {execution_id} (type: {workflow_type}): use_file_history = {use_file_history}"
)

for file_item in files:
Expand Down Expand Up @@ -936,6 +973,10 @@ def _pre_create_file_executions(
)
# Continue with other files even if one fails

finally:
# Always cleanup cache (success or failure) to prevent stale entries
_cleanup_file_cache_entry(file_hash, workflow_id, file_name)

# File history deduplication now handled during individual file processing

return pre_created_data
Expand Down
5 changes: 4 additions & 1 deletion workers/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ REDIS_DB=0
CACHE_REDIS_ENABLED=true
CACHE_REDIS_HOST=unstract-redis
CACHE_REDIS_PORT=6379
CACHE_REDIS_DB=1
CACHE_REDIS_DB=0
CACHE_REDIS_PASSWORD=
CACHE_REDIS_USERNAME=
CACHE_REDIS_SSL=false
Expand Down Expand Up @@ -257,6 +257,9 @@ EXECUTION_RESULT_TTL_SECONDS=86400
EXECUTION_CACHE_TTL_SECONDS=86400
INSTANT_WF_POLLING_TIMEOUT=300

# Active File Execution cache in seconds
ACTIVE_FILE_CACHE_TTL=300

# =============================================================================
# Development Settings
# =============================================================================
Expand Down
8 changes: 5 additions & 3 deletions workers/shared/clients/file_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ def get_or_create_workflow_file_execution(
logger.info(
f"FileHashData debug: file_name='{file_hash_data.file_name}', "
f"has_hash={file_hash_data.has_hash()}, "
f"source_connection_type='{getattr(file_hash_data, 'source_connection_type', None)}'"
f"source_connection_type='{getattr(file_hash_data, 'source_connection_type', None)}', "
f"execution_id='{execution_id}'"
)

# CRITICAL FIX: For API files with pre-calculated hash, skip hash computation
Expand Down Expand Up @@ -219,11 +220,12 @@ def get_or_create_workflow_file_execution(
data = create_request.to_dict()

logger.info(
f"Creating workflow file execution with file_hash: {file_hash_data.file_name}"
f"Creating workflow file execution: '{execution_id}' with file_name: {file_hash_data.file_name}"
)
logger.info(
f"FileHashData key identifiers: provider_file_uuid='{file_hash_data.provider_file_uuid}', "
f"file_path='{file_hash_data.file_path}', file_hash='{file_hash_data.file_hash}'"
f"file_path='{file_hash_data.file_path}', file_hash='{file_hash_data.file_hash}', "
f"execution='{execution_id}'."
)
logger.debug(f"FileHashData: {file_hash_data.to_dict()}")

Expand Down
Loading