Metadata-Version: 2.4
Name: distributed-pg-lock
Version: 0.1.1
Summary: A robust distributed locking system using PostgreSQL for coordinating resource access across distributed systems with automatic heartbeating, timeout management, and stale lock recovery.
Home-page: https://github.com/surendrahingane/distributed-pg-lock
Author: Surendra Sahadeo Hingane
Author-email: surendrahingne@gmail.com
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3.8
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: sqlalchemy>=1.4
Requires-Dist: tenacity>=8.0
Provides-Extra: test
Requires-Dist: pytest; extra == "test"
Requires-Dist: psycopg2-binary; extra == "test"
Dynamic: classifier
Dynamic: license-file
Dynamic: provides-extra
Dynamic: requires-dist

# Distributed PostgreSQL Lock

[![PyPI version](https://img.shields.io/pypi/v/distributed-pg-lock)](https://pypi.org/project/distributed-pg-lock/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

A robust distributed locking system using PostgreSQL for coordinating resource access across distributed systems with automatic heartbeating, timeout management, and stale lock recovery.

## Features

- **Distributed Locking**: Coordinate resources across multiple processes/machines
- **Heartbeat Monitoring**: Built-in keep-alive mechanism prevents premature lock expiration for long operations
- **Timeout Handling**: Configurable lock duration with automatic release
- **Stale Lock Recovery**: Self-healing system cleans up abandoned locks
- **Context Manager Support**: Pythonic `with` syntax for automatic acquisition/release
- **Cross-Platform**: Works anywhere with PostgreSQL (Kubernetes, cloud, on-premise)
- **High Availability**: Database-backed persistence ensures lock integrity
- **Customizable Timeouts**: Fine-tune lock durations from seconds to hours
- **Owner Identification**: Track lock ownership across distributed systems
- **Graceful Shutdown**: Automatic lock release on process termination
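
The heartbeat, timeout, and stale-recovery features above combine into lease semantics. A lock stays held while its owner keeps refreshing a timestamp, and anyone may take it over once that timestamp is older than the timeout. The sketch below models this with an in-memory dictionary and an injectable clock so the behaviour can be checked without a database. It is an illustration under those assumptions, not this library's implementation. The class and method names (`InMemoryLeaseTable`, `try_acquire`, `heartbeat`, `release`) are hypothetical, and a PostgreSQL-backed version must perform the check-and-set in a single atomic SQL statement rather than in Python.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class Lease:
    owner_id: str
    last_heartbeat: float  # clock reading of the most recent acquire/heartbeat


class InMemoryLeaseTable:
    """Hypothetical toy model of a locks table: one lease per lock name.

    Single-threaded illustration only; a database-backed version must make
    try_acquire atomic (e.g. one conditional INSERT/UPDATE statement).
    """

    def __init__(self, timeout_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.timeout_seconds = timeout_seconds
        self.clock = clock
        self.leases: Dict[str, Lease] = {}

    def _is_stale(self, lease: Lease) -> bool:
        # Stale = no heartbeat within the timeout window.
        return self.clock() - lease.last_heartbeat > self.timeout_seconds

    def try_acquire(self, name: str, owner_id: str) -> bool:
        lease = self.leases.get(name)
        # Grant if the lock is free, already ours, or abandoned by a dead owner.
        if lease is None or lease.owner_id == owner_id or self._is_stale(lease):
            self.leases[name] = Lease(owner_id, self.clock())
            return True
        return False

    def heartbeat(self, name: str, owner_id: str) -> bool:
        lease = self.leases.get(name)
        if lease is None or lease.owner_id != owner_id:
            return False  # released or taken over: the caller must stop its work
        lease.last_heartbeat = self.clock()
        return True

    def release(self, name: str, owner_id: str) -> None:
        lease = self.leases.get(name)
        # Only the current owner may release; a stale owner cannot evict the new one.
        if lease is not None and lease.owner_id == owner_id:
            del self.leases[name]
```

With a 30-second timeout, a holder that heartbeats every few seconds keeps the lock indefinitely, while a crashed holder's lock becomes available 30 seconds after its last heartbeat.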

## Key Use Cases

1. **Critical Section Protection**: Safeguard shared resources in microservices architectures
2. **Distributed Cron Jobs**: Prevent duplicate execution across multiple nodes
3. **Resource Pool Management**: Coordinate access to limited resources (APIs, devices, files)
4. **Leader Election**: Implement master-node selection in clusters
5. **Transaction Sequencing**: Enforce ordered processing in queue systems
6. **Database Migration Coordination**: Safely execute schema changes across replicas

### Real-World Use Cases
#### 1. Distributed Script Execution
Ensure critical scripts run on only one pod in Kubernetes clusters:
```python
lock = lock_manager.get_lock("db_maintenance_script")
with lock:  # Acquire on entry, release on exit
    if lock.is_acquired:
        run_database_maintenance()  # Executes on a single pod only
```

#### 2. Financial Transaction Processing
Prevent duplicate payments in microservices architectures:
```python
def process_payment(txn_id):
    lock = lock_manager.get_lock(f"payment_{txn_id}")
    with lock:  # Acquire on entry, release on exit
        if lock.is_acquired:
            charge_credit_card(txn_id)  # Exclusive processing
```
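
A lock only guarantees that two workers never charge the same transaction concurrently. If a retry arrives after the first worker has released the lock, it will acquire the lock and charge again. A common remedy is to record completion and re-check it inside the critical section. The sketch below shows that check-then-act pattern with a process-local `threading.Lock` standing in for the distributed lock and an in-memory set standing in for durable storage. `process_payment_once` and `charge` are hypothetical names; in production the completion record would live in the database (for example, a table with a unique constraint on the transaction ID).

```python
import threading
from typing import Callable, Set

# Stand-ins (assumptions): in production the lock would be the PostgreSQL-backed
# lock for f"payment_{txn_id}" and `processed` would be a durable table.
_payment_lock = threading.Lock()
processed: Set[str] = set()


def process_payment_once(txn_id: str, charge: Callable[[str], None]) -> bool:
    """Charge txn_id at most once; return True if this call performed the charge."""
    with _payment_lock:
        # Re-check inside the critical section: another worker may have
        # completed this transaction before we obtained the lock.
        if txn_id in processed:
            return False
        charge(txn_id)
        processed.add(txn_id)  # record completion before releasing the lock
        return True
```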

#### 3. Inventory Reservation Systems
Prevent overselling during flash sales:
```python
def reserve_item(item_id):
    lock = lock_manager.get_lock(f"inventory_{item_id}")
    with lock:
        # Check and decrement stock while holding the lock
        if lock.is_acquired and check_stock(item_id) > 0:
            reduce_inventory(item_id)
```

#### 4. Database Migration Coordination
Safely execute schema changes in clustered environments:
```python
lock = lock_manager.get_lock("schema_migration_v2")
with lock:
    if lock.is_acquired:
        execute_migration()  # One node at a time; keep the migration idempotent
```

#### 5. IoT Command Sequencing
Serialize commands to smart devices:
```python
def send_device_command(device_id, command):
    lock = lock_manager.get_lock(f"device_{device_id}")
    with lock:
        if lock.is_acquired:
            device.send(command)  # No command collisions
```

#### 6. Leader Election
Implement master node selection:
```python
def elect_leader():
    lock = lock_manager.get_lock("cluster_leader")
    with lock:
        if lock.is_acquired:
            start_leader_processes()  # Only one node leads while the lock is held
```

#### 7. Batch Job Coordination
Prevent duplicate cron job execution:
```python
def run_nightly_report():
    lock = lock_manager.get_lock("nightly_report_job")
    with lock:
        if lock.is_acquired:
            generate_reports()  # Nodes that lose the race skip the job
```

#### 8. Resource Pool Management
Coordinate access to a limited API quota:
```python
def call_rate_limited_api():
    lock = lock_manager.get_lock("api_quota_slot")
    with lock:
        if lock.is_acquired:
            make_api_call()  # At most one caller at a time across the cluster
```

#### 9. ETL Pipeline Synchronization
Prevent two workers from processing the same data chunk at once:
```python
def process_data_chunk(chunk_id):
    lock = lock_manager.get_lock(f"chunk_{chunk_id}")
    with lock:
        if lock.is_acquired:
            transform_and_load(chunk_id)  # One worker per chunk at a time
```

#### 10. CI/CD Deployment Locking
Prevent concurrent deployments:
```python
def deploy_to_production():
    lock = lock_manager.get_lock("production_deployment")
    with lock:
        if lock.is_acquired:
            run_deployment_script()  # Deployment safety
```

## Ideal For
- Financial transaction processing systems requiring transaction integrity
- E-commerce platforms handling flash sales
- IoT networks managing device communications
- Data pipelines processing critical datasets
- Kubernetes environments coordinating pods
- Microservices architectures needing resource synchronization
- Database maintenance operations
- Distributed cron job management
- Inventory management systems
- CI/CD pipeline coordination

## Installation

```bash
pip install distributed-pg-lock
```

## Basic Usage

```python
import logging
from distributed_pg_lock import DistributedLockManager, db

# Centralized logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configure database (supports environment variable or direct passing)
# Option 1: Set DB_URL environment variable
#   export DB_URL='postgresql://user:pass@localhost/mydb'
#   db.configure()
#
# Option 2: Pass directly
db.configure(url="postgresql://user:pass@localhost/mydb")

db.create_tables()  # Initialize required tables

lock_manager = DistributedLockManager()

# Acquire lock using context manager
lock = lock_manager.get_lock("payment_processor")
with lock:
    if lock.is_acquired:
        logger.info("Lock acquired - processing payments")
        # Critical operations here
    else:
        logger.warning("Failed to acquire lock - skipping execution")
```

## Advanced Usage

```python
# High-concurrency processing example
import logging
from concurrent.futures import ThreadPoolExecutor
from distributed_pg_lock import DistributedLockManager

logger = logging.getLogger(__name__)

# Assumes db.configure() and db.create_tables() were called as in Basic Usage
lock_manager = DistributedLockManager(lock_timeout_minutes=0.5)  # 30-second timeout

def process_task(task_id):
    lock = lock_manager.get_lock(f"task_{task_id}")
    with lock:
        if lock.is_acquired:
            logger.info("Processing task %s", task_id)
            # Resource-intensive operations
        else:
            logger.debug("Lock contention for task %s", task_id)

# Process 100 tasks on 20 worker threads; list() consumes the results
# so that any exception raised in process_task is re-raised here
with ThreadPoolExecutor(max_workers=20) as executor:
    list(executor.map(process_task, range(100)))
```
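
In the example above, a task that loses the race for its lock is simply skipped. If every task must eventually run, acquisition can be retried with exponential backoff and jitter so that contending workers spread out instead of retrying in lockstep. The helper below is a hypothetical, library-agnostic sketch: `attempt` is any callable that returns `True` when it acquired the lock and did the work, and `False` when the lock was held elsewhere.

```python
import random
import time
from typing import Callable


def run_with_retry(attempt: Callable[[], bool], max_attempts: int = 5,
                   base_delay: float = 0.5, max_delay: float = 8.0,
                   sleep: Callable[[float], None] = time.sleep) -> bool:
    """Call attempt() until it succeeds or max_attempts is exhausted.

    Returns True if some attempt succeeded, False otherwise.
    """
    for n in range(max_attempts):
        if attempt():
            return True
        if n < max_attempts - 1:
            # Exponential backoff capped at max_delay, with "full jitter":
            # sleep a random time in [0, cap] so workers desynchronize.
            cap = min(max_delay, base_delay * (2 ** n))
            sleep(random.uniform(0, cap))
    return False
```

To combine it with the lock manager, one option is to have `process_task` return `lock.is_acquired` from inside its `with` block (a hypothetical change) and call `run_with_retry(lambda: process_task(task_id))`. The package already depends on `tenacity`, which provides configurable retry and backoff policies if you prefer a library.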

## Documentation

### Database Configuration

#### Environment Variable
```bash
export DB_URL="postgresql://user:password@localhost:5432/dbname"
```

#### Direct Configuration
```python
db.configure(
    url="postgresql://user:pass@localhost/dbname",
    pool_size=10,
    max_overflow=20,
    echo=False
)
```
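
When many pods share one PostgreSQL server, pool settings multiply. Assuming `db.configure()` forwards `pool_size` and `max_overflow` to SQLAlchemy's `create_engine` (whose default `QueuePool` caps one engine at `pool_size + max_overflow` open connections), the worst case is that sum times the number of processes. The hypothetical helpers below do that arithmetic and compare the result with the server's `max_connections`, which defaults to 100 in PostgreSQL.

```python
def worst_case_connections(processes: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on open connections if every process saturates its pool."""
    return processes * (pool_size + max_overflow)


def fits_server(processes: int, pool_size: int, max_overflow: int,
                server_max_connections: int = 100) -> bool:
    """True if the worst case stays within the server's max_connections."""
    total = worst_case_connections(processes, pool_size, max_overflow)
    return total <= server_max_connections
```

With the settings shown above (10 + 20 per process) and 200 pods, the bound is 200 × 30 = 6,000 connections, far above the default limit, so either shrink the per-process pool or put a connection pooler such as PgBouncer in front of the database.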

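#### Connection Error Handling

If no database URL is supplied, either directly or through `DB_URL`, configuration fails with an error like this:

```text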
ValueError: Database URL not provided.
Please pass `db_url` explicitly or set the DB_URL environment variable.

Expected format examples:
  - PostgreSQL: postgresql://user:password@localhost:5432/dbname

Example usage:
  export DB_URL='postgresql://user:pass@localhost:5432/mydb'
  or
  db = DatabaseConnector(db_url='postgresql://user:pass@localhost:5432/mydb')
```
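
To fail fast with a clearer message, an application can resolve and sanity-check the URL itself before calling `db.configure(url=...)`. The helper below is a hypothetical sketch; the precedence it uses (explicit argument first, then `DB_URL`) is an assumption, not documented library behaviour. It also rejects the `postgres://` scheme, because SQLAlchemy 1.4 and later only recognise `postgresql://` (optionally with a driver, as in `postgresql+psycopg2://`).

```python
import os
from typing import Mapping, Optional
from urllib.parse import urlparse


def resolve_db_url(url: Optional[str] = None,
                   environ: Mapping[str, str] = os.environ) -> str:
    """Return the explicit URL if given, otherwise DB_URL from the environment."""
    candidate = url or environ.get("DB_URL")
    if not candidate:
        raise ValueError("Database URL not provided. Pass it explicitly or set DB_URL.")
    # The dialect is the part of the scheme before any "+driver" suffix.
    dialect = urlparse(candidate).scheme.split("+", 1)[0]
    if dialect != "postgresql":
        raise ValueError(f"Expected a postgresql:// URL, got dialect {dialect!r}")
    return candidate
```

Usage: `db.configure(url=resolve_db_url())`.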

### Lock Manager Options
```python
# Custom lock configuration
lock_manager = DistributedLockManager(
    lock_timeout_minutes=1.5,        # Auto-release after 90s inactivity
    owner_id="worker-node-42",       # Optional custom owner ID (auto-generated if not provided)
)
```
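
The owner ID identifies a holder in the lock table, so it should be unique per process and ideally traceable to a host. If you supply your own instead of relying on the auto-generated one, a scheme like the hypothetical `make_owner_id` below works: hostname (in Kubernetes this is normally the pod name), process ID, and a short random suffix so that a restarted process that reuses a PID still gets a fresh identity. The format is an assumption; the library's own auto-generated IDs may look different.

```python
import os
import socket
import uuid


def make_owner_id(prefix: str = "") -> str:
    """Build an owner ID such as 'worker-<host>-<pid>-<8 hex chars>'.

    Call once at process startup and reuse the result for that process.
    """
    parts = [prefix] if prefix else []
    parts += [socket.gethostname(), str(os.getpid()), uuid.uuid4().hex[:8]]
    return "-".join(parts)
```

Usage: `DistributedLockManager(lock_timeout_minutes=1.5, owner_id=make_owner_id("worker"))`.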

### Force Release Locks (Admin)
```python
lock_manager.force_release_lock("stale_resource")
```

## Testing
```bash
pip install -e .[test]
pytest tests/ --log-cli-level=INFO
```

## Performance
See `tests/load_test_performance.py` for a load testing script that simulates:
- 200 concurrent pods/nodes
- High-contention resource access
- Mixed lock durations (3s–15s, including cases that exceed `lock_timeout_minutes`)
- Automatic stale lock recovery
