Executor API (WIP)
Executor is the core orchestration module of the PETsARD framework. It parses the YAML configuration, executes each module in workflow order, manages execution state, and provides an interface for accessing results.
Class Architecture
Design Philosophy
Executor follows three core design principles:
1. Single Responsibility
Executor only handles workflow orchestration, delegating specific functions to dedicated modules:
- Config: Configuration parsing and validation
- Status: State management and result storage
- Modules: Actual data processing tasks
2. Clear Interface
All interactions go through Executor’s public methods:
- Construction phase: __init__()
- Execution phase: run()
- Result retrieval: get_result(), get_timing(), is_execution_completed()
- Schema inference: get_inferred_schema()
3. Automated Management
Users only need to provide configuration; Executor handles:
- Module loading and initialization
- Execution sequence management
- State tracking and snapshot creation
- Result collection and organization
Basic Usage
Import Module
from petsard import Executor
Create Executor Instance
# From YAML file
executor = Executor(config='path/to/config.yaml')

# From dictionary
config_dict = {
    'Loader': {
        'load_data': {
            'filepath': 'data.csv'
        }
    }
}
executor = Executor(config=config_dict)
Execute Workflow
# Execute all experiments
executor.run()
# Check execution status
if executor.is_execution_completed():
    print("Execution completed")
Get Results
# Get all results
results = executor.get_result()
# Get execution time report
timing_df = executor.get_timing()
Constructor (__init__)
Creates an Executor instance and initializes configuration.
Syntax
Executor(config, verbose=True)
Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| config | str or dict | Yes | - | YAML file path or configuration dictionary |
| verbose | bool | No | True | Whether to display execution progress information |
Return Value
Returns an Executor instance with initialized Config and Status.
Usage Examples
Example 1: Load from YAML file
from petsard import Executor

# Create Executor ('executor' rather than 'exec', which shadows the built-in)
executor = Executor(config='workflow_config.yaml')
print("Configuration loaded successfully")
Example 2: Use configuration dictionary
from petsard import Executor
# Define configuration
config = {
    'Loader': {
        'load_csv': {
            'filepath': 'data/input.csv'
        }
    },
    'Synthesizer': {
        'generate': {
            'method': 'sdv',
            'model': 'GaussianCopula',
            'num_samples': 1000
        }
    }
}

# Create Executor
executor = Executor(config=config)
executor.run()
Example 3: Disable progress information
from petsard import Executor
# Create a silent Executor
executor = Executor(
    config='config.yaml',
    verbose=False
)
executor.run()
Notes
- Configuration Validation: Config automatically validates configuration content during initialization
- Module Check: Ensures all referenced modules exist in the system
- Path Handling: Supports absolute and relative paths; relative paths resolve from the current working directory (see the sketch after this list)
- Error Reporting: Provides detailed error messages for configuration errors
- Memory Management: Large configurations may consume more memory; consider splitting into multiple smaller workflows
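Because relative paths resolve from the current working directory, a script launched from a different directory may fail to find its configuration. A minimal sketch that anchors the path to the script's own location instead (the configs/workflow.yaml filename is a placeholder):

from pathlib import Path
from petsard import Executor

# Resolve the config path relative to this script, not the working directory
config_path = Path(__file__).parent / 'configs' / 'workflow.yaml'
executor = Executor(config=str(config_path))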
Method Documentation
Executor provides the following public methods:
run()
Execute the entire workflow and generate all experiment combinations.
get_result()
Get execution results, including DataFrames and Schema for all experiments.
get_timing()
Get execution time report, displaying time spent by each module and step.
is_execution_completed()
Check if workflow execution is complete.
get_inferred_schema()
Get inferred Schema based on Loader, Preprocessor, and Postprocessor configuration.
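A sketch exercising this full public surface in the documented order, with run() before the accessors (whether get_inferred_schema() takes arguments is not specified here, so the no-argument call is an assumption):

from petsard import Executor

executor = Executor(config='config.yaml')
executor.run()

if executor.is_execution_completed():
    results = executor.get_result()            # dict keyed by experiment combination
    timing = executor.get_timing()             # timing report DataFrame
    schema = executor.get_inferred_schema()    # assumed no-argument call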
Internal Components
Config
Responsible for configuration parsing and validation:
- Parse YAML or dictionary configuration
- Validate configuration structure and parameters
- Provide configuration query interface
See: Config API
Status
Responsible for state management and result storage:
- Record execution status of each module
- Track metadata (Schema) changes
- Create execution snapshots
- Collect execution time information
See: Status API
Workflow Execution Mechanism
1. Initialization Phase
executor = Executor(config='config.yaml')
Actions performed during this phase:
- Load and parse configuration file
- Initialize Config object
- Validate configuration content
- Initialize Status object
- Prepare module loading environment
2. Execution Phase
executor.run()
Actions performed during this phase:
- Parse module dependencies
- Determine execution sequence
- Load and initialize modules
- Create execution snapshots
- Execute module methods
- Record execution results
- Collect time information
3. Result Retrieval Phase
results = executor.get_result()
timing = executor.get_timing()
Actions performed during this phase:
- Organize execution results
- Format result structure
- Generate time report
- Provide result access interface
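Putting the three phases together, a typical script reads straight through them:

from petsard import Executor

# Phase 1: initialization (parse and validate the configuration)
executor = Executor(config='config.yaml')

# Phase 2: execution (resolve dependencies, run modules, record snapshots)
executor.run()

# Phase 3: result retrieval (organized results plus a timing report)
results = executor.get_result()
timing = executor.get_timing()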
Multiple Experiment Management
Executor automatically handles combinations of multiple experiments:
Configuration Example
Loader:
  experiment_1:
    filepath: data1.csv
  experiment_2:
    filepath: data2.csv
Synthesizer:
  method_a:
    method: sdv
    model: GaussianCopula
  method_b:
    method: sdv
    model: CTGAN
Execution Flow
1. Generate Combinations: Executor automatically generates 4 experiment combinations:
   - Loader[experiment_1] + Synthesizer[method_a]
   - Loader[experiment_1] + Synthesizer[method_b]
   - Loader[experiment_2] + Synthesizer[method_a]
   - Loader[experiment_2] + Synthesizer[method_b]
2. Sequential Execution: Execute each combination in order, ensuring result independence
3. Result Organization: Organize results by combination, facilitating comparison
Result Retrieval
results = executor.get_result()
# Result structure:
# {
#     'Loader[experiment_1]_Synthesizer[method_a]': {
#         'data': DataFrame,
#         'schema': Schema
#     },
#     'Loader[experiment_1]_Synthesizer[method_b]': {...},
#     'Loader[experiment_2]_Synthesizer[method_a]': {...},
#     'Loader[experiment_2]_Synthesizer[method_b]': {...}
# }
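Because the result keys encode each module's experiment name, combinations can be filtered by substring. A small sketch, assuming the key format shown above:

# Collect every combination that used Synthesizer[method_a]
method_a_results = {
    name: result
    for name, result in results.items()
    if 'Synthesizer[method_a]' in name
}
for name, result in method_a_results.items():
    print(name, result['data'].shape)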
Error Handling
Executor provides comprehensive error handling mechanisms:
Configuration Errors
try:
    executor = Executor(config='invalid_config.yaml')
except Exception as e:
    print(f"Configuration error: {e}")
Common configuration errors:
- File not found
- Invalid YAML format
- Missing required fields
- Invalid parameter values
Execution Errors
executor = Executor(config='config.yaml')

try:
    executor.run()
except Exception as e:
    print(f"Execution error: {e}")
    # The completion flag stays False when a run fails partway
    print(f"Execution status: {executor.is_execution_completed()}")
Common execution errors:
- Module loading failure
- Data processing errors
- Insufficient memory
- File I/O errors
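Since a failed run leaves is_execution_completed() False, a guarded runner can record that context before re-raising. A sketch (the logger name and helper function are illustrative, not part of the API):

import logging

from petsard import Executor

logger = logging.getLogger('workflow')

def run_with_logging(config_path):
    executor = Executor(config=config_path)
    try:
        executor.run()
    except Exception:
        # Record the incomplete status before propagating the error
        logger.exception(
            "Workflow failed (completed=%s)", executor.is_execution_completed()
        )
        raise
    return executor.get_result()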
Performance Optimization
Memory Management
# For large workloads, consider splitting work into smaller workflows
executor = Executor(config='config.yaml')
executor.run()

# Get results, then drop the Executor reference so its memory can be reclaimed
results = executor.get_result()
del executor  # Removes the reference; Python can then free the memory
Parallel Execution
For independent experiments, consider parallel execution:
from concurrent.futures import ProcessPoolExecutor

from petsard import Executor

def run_experiment(config_file):
    executor = Executor(config=config_file)
    executor.run()
    return executor.get_result()

# Execute multiple experiments in parallel; the pool is named 'pool' to avoid
# shadowing the PETsARD Executor
with ProcessPoolExecutor() as pool:
    configs = ['config1.yaml', 'config2.yaml', 'config3.yaml']
    results = list(pool.map(run_experiment, configs))
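Note that each worker process builds its own Executor and loads its own copy of the data, so memory use scales with the pool size; keep per-process workflows small when datasets are large.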
Best Practices
1. Configuration Management
# Use version control for configuration files
config_v1 = 'configs/workflow_v1.yaml'
config_v2 = 'configs/workflow_v2.yaml'

# Add descriptive information to the configuration
config = {
    'metadata': {
        'version': '1.0',
        'author': 'Data Team',
        'description': 'Production synthesis workflow'
    },
    'Loader': {...}
}
2. Result Validation
executor = Executor(config='config.yaml')
executor.run()
# Check execution status
if not executor.is_execution_completed():
    raise RuntimeError("Execution incomplete")

# Validate results
results = executor.get_result()
for exp_name, result in results.items():
    assert 'data' in result, f"Missing data: {exp_name}"
    assert 'schema' in result, f"Missing schema: {exp_name}"
3. Logging
import logging
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Create Executor
executor = Executor(config='config.yaml', verbose=True)
executor.run()
# Check execution time
timing = executor.get_timing()
print(timing)
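If you need more detail from the framework itself, you can raise its logger's level independently of your own handlers. A sketch that assumes PETsARD logs through the standard logging module under a logger named 'petsard' (an assumption; verify the name in your installation):

import logging

# Assumption: the framework's logger is named 'petsard'
logging.getLogger('petsard').setLevel(logging.DEBUG)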
Common Patterns
Pattern 1: Data Processing Pipeline
# Load → Preprocess → Synthesize → Evaluate
config = {
    'Loader': {'load': {'filepath': 'data.csv'}},
    'Preprocessor': {'clean': {'method': 'default'}},
    'Synthesizer': {'generate': {'method': 'sdv'}},
    'Evaluator': {'assess': {'metrics': ['statistical', 'privacy']}}
}
executor = Executor(config=config)
executor.run()
results = executor.get_result()
Pattern 2: Comparative Experiments
# Compare multiple synthesis methods
config = {
    'Loader': {'load': {'filepath': 'data.csv'}},
    'Synthesizer': {
        'sdv_gc': {'method': 'sdv', 'model': 'GaussianCopula'},
        'sdv_ctgan': {'method': 'sdv', 'model': 'CTGAN'},
        'sdv_tvae': {'method': 'sdv', 'model': 'TVAE'}
    },
    'Evaluator': {'eval': {'metrics': ['quality_report']}}
}
executor = Executor(config=config)
executor.run()
# Compare results of different methods
results = executor.get_result()
for exp_name, result in results.items():
    print(f"{exp_name}: {len(result['data'])} records")
Pattern 3: Batch Processing
# Process multiple datasets
datasets = ['data1.csv', 'data2.csv', 'data3.csv']
for dataset in datasets:
    config = {
        'Loader': {'load': {'filepath': dataset}},
        'Synthesizer': {'generate': {'method': 'sdv'}},
        'Reporter': {'save': {'output_dir': f'output/{dataset}'}}
    }
    executor = Executor(config=config, verbose=False)
    executor.run()
    print(f"Completed: {dataset}")
Integration with Other Modules
With Loader
# Executor automatically calls Loader's load_data method
config = {
    'Loader': {
        'load_csv': {
            'filepath': 'data.csv',
            'delimiter': ','
        }
    }
}
executor = Executor(config=config)
executor.run()
# Get loaded data
results = executor.get_result()
data = results['Loader[load_csv]']['data']
With Synthesizer
# Executor automatically passes data and schema to Synthesizer
config = {
    'Loader': {'load': {'filepath': 'data.csv'}},
    'Synthesizer': {
        'generate': {
            'method': 'sdv',
            'model': 'GaussianCopula',
            'num_samples': 1000
        }
    }
}
executor = Executor(config=config)
executor.run()
# Get synthesized data
results = executor.get_result()
synthetic_data = results['Loader[load]_Synthesizer[generate]']['data']
With Evaluator
# Executor automatically passes original and synthetic data to Evaluator
config = {
    'Loader': {'load': {'filepath': 'data.csv'}},
    'Synthesizer': {'generate': {'method': 'sdv'}},
    'Evaluator': {
        'assess': {
            'metrics': ['statistical_similarity', 'privacy_metrics']
        }
    }
}
executor = Executor(config=config)
executor.run()
# Get evaluation results
results = executor.get_result()
eval_result = results['Loader[load]_Synthesizer[generate]_Evaluator[assess]']
Advanced Features
Custom Module Integration
# Register custom module (advanced usage)
from petsard import Executor
from my_custom_module import MyCustomModule
# Configure custom module
config = {
    'Loader': {'load': {'filepath': 'data.csv'}},
    'MyCustomModule': {
        'process': {
            'param1': 'value1',
            'param2': 'value2'
        }
    }
}
executor = Executor(config=config)
executor.run()
Dynamic Configuration
# Generate configuration dynamically
def create_config(data_path, synthesis_method):
    return {
        'Loader': {
            'load': {'filepath': data_path}
        },
        'Synthesizer': {
            'generate': {'method': synthesis_method}
        }
    }
# Use dynamic configuration
config = create_config('data.csv', 'sdv')
executor = Executor(config=config)
executor.run()
Notes
- Single Instance: Each Executor instance manages one workflow; don’t reuse across workflows
- Execution Order: Must call run() before get_result() and get_timing()
- Configuration Immutability: Cannot modify configuration after Executor creation
- Thread Safety: Executor is not thread-safe; use separate instances in multi-threaded environments
- Resource Management: Large-scale experiments may consume significant memory and time; plan resources carefully
- Error Recovery: Currently does not support resuming from breakpoints; need to re-execute entire workflow upon failure
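Given the lack of breakpoint resume, a simple workaround is a retry wrapper that rebuilds the Executor from scratch on each attempt (a sketch; the attempt count and backoff are illustrative):

import time

from petsard import Executor

def run_with_retries(config_path, attempts=3, backoff_s=5.0):
    for attempt in range(1, attempts + 1):
        # A fresh instance per attempt: instances are single-use per workflow
        executor = Executor(config=config_path, verbose=False)
        try:
            executor.run()
            return executor.get_result()
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(backoff_s * attempt)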