Metadata-Version: 2.4
Name: pipetee
Version: 0.3.0
Summary: A pipeline framework for processing data
Author-email: Sam Yu <yudataguy@gmail.com>
License-Expression: MIT
License-File: LICENSE
Classifier: Development Status :: 3 - Alpha
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Requires-Python: >=3.8
Requires-Dist: pydantic>=2.0.0
Provides-Extra: dev
Requires-Dist: black>=23.0.0; extra == 'dev'
Requires-Dist: isort>=5.0.0; extra == 'dev'
Requires-Dist: mypy>=1.0.0; extra == 'dev'
Requires-Dist: pytest-asyncio>=0.23.0; extra == 'dev'
Requires-Dist: pytest-cov>=4.0.0; extra == 'dev'
Requires-Dist: pytest>=7.0.0; extra == 'dev'
Requires-Dist: ruff>=0.0.290; extra == 'dev'
Provides-Extra: viz
Requires-Dist: graphviz>=0.20.0; extra == 'viz'
Requires-Dist: matplotlib>=3.7.0; extra == 'viz'
Description-Content-Type: text/markdown

# Pipeline-Tee

[![PyPI version](https://badge.fury.io/py/pipetee.svg)](https://badge.fury.io/py/pipetee)

A powerful and flexible Python pipeline processing framework with advanced flow control, parallel execution, visualization, and async support.

## Features

- 🔄 **Flexible Pipeline Processing**: Build complex data processing pipelines with branching and conditional execution
- ⚡ **Async Support**: Built with asyncio for efficient asynchronous processing
- 🚀 **Parallel Execution**: True concurrent processing with 1-to-N-to-1 patterns (Fan Out/Fan In)
  - **Selective Parallelism**: Choose which stages run in parallel vs sequential
  - **Multiple Parallel Groups**: Mix parallel and sequential execution in the same pipeline
  - **Performance Gains**: 30-50% faster execution for independent tasks
  - **Concurrent I/O**: Ideal for API calls, database operations, file processing
- 📊 **Visualization**: Rich pipeline visualization with graphviz and matplotlib
  - Generate pipeline structure diagrams showing stages and connections
  - Create execution timeline plots with timing information
  - Track stage status with color-coded nodes and status icons
  - **Parallel execution visualization** with overlapping timelines
  - Export as PNG, SVG, or PDF files
- 🔀 **Advanced Flow Control**: Support for branching, skipping, jumping between stages
- 📝 **Comprehensive Logging**: Detailed logging with configurable levels
- 🔍 **State Tracking**: Pipeline state management and execution metrics
- 🎯 **Post-Processing**: Add post-processors to modify stage outputs
- ⚠️ **Error Handling**: Robust error handling and propagation

## Requirements

- Python 3.8+
- Optional visualization dependencies:
  - graphviz (for pipeline structure diagrams; the Python package also needs the Graphviz system binaries on your `PATH` to render files)
  - matplotlib (for execution timeline plots)

## Project Structure

```text
pipetee/
├── src/
│   └── pipetee/
│       ├── __init__.py
│       ├── pipeline.py          # Core pipeline implementation
│       ├── models/
│       │   ├── __init__.py
│       │   ├── stage.py        # Stage models and decisions
│       │   └── visual.py       # Visualization models
│       └── utils/
│           ├── logging_config.py
│           └── visualization.py # Visualization utilities
├── tests/
│   ├── test_pipeline.py
│   ├── test_visual.py
│   └── test_models.py
├── examples/
│   ├── parallel_pipeline_demo.py      # NEW: 1-to-N-to-1 parallel pattern
│   ├── mixed_parallel_demo.py         # NEW: Mixed parallel/sequential
│   ├── complex_pipeline_demo.py
│   └── complex_branching_pipeline.py
└── pyproject.toml
```

## Installation

You can install Pipeline-Tee directly from PyPI:

```bash
# Install core package
pip install pipetee

# Install with visualization dependencies
pip install "pipetee[viz]"
```

For development installation:

```bash
# Clone the repository
git clone https://github.com/yudataguy/pipeline-tee.git
cd pipeline-tee

# Install in development mode
pip install -e .

# Install with visualization dependencies
pip install -e ".[viz]"
```

## Basic Usage

### Sequential Pipeline (Traditional)

```python
import asyncio

from pipetee.pipeline import Pipeline, PipelineStage, StageResult
from pipetee.models.stage import StageDecision
from pipetee.utils.visualization import save_pipeline_visualization

# Define a custom pipeline stage
class DataProcessingStage(PipelineStage[str, str]):
    async def process(self, data: str) -> StageResult[str]:
        processed_data = data.upper()
        return StageResult(
            success=True,
            data=processed_data,
            decision=StageDecision.CONTINUE
        )

async def main() -> None:
    # Create and configure pipeline
    pipeline = Pipeline()
    pipeline.add_stage("process_data", DataProcessingStage())

    # Process data (await is only valid inside a coroutine)
    result = await pipeline.process("hello world")

    # Save visualization diagrams (requires the "viz" extra)
    structure_path, timeline_path = save_pipeline_visualization(
        pipeline,
        output_dir="pipeline_viz",
        base_name="my_pipeline",
        format="png"  # or "svg" or "pdf"
    )

    print(f"Pipeline structure saved to: {structure_path}")
    print(f"Execution timeline saved to: {timeline_path}")

asyncio.run(main())
```

### Parallel Pipeline (NEW!) ⚡

```python
import asyncio
from pipetee.pipeline import Pipeline, PipelineStage, StageResult
from pipetee.models.stage import StageDecision

# Stage that triggers parallel execution
class DataPreparationStage(PipelineStage):
    async def process(self, data):
        # Prepare data for parallel processing
        prepared_data = {"input": data, "timestamp": "2024-01-01"}

        # Trigger parallel execution of multiple stages
        return StageResult(
            success=True,
            data=prepared_data,
            decision=StageDecision.PARALLEL_TO,
            next_stage="text_analyzer,image_processor,data_validator"  # Run in parallel
        )

# Parallel stage 1: Text Analysis
class TextAnalyzerStage(PipelineStage):
    async def process(self, data):
        await asyncio.sleep(1.0)  # Simulate text processing
        return StageResult(success=True, data={"stage": "text_analyzer", "result": "analyzed"})

# Parallel stage 2: Image Processing
class ImageProcessorStage(PipelineStage):
    async def process(self, data):
        await asyncio.sleep(1.5)  # Simulate image processing
        return StageResult(success=True, data={"stage": "image_processor", "result": "processed"})

# Parallel stage 3: Data Validation
class DataValidatorStage(PipelineStage):
    async def process(self, data):
        await asyncio.sleep(0.8)  # Simulate validation
        return StageResult(success=True, data={"stage": "data_validator", "result": "valid"})

# Aggregation stage (N-to-1)
class ResultAggregatorStage(PipelineStage):
    async def process(self, parallel_results):
        # parallel_results is a list of results from all parallel stages
        aggregated = {"parallel_results": parallel_results, "status": "aggregated"}
        return StageResult(success=True, data=aggregated)

async def main():
    # Create pipeline with parallel execution
    pipeline = Pipeline()
    pipeline.add_stage("data_preparation", DataPreparationStage())
    pipeline.add_stage("text_analyzer", TextAnalyzerStage())        # Parallel group
    pipeline.add_stage("image_processor", ImageProcessorStage())    # Parallel group
    pipeline.add_stage("data_validator", DataValidatorStage())      # Parallel group
    pipeline.add_stage("result_aggregator", ResultAggregatorStage()) # Sequential

    # Execute pipeline (await is only valid inside a coroutine)
    result = await pipeline.process({"data": "sample input"})

asyncio.run(main())

# Pipeline flow:
# data_preparation → [text_analyzer + image_processor + data_validator] → result_aggregator
#                    ↑ These 3 stages run concurrently ↑
```

## Parallel Execution Patterns

### 1-to-N-to-1 (Fan Out/Fan In)

This is the most common parallel pattern: one stage distributes work to multiple parallel stages, and a following stage aggregates their results:

```python
# Sequential Stage → [Parallel Stage A + Parallel Stage B + Parallel Stage C] → Aggregation Stage
```

**Benefits:**
- **Concurrent I/O**: Database calls, API requests, file operations run simultaneously
- **Independent Processing**: Text analysis + image processing + data validation
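- **Performance**: A parallel group takes about as long as its slowest stage rather than the sum of all its stages, which gives 30-50% faster end-to-end execution for suitable workloads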
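
The timing benefit can be illustrated without the library at all. The following is a minimal sketch in plain `asyncio`, not pipetee's implementation: `fake_stage` and `fan_out_fan_in` are hypothetical names, and the sleeps stand in for I/O-bound work.

```python
import asyncio
import time


async def fake_stage(name: str, seconds: float, data: dict) -> dict:
    """Stand-in for an I/O-bound stage; sleeps instead of doing real work."""
    await asyncio.sleep(seconds)
    return {"stage": name, "input": data["input"]}


async def fan_out_fan_in(data: dict) -> dict:
    # Fan out: start all three stand-in stages on the same input at once.
    results = await asyncio.gather(
        fake_stage("text_analyzer", 0.10, data),
        fake_stage("image_processor", 0.15, data),
        fake_stage("data_validator", 0.08, data),
    )
    # Fan in: gather() returns results in the order the awaitables were passed.
    return {"parallel_results": list(results), "status": "aggregated"}


start = time.perf_counter()
aggregated = asyncio.run(fan_out_fan_in({"input": "sample"}))
elapsed = time.perf_counter() - start

print([r["stage"] for r in aggregated["parallel_results"]])
print(f"elapsed: {elapsed:.2f}s (the stage times add up to 0.33s)")
```

The elapsed time should land close to the slowest stage (0.15 s) rather than the 0.33 s total, which is the effect the pattern relies on.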

### Selective Parallelism

Not all stages need to be parallel. Mix and match based on your needs:

```python
# Pipeline with mixed execution:
# Init → Analysis → [Parallel Group 1] → Aggregation → Middle → [Parallel Group 2] → Final
```

**Use Parallel When:**
- ✅ Tasks are independent and can run concurrently
- ✅ I/O-bound operations (API calls, database queries)
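- ✅ CPU-intensive computations that can be parallelized (plain `async` code does not run CPU-bound work in parallel by itself, so such work usually needs to be offloaded to a thread or process pool)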
- ✅ Different transformations on the same input data

**Stay Sequential When:**
- ⚠️ Stage B depends on Stage A's output
- ⚠️ Shared resource modifications (database updates)
- ⚠️ Setup/teardown operations
- ⚠️ Very fast operations (<100ms) where parallel overhead exceeds benefits
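
As a rough illustration of mixing the two modes, the sketch below runs a plan in which a nested list marks a group of steps that execute concurrently on the same input. It uses only `asyncio`; `run_plan` and the toy steps are hypothetical and do not reflect how pipetee schedules stages internally.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Union

Step = Callable[[Any], Awaitable[Any]]


async def run_plan(plan: List[Union[Step, List[Step]]], data: Any) -> Any:
    """Run entries in order; a nested list is a group whose steps run concurrently."""
    for entry in plan:
        if isinstance(entry, list):
            # Parallel group: every step gets the same input; the output is a list.
            data = list(await asyncio.gather(*(step(data) for step in entry)))
        else:
            # Sequential step: the previous output feeds the next step.
            data = await entry(data)
    return data


async def init(x):
    return x + 1

async def double(x):
    return x * 2

async def square(x):
    return x * x

async def total(xs):
    return sum(xs)

async def negate(x):
    return -x

async def half(x):
    return x / 2

async def final(xs):
    return max(xs)


# Init -> [Parallel Group 1] -> Aggregation -> [Parallel Group 2] -> Final
plan = [init, [double, square], total, [negate, half], final]
result = asyncio.run(run_plan(plan, 2))
print(result)  # 7.5
```

Starting from 2, the steps produce 3, then `[6, 9]`, then 15, then `[-15, 7.5]`, and finally 7.5.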

## Visualization Examples

The pipeline visualization utilities can generate two types of diagrams:

1. **Pipeline Structure** (using graphviz):
   - Shows stages and their connections
   - Color-coded nodes based on stage status
   - Status icons (🔄 pending, ⚡ running, ✅ completed, ⏭️ skipped, ❌ failed)
   - Default sequence and conditional branching paths
   - **NEW**: Parallel execution groups clearly marked

2. **Execution Timeline** (using matplotlib):
   - Shows execution flow over time
   - Color-coded bars for stage duration
   - Start and end times for each stage
   - Status icons and stage names
   - Time-based x-axis
   - **NEW**: Overlapping bars show parallel execution

To generate visualizations, use the `save_pipeline_visualization` function from `pipetee.utils.visualization`. Install the visualization dependencies first: `pip install "pipetee[viz]"` from PyPI, or `pip install -e ".[viz]"` from a source checkout.

## Advanced Features

### Branching and Conditions

```python
from pipetee.pipeline import Condition

# Add branching condition (`stage` is a PipelineStage instance you have already created)
condition = Condition("is_valid", lambda x: len(x) > 5)
stage.add_branch_condition(condition, "validation_stage")

# Add skip condition
stage.add_skip_condition(Condition("skip_empty", lambda x: not x))
```
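
The predicates passed to `Condition` above are ordinary callables, so they can be checked in isolation before wiring them into a pipeline. A small sketch, assuming nothing about how pipetee evaluates them (`is_valid_safe` is a hypothetical variant added for illustration):

```python
def is_valid(x) -> bool:
    # Same logic as the "is_valid" lambda above.
    return len(x) > 5


def skip_empty(x) -> bool:
    # Same logic as the "skip_empty" lambda above.
    return not x


for sample in ["hello world", "hi", ""]:
    print(repr(sample), is_valid(sample), skip_empty(sample))

# skip_empty tolerates None, but is_valid would raise TypeError on it,
# so a guard helps if a stage can receive None.
def is_valid_safe(x) -> bool:
    return x is not None and len(x) > 5
```

Only `"hello world"` satisfies `is_valid`, and only the empty string satisfies `skip_empty`.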

### Parallel Branch Conditions (NEW!)

```python
# Add parallel branching condition
parallel_condition = Condition("high_volume", lambda x: x.get("volume", 0) > 1000)
stage.add_parallel_branch_condition(parallel_condition, ["heavy_processor_1", "heavy_processor_2", "heavy_processor_3"])
```

### Post-Processing

```python
# Add post-processor to a stage
stage.add_post_processor(lambda x: x.strip())
```
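
Conceptually, a post-processor is a function applied to a stage's output before it moves on. The sketch below shows that idea in plain Python; `apply_post_processors` is a hypothetical helper, and whether pipetee applies several post-processors in registration order is an assumption here, not something this README confirms.

```python
from functools import reduce
from typing import Any, Callable, Iterable


def apply_post_processors(value: Any, processors: Iterable[Callable[[Any], Any]]) -> Any:
    """Feed value through each processor in order (hypothetical helper)."""
    return reduce(lambda acc, fn: fn(acc), processors, value)


cleaned = apply_post_processors("  Hello World \n", [str.strip, str.lower])
print(repr(cleaned))  # 'hello world'
```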

### Performance Monitoring

```python
# The pipeline automatically tracks execution metrics (run this inside an async function)
result = await pipeline.process(data)

if result.metadata:
    execution_path = result.metadata.get('execution_path', [])
    duration = result.metadata.get('pipeline_end') - result.metadata.get('pipeline_start')
    print(f"Executed stages: {execution_path}")
    print(f"Total time: {duration.total_seconds():.2f} seconds")
```
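
The subtraction above fails with a `TypeError` if either timestamp is missing from the metadata. A defensive variant might look like the sketch below; `pipeline_duration` is a hypothetical helper, and it assumes the `pipeline_start` and `pipeline_end` values are `datetime` objects, as the call to `total_seconds()` suggests.

```python
from datetime import datetime, timedelta
from typing import Any, Mapping, Optional


def pipeline_duration(metadata: Optional[Mapping[str, Any]]) -> Optional[timedelta]:
    """Return end - start, or None if either timestamp is missing."""
    if not metadata:
        return None
    start = metadata.get("pipeline_start")
    end = metadata.get("pipeline_end")
    if start is None or end is None:
        return None
    return end - start


# Hypothetical metadata, shaped like the keys used above.
sample = {
    "execution_path": ["data_preparation", "result_aggregator"],
    "pipeline_start": datetime(2024, 1, 1, 12, 0, 0),
    "pipeline_end": datetime(2024, 1, 1, 12, 0, 3, 100000),
}

duration = pipeline_duration(sample)
print(f"Total time: {duration.total_seconds():.2f} seconds")  # Total time: 3.10 seconds
print(pipeline_duration({"pipeline_start": datetime(2024, 1, 1)}))  # None
```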

## Examples

Check out the `examples/` directory for comprehensive demonstrations:

- **`parallel_pipeline_demo.py`**: 1-to-N-to-1 parallel execution with performance analysis
- **`mixed_parallel_demo.py`**: Mixed parallel and sequential execution patterns
- **`visualization_demo.py`**: Pipeline visualization features
- **`complex_pipeline_demo.py`**: Complex data processing workflows
- **`complex_branching_pipeline.py`**: Advanced flow control features

Run any example:

```bash
# Parallel execution demos
python examples/parallel_pipeline_demo.py
python examples/mixed_parallel_demo.py

# Traditional demos
python examples/visualization_demo.py
python examples/complex_pipeline_demo.py
```

## Development

1. Clone the repository
2. Install development dependencies:
   ```bash
   python -m pip install -e ".[dev,viz]"
   ```
3. Run tests:
   ```bash
   pytest
   ```

## Performance Benchmarks

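Example timings comparing sequential and parallel execution. The itemized stage times add up to 4.5 s (sequential) and 2.7 s (parallel); each reported total is about 0.4 s higher, and the breakdown does not itemize that difference: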

```
Traditional Sequential Pipeline: 4.9 seconds
├── Data Processing: 1.5s
├── Text Analysis: 1.0s
├── Image Processing: 1.2s
└── Validation: 0.8s

Parallel Pipeline: 3.1 seconds (37% faster)
├── Data Processing: 1.5s
└── [Text + Image + Validation]: 1.2s (concurrent)
```
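
The arithmetic behind these figures can be reproduced in a few lines. This sketch uses only the numbers shown above: the parallel estimate is the sequential stage plus the slowest member of the concurrent group. Because the itemized times and the reported totals differ slightly, both speedups are computed.

```python
# Per-stage times copied from the breakdown above (seconds).
data_processing = 1.5
parallel_group = {"text_analysis": 1.0, "image_processing": 1.2, "validation": 0.8}

# Sequential: every stage runs one after another.
sequential_sum = data_processing + sum(parallel_group.values())
# Parallel: the group costs only as much as its slowest stage.
parallel_path = data_processing + max(parallel_group.values())


def percent_faster(before: float, after: float) -> float:
    """Relative reduction in run time, as a percentage."""
    return (before - after) / before * 100


print(f"itemized sequential: {sequential_sum:.1f}s")   # 4.5s
print(f"itemized parallel:   {parallel_path:.1f}s")    # 2.7s
print(f"itemized speedup:    {percent_faster(sequential_sum, parallel_path):.0f}%")  # 40%
print(f"reported speedup:    {percent_faster(4.9, 3.1):.0f}%")  # 37%
```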

**Real-world scenarios where parallel execution excels:**
- API data aggregation from multiple sources
- Batch processing of files with different operations
- ML pipeline with parallel feature extraction
- Data validation + transformation + enrichment workflows

## License

MIT License
