Process and analyze multiple file formats in parallel using Render Workflows, with automated report generation.
Batch process files from various sources (storage, uploads, APIs) and generate consolidated analytics. Common scenarios:
- Document processing pipelines
- Data ingestion from multiple formats
- Automated report generation
- File format conversion
- Content analysis and extraction
- Batch data validation
- Multi-Format Support: Process CSV, JSON, and text files
- Parallel Processing: Process multiple files concurrently with asyncio.gather()
- Automatic Analysis: Extract insights from each file type
- Report Generation: Create consolidated reports from batch results
- Error Handling: Graceful handling of missing or corrupt files
- Extensible: Easy to add support for new file formats
process_file_batch (orchestrator)
└── process_single_file (for each file, in parallel)
├── read_csv_file → analyze_csv_data
├── read_json_file → analyze_json_structure
└── read_text_file → analyze_text_content
generate_consolidated_report (final aggregation)
- Python 3.10+
# Navigate to example directory
cd file-processing
# Install dependencies
pip install -r requirements.txt
# Run the workflow service
python main.py

Service Type: Workflow
Build Command:
cd file-processing && pip install -r requirements.txt

Start Command:
cd file-processing && python main.py

Required:
RENDER_API_KEY - Your Render API key (from the Render dashboard)
1. Create Workflow Service
   - Go to the Render Dashboard
   - Click "New +" → "Workflow"
   - Connect your repository
   - Name: file-processing-workflows
2. Configure Build Settings
   - Build Command: cd file-processing && pip install -r requirements.txt
   - Start Command: cd file-processing && python main.py
3. Set Environment Variables
   - Add RENDER_API_KEY in the Environment section
   - Get your API key from: Render Dashboard → Account Settings → API Keys
4. Deploy
   - Click "Create Workflow"
   - Render will build and start your workflow service
Once deployed, you can test file processing directly in the Render Dashboard:
- Go to your Workflow service in Render Dashboard
- Click the "Manual Run" or "Start Task" button
- Select the task you want to test
- Enter the task input as JSON in the text area
- Click "Start task"
Important: The file processing workflow expects an array of file paths, not a JSON object with a file_paths key.
Recommended Starting Point: Start with process_file_batch - this is the main orchestrator that processes multiple files in parallel and shows the complete workflow.
Test batch file processing:
Task: process_file_batch
Input:
[
    "sample_files/sales_data.csv",
    "sample_files/config.json",
    "sample_files/report.txt"
]

This will process all three sample files in parallel and return analysis for each.
Test single file processing:
Task: process_single_file
Input (CSV file):
{
    "file_path": "sample_files/sales_data.csv"
}

Input (JSON file):
{
    "file_path": "sample_files/config.json"
}

Input (Text file):
{
    "file_path": "sample_files/report.txt"
}

Test consolidated report generation:
First run process_file_batch, then use its result as input:
Task: generate_consolidated_report
Input:
{
    "batch_result": {
        "results": [...],
        "successful": 3,
        "failed": 0,
        "total_files": 3
    }
}

Note: You'll need to copy the actual result from process_file_batch to test this task.
Tip: The Dashboard will show you processing progress for parallel operations, demonstrating how multiple files are analyzed concurrently.
Once deployed, trigger file processing via the Render API or SDK:
from render_sdk import Render
# Uses RENDER_API_KEY environment variable automatically
render = Render()
# Process a batch of files
task_run = await render.workflows.run_task(
    "file-processing-workflows/process_file_batch",
    {
        "file_paths": [
            "sample_files/sales_data.csv",
            "sample_files/config.json",
            "sample_files/report.txt"
        ]
    }
)
result = await task_run
print(f"Processed {result.results['successful']}/{result.results['total_files']} files")

# Generate consolidated report
report_run = await render.workflows.run_task(
    "file-processing-workflows/generate_consolidated_report",
    {"batch_result": result.results}
)
report = await report_run
print(f"Report: {report.results['summary']}")

The example includes sample files in sample_files/:
sales_data.csv: Sales transaction data
- 8 rows of sales data
- Columns: date, product, quantity, price, region
config.json: Configuration file
- Nested JSON structure
- Product catalog and settings
report.txt: Text report
- Multi-section text document
- Sales analysis narrative
read_csv_file: Reads CSV and returns rows as dictionaries with column metadata.
read_json_file: Parses JSON and returns data with structure information.
read_text_file: Reads text content and provides basic statistics (lines, words, chars).
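To make the reader tasks concrete, here is a hedged sketch of what a CSV reader along these lines might look like. This is not the code in main.py: the name read_csv_rows and the exact return shape are assumptions, and in the real service the function would carry the @app.task decorator and retry configuration.

```python
import csv

# Hypothetical sketch of a CSV reader task: rows as dictionaries
# plus column metadata, as described above.
def read_csv_rows(file_path: str) -> dict:
    # DictReader keys each row by the header line
    with open(file_path, newline="") as f:
        rows = list(csv.DictReader(f))
    columns = list(rows[0].keys()) if rows else []
    return {"rows": rows, "columns": columns, "row_count": len(rows)}
```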
analyze_csv_data: Analyzes CSV for metrics like total revenue, unique products/regions.
analyze_json_structure: Examines JSON structure, counts keys, detects nesting.
analyze_text_content: Analyzes text for keywords, sections, and content patterns.
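As an illustration of the analysis step, a CSV analyzer along the lines described above might compute revenue and uniqueness metrics like this. The column names (quantity, price, product, region) match the sample sales file, but the function name and return shape are assumptions, not the code in main.py:

```python
# Hypothetical CSV analysis sketch; expects the dict shape produced by
# a reader task ({"rows": [...], ...}).
def analyze_csv_rows(csv_result: dict) -> dict:
    rows = csv_result["rows"]
    # Revenue is quantity * price summed over every row
    total_revenue = sum(float(r["quantity"]) * float(r["price"]) for r in rows)
    return {
        "total_revenue": round(total_revenue, 2),
        "unique_products": sorted({r["product"] for r in rows}),
        "unique_regions": sorted({r["region"] for r in rows}),
    }
```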
process_single_file: Routes file to appropriate reader and analyzer based on extension.
process_file_batch: Processes multiple files in parallel using asyncio.gather().
generate_consolidated_report: Aggregates results from all files into a single report.
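The extension-based routing in process_single_file can be sketched as a small dispatch table. Everything here is illustrative: the real task awaits the reader and analyzer tasks, while this standalone version takes them as plain callables.

```python
import os

# Hypothetical dispatch sketch: pick a reader/analyzer pair by extension.
def dispatch_file(file_path: str, readers: dict, analyzers: dict) -> dict:
    ext = os.path.splitext(file_path)[1].lower()
    if ext not in readers:
        # Unsupported formats become an error result instead of an
        # exception, so a batch can keep going
        return {"file": file_path, "status": "error",
                "error": f"unsupported extension: {ext}"}
    data = readers[ext](file_path)
    return {"file": file_path, "status": "ok", "analysis": analyzers[ext](data)}
```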
The key to efficient batch processing is using asyncio.gather():
@app.task
async def process_file_batch(file_paths: list[str]) -> dict:
    # Launch all file processing tasks concurrently
    tasks = [process_single_file(fp) for fp in file_paths]
    results = await asyncio.gather(*tasks)
    # Results from all files are ready
    return aggregate_results(results)

This processes all files simultaneously rather than sequentially, dramatically reducing total processing time.
Add New File Format:
@app.task
def read_xml_file(file_path: str) -> dict:
    # Parse XML file
    # Return structured data
    pass

@app.task
def analyze_xml_data(xml_result: dict) -> dict:
    # Analyze XML content
    # Return insights
    pass

# Update process_single_file to handle the .xml extension

Add Cloud Storage Integration:
@app.task
async def download_from_s3(bucket: str, key: str) -> str:
    # Download file from S3
    # Save to a temp location
    # Return the local path
    pass

@app.task
async def process_s3_batch(bucket: str, keys: list[str]) -> dict:
    # Download files in parallel
    paths = await asyncio.gather(*[download_from_s3(bucket, k) for k in keys])
    # Process the downloaded files
    return await process_file_batch(paths)

Add Database Export:
@app.task
async def export_to_database(report: dict) -> dict:
    # Connect to the database
    # Insert report data
    # Return confirmation
    pass

- Parallel Processing: Always use asyncio.gather() for independent operations
- Batch Size: Process files in batches of 10-50 for optimal performance
- Memory Management: For large files, consider streaming or chunked processing
- Error Isolation: One file failure shouldn't stop the entire batch
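One way to get the error isolation described above is asyncio.gather's return_exceptions flag, which delivers raised exceptions as result values instead of aborting the whole batch. A minimal self-contained sketch (both task names are hypothetical stand-ins, not code from this example):

```python
import asyncio

async def process_one(path: str) -> dict:
    # Stand-in for the real per-file task
    if path.endswith(".bad"):
        raise FileNotFoundError(path)
    return {"file": path, "status": "ok"}

async def process_batch_isolated(paths: list[str]) -> dict:
    # return_exceptions=True turns raised errors into values, so one
    # corrupt file does not cancel the other files in the batch
    outcomes = await asyncio.gather(
        *(process_one(p) for p in paths), return_exceptions=True
    )
    results, errors = [], []
    for path, outcome in zip(paths, outcomes):
        if isinstance(outcome, Exception):
            errors.append({"file": path, "error": str(outcome)})
        else:
            results.append(outcome)
    return {"successful": len(results), "failed": len(errors),
            "results": results, "errors": errors}
```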
- Python-only: Workflows are only supported in Python via render-sdk
- No Blueprint Support: Workflows don't support render.yaml blueprint configuration
- File Access: In production, integrate with cloud storage (S3, GCS) or databases
- Retry Logic: All read operations include retry configuration for transient failures
- Local Paths: Sample uses local paths; adapt for your storage solution