Metadata-Version: 2.4
Name: cy-python
Version: 0.1.0
Summary: Python equivalent of Cluster Yield CI's Scala plan-extractor
License: Apache-2.0
License-File: LICENSE
Keywords: spark,pyspark,ci,plan-extraction
Author: Kerim Tricic
Author-email: kerim.tricic@dualsharks.com
Requires-Python: >=3.10,<4
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3.13
Classifier: Programming Language :: Python :: 3.14
Classifier: Topic :: Software Development :: Quality Assurance
Requires-Dist: pyspark (>=3.3,<4)
Project-URL: Homepage, https://github.com/clusteryieldanalytics/cy-py
Project-URL: Repository, https://github.com/clusteryieldanalytics/cy-py
Description-Content-Type: text/markdown

# cy-py — Cluster Yield CI for Python

The Python equivalent of the [Scala plan-extractor](https://github.com/clusteryieldanalytics/cluster-yield-ci/tree/main/plan-extractor).
Extracts Spark physical plans from PySpark pipelines so Cluster Yield CI
can diff them on every PR — same JSON format, same deterministic Spark
config, same GitHub Action.

## 30-Second Setup

### 1. Add the dependency

**Poetry:**
```bash
poetry add --group dev cy-python
```

**pip:**
```text
# requirements.txt or pyproject.toml
cy-python
```

### 2. Extend `SparkPipeline` in your pipeline

```python
from cy_py import SparkPipeline
from pyspark.sql import DataFrame, SparkSession

# Add SparkPipeline as a base class; the rest of your pipeline stays the same
class DailyOrderSummary(SparkPipeline):
    def build(self, spark: SparkSession) -> DataFrame:
        # ... your existing pipeline code, returning the final DataFrame ...
        ...
```

### 3. Add the GitHub Action

```yaml
# .github/workflows/spark-plan-check.yml
name: Spark Plan Check
on:
  pull_request:
    branches: [main]
jobs:
  plan-check:
    runs-on: ubuntu-latest
    permissions:
      pull-requests: write
      contents: read
    steps:
      - uses: actions/setup-python@v5
        with: { python-version: '3.12' }
      - uses: actions/setup-java@v4
        with: { java-version: '11', distribution: 'temurin' }
      - uses: clusteryieldanalytics/cluster-yield-ci@v1
        with:
          pipeline-class: my_package.pipelines:DailyOrderSummary
          github-token: ${{ secrets.GITHUB_TOKEN }}
```

**That's it.** The action detects your Python build tool (Poetry or pip),
installs dependencies, calls your `build()` method, extracts the plan,
diffs it against the baseline from `main`, and posts findings on your PR.

## How It Works

The `pipeline-class` input takes a Python import path. Under the hood:

1. The action detects your build tool (`pyproject.toml` with Poetry, or pip)
2. Installs your project (`poetry install` or `pip install -e .`)
3. Runs `cy-py my_package.pipelines:DailyOrderSummary` on both branches
4. `cy-py` imports your class, calls `build(spark)`, exports the physical plan as JSON
5. `cluster-yield-ci` diffs the plans and posts findings
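
Step 4 can be pictured with a small resolver. This is a minimal sketch under stated assumptions, not the actual `cy-py` source: the function name `resolve_target` is hypothetical, and it assumes pipeline classes can be instantiated without arguments.

```python
import importlib
from typing import Any, Callable


def resolve_target(spec: str, default_method: str = "build") -> Callable[..., Any]:
    """Resolve 'package.module:Name' or 'package.module:Name#method' to a callable.

    Hypothetical helper for illustration; not part of the cy-py API.
    """
    module_path, _, rest = spec.partition(":")
    if not module_path or not rest:
        raise ValueError(f"expected 'module:Name[#method]', got {spec!r}")
    attr_name, _, method_name = rest.partition("#")
    target = getattr(importlib.import_module(module_path), attr_name)
    if isinstance(target, type):
        # A class: instantiate it with no arguments, then pick the method.
        return getattr(target(), method_name or default_method)
    # Anything else is treated as a plain callable that takes `spark`.
    return target
```

With a resolver like this, `resolve_target("my_package.pipelines:DailyOrderSummary")(spark)` would return the DataFrame whose plan gets exported.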

## Scala ↔ Python Mapping

cy-py is a one-to-one port of the Scala library. Every component has a direct equivalent:

| Scala | Python | Purpose |
|-------|--------|---------|
| `SparkPipeline` trait | `SparkPipeline` ABC | Contract: `build(spark) -> DataFrame` |
| `PlanExtractor.exportPlan()` | `export_plan()` | Export physical plan as JSON |
| `PlanExtractor.exportCatalog()` | `export_catalog()` | Export table statistics |
| `PlanExtractor.createExtractionSession()` | `create_extraction_session()` | Deterministic SparkSession for CI |
| `ExtractRunner` (reflection) | `extract_runner` (import + getattr) | CLI that resolves and invokes pipelines |
| `com.example.MyPipeline` | `my_package.module:MyPipeline` | Pipeline identification syntax |
| `MyPipeline#run` | `my_package.module:MyPipeline#run` | Custom method override |

The output JSON is identical because both extractors serialize the plan with
`plan.toJSON()` on Spark's JVM, so the same analyzer works for both Scala and
Python pipelines.
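
For readers curious how a PySpark DataFrame reaches `toJSON()` on the JVM side, here is a rough sketch. It assumes a classic (non-Connect) PySpark DataFrame and relies on the private `_jdf` handle. Whether `cy-py` reads `executedPlan()` or another plan stage is an assumption here, and `physical_plan_nodes` is a hypothetical name.

```python
import json
from typing import Any


def physical_plan_nodes(df: Any) -> list[dict[str, Any]]:
    """Return the physical plan of a PySpark DataFrame as parsed JSON nodes.

    Assumption: `df` exposes the private `_jdf` JVM handle. Asking for the
    plan triggers query planning only; the query itself is not executed.
    """
    jvm_plan = df._jdf.queryExecution().executedPlan()
    # toJSON() is defined on Spark's TreeNode and returns a JSON array string.
    return json.loads(jvm_plan.toJSON())
```

Each element of the returned list describes one plan node, which is what makes a structural diff between two branches possible.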

## API

### `SparkPipeline` base class

The contract your pipeline implements. Just one method:

```python
from cy_py import SparkPipeline
from pyspark.sql import DataFrame, SparkSession

class MyPipeline(SparkPipeline):
    def build(self, spark: SparkSession) -> DataFrame:
        ...
```

The base class is optional — `cy-py` also works with any class that has a
`build(self, spark)` method, or even a plain function. The base class
provides IDE autocompletion and type checking.
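
To illustrate what such a contract buys you, here is a minimal sketch of an abstract base class next to a duck-typed check. `PipelineContract` and `has_build` are hypothetical stand-ins written for this example; the real `SparkPipeline` may be implemented differently.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # only needed by type checkers, so pyspark is not imported at runtime
    from pyspark.sql import DataFrame, SparkSession


class PipelineContract(ABC):
    """Hypothetical stand-in for cy_py.SparkPipeline."""

    @abstractmethod
    def build(self, spark: SparkSession) -> DataFrame:
        """Return the DataFrame whose plan should be extracted."""


def has_build(obj: object, method: str = "build") -> bool:
    """Duck-typed check: any object with a callable `build` attribute qualifies."""
    return callable(getattr(obj, method, None))
```

A subclass that forgets to define `build` fails at instantiation with `TypeError`, while a class that merely happens to have a `build` method passes the duck-typed check without inheriting from anything.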

### Custom method names

If your method isn't called `build`, append `#methodName`:

```yaml
pipeline-class: my_package.pipelines:MyPipeline#run
```

### Direct usage

For advanced workflows (multiple pipelines, custom Spark configs):

```python
from cy_py import create_extraction_session, export_plan, export_catalog
from my_package.pipelines import MyPipeline  # your own pipeline class

spark = create_extraction_session()
df = MyPipeline().build(spark)
export_plan(df, "plans/current.json")
export_catalog(spark, ["db.orders", "db.customers"], "plans/catalog.json")
```

| Function | Description |
|----------|-------------|
| `export_plan(df, path)` | Export physical plan as JSON (does not execute the query) |
| `export_catalog(spark, tables, path)` | Export catalog statistics for `--catalog` flag |
| `create_extraction_session()` | SparkSession with deterministic CI settings |

### Deterministic Spark settings

`create_extraction_session()` configures Spark for reproducible plans:

- `spark.sql.adaptive.enabled = false`: the pre-AQE plan reflects the code, not the data
- `spark.sql.autoBroadcastJoinThreshold = -1`: disables automatic broadcast joins, so small CI tables are planned the way production-sized tables would be
- `local[1]` master, UI disabled, 200 shuffle partitions

These match the Scala extractor exactly, so plans are comparable across languages.
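
If you need a comparable session without calling `create_extraction_session()`, the settings listed above can be applied by hand. This is a sketch, not the library's implementation: `EXTRACTION_CONF` and `build_extraction_like_session` are hypothetical names, and the real function may set additional options.

```python
# Settings taken from the list above, expressed as Spark config keys.
EXTRACTION_CONF = {
    "spark.sql.adaptive.enabled": "false",         # keep the pre-AQE plan
    "spark.sql.autoBroadcastJoinThreshold": "-1",  # no automatic broadcast joins
    "spark.sql.shuffle.partitions": "200",
    "spark.ui.enabled": "false",
}


def build_extraction_like_session(app_name: str = "plan-extraction"):
    """Create a local[1] SparkSession carrying the settings in EXTRACTION_CONF."""
    from pyspark.sql import SparkSession  # lazy import: needs pyspark and a JVM

    builder = SparkSession.builder.master("local[1]").appName(app_name)
    for key, value in EXTRACTION_CONF.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Keeping the settings in a single dictionary makes it easy to compare them against what the Scala extractor uses.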

## CLI

```bash
# Standard (method defaults to "build")
cy-py my_package.pipelines:DailyOrderSummary

# Custom output path
cy-py my_package.pipelines:DailyOrderSummary plans/current.json

# Custom method name
cy-py my_package.pipelines:DailyOrderSummary#run plans/current.json
```

Outputs `plans/current.json` (plan array) and `plans/current.txt` (human-readable tree).
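
The argument handling above can be sketched as follows. Two things are assumptions made for illustration: that the default output path is `plans/current.json`, and that the `.txt` file is derived from the JSON path by swapping the suffix. `output_paths` is a hypothetical helper, not a `cy-py` function.

```python
from pathlib import Path


def output_paths(
    argv: list[str], default_json: str = "plans/current.json"
) -> tuple[str, Path, Path]:
    """Split CLI arguments into (target, json_path, txt_path).

    Assumption: the text tree is written next to the JSON file with a
    `.txt` suffix; the default path is likewise assumed.
    """
    if not 1 <= len(argv) <= 2:
        raise SystemExit("usage: cy-py <module:Class[#method]> [output.json]")
    target = argv[0]
    json_path = Path(argv[1] if len(argv) == 2 else default_json)
    return target, json_path, json_path.with_suffix(".txt")
```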

## Compatibility

| PySpark | Python | Java | Status |
|---------|--------|------|--------|
| 3.3.x   | 3.10+  | 8+   | Supported |
| 3.4.x   | 3.10+  | 8+   | Supported |
| 3.5.x   | 3.10+  | 11+  | Supported |

PySpark is pinned to `>=3.3,<4`. cy-py uses only stable Spark APIs via the JVM bridge.

