Metadata-Version: 2.4
Name: cjm-graph-plugin-sqlite
Version: 0.0.22
Summary: A local, file-backed Context Graph worker for the cjm-plugin-system that implements graph storage, traversal, and querying using SQLite.
Author-email: "Christian J. Mills" <9126128+cj-mills@users.noreply.github.com>
License: Apache-2.0
Project-URL: Repository, https://github.com/cj-mills/cjm-graph-plugin-sqlite
Project-URL: Documentation, https://cj-mills.github.io/cjm-graph-plugin-sqlite
Keywords: nbdev,jupyter,notebook,python
Classifier: Natural Language :: English
Classifier: Intended Audience :: Developers
Classifier: Development Status :: 3 - Alpha
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3 :: Only
Requires-Python: >=3.12
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: cjm_graph_plugin_system>=0.0.23
Dynamic: license-file

# cjm-graph-plugin-sqlite


<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

## Install

``` bash
pip install cjm_graph_plugin_sqlite
```

## Project Structure

    nbs/
    ├── meta.ipynb              # Metadata introspection for the SQLite Graph plugin used by cjm-ctl to generate the registration manifest.
    ├── plugin.ipynb            # Plugin implementation for Context Graph using SQLite
    └── query_translation.ipynb # The per-backend translation of the typed query expressions (pass-2 Thread 5; stage 4): `NodeQuery`/`EdgeQuery` → parameterized SQLite SQL over the `nodes`/`edges` schema. THIS module is what makes the typed surface portable — the expression is domain- and backend-neutral; every backend tool owns a translation like this one (the ratified stage-4 split: backend owns translation; the adapter stays generic). Pure functions, unit-tested against an in-memory DB with the production schema — no plugin runtime needed.

Total: 3 notebooks

## Module Dependencies

``` mermaid
graph LR
    meta["meta<br/>Metadata"]
    plugin["plugin<br/>SQLite Graph Plugin"]
    query_translation["query_translation<br/>query_translation"]

    plugin --> query_translation
    plugin --> meta
```

*2 cross-module dependencies detected*

## CLI Reference

No CLI commands found in this project.

## Module Overview

Detailed documentation for each module in the project:

### Metadata (`meta.ipynb`)

> Metadata introspection for the SQLite Graph plugin used by cjm-ctl to
> generate the registration manifest.

#### Import

``` python
from cjm_graph_plugin_sqlite.meta import (
    get_plugin_metadata
)
```

#### Functions

``` python
def get_plugin_metadata() -> Dict[str, Any]:  # Plugin metadata for manifest generation
    """Return metadata required to register this plugin with the PluginManager."""
    # Fallback base path (current behavior for backward compatibility)
    base_path = os.path.dirname(os.path.dirname(sys.executable))
    
    # Use CJM config if available, else fallback to env-relative paths
    cjm_plugin_data_dir = os.environ.get("CJM_PLUGIN_DATA_DIR")
    
    # Plugin data directory
    plugin_name = "cjm-graph-plugin-sqlite"
    package_name = plugin_name.replace("-", "_")
    if cjm_plugin_data_dir:
        ...  # (truncated)
```

### SQLite Graph Plugin (`plugin.ipynb`)

> Plugin implementation for Context Graph using SQLite

#### Import

``` python
from cjm_graph_plugin_sqlite.plugin import (
    SQLiteGraphPluginConfig,
    SQLiteGraphPlugin,
    query_nodes,
    query_edges,
    raw_query,
    integrity_check
)
```

#### Functions

``` python
def query_nodes(
    self,
    query: NodeQuery,  # Typed node query (the portable, scale-shaped surface)
) -> NodeQueryResult:  # Typed result (nodes / rows / count per query mode)
    """
    Execute a typed node query (stage 4) — translated to parameterized SQL
    by `query_translation`, run on a fresh read-only connection (SG-41).
    """
```

``` python
def query_edges(
    self,
    query: EdgeQuery,  # Typed edge query
) -> EdgeQueryResult:  # Typed result (edges / rows / count per query mode)
    "Execute a typed edge query (stage 4) — same contract as `query_nodes`."
```

``` python
def raw_query(
    self,
    query: RawQuery,  # The marked, backend-coupled raw escape (backend REQUIRED)
) -> RawQueryResult:  # Tabular backend-shaped result
    """
    Execute the raw escape — refuses backend mismatches (non-portable by
    construction); the read-only single-statement guards are `query`'s (SG-41).
    Recurring raw patterns are promotion candidates into the typed surface.
    """
```

``` python
def integrity_check(self) -> Dict[str, Any]:  # {"ok": bool, "errors": [...], "backend": "sqlite"}
    """Backend self-check (stage 4; the G3 corruption find institutionalized):
    `PRAGMA quick_check` on a fresh read-only connection so loop-backs and
    stress tests assert storage health cheaply and corruption FAILS LOUDLY."""
    con = sqlite3.connect(f"file:{self._db_path}?mode=ro", uri=True)
    ...  # (truncated)
```

#### Classes

``` python
@dataclass
class SQLiteGraphPluginConfig:
    "Configuration for SQLite Graph Plugin."
    
    db_path: Optional[str] = field(...)
    readonly: bool = field(...)
```

``` python
class SQLiteGraphPlugin:
    "Local, file-backed Context Graph implementation using SQLite."
    
    def __init__(self):
        self.logger = logging.getLogger(f"{__name__}.{type(self).__name__}")
        self.config: SQLiteGraphPluginConfig = None
    
    def name(self) -> str:  # Plugin name identifier
        "Get the plugin name identifier."
        return get_plugin_metadata()["name"]
    
    @property
    def version(self) -> str:  # Plugin version string
        "Get the plugin version string."
        return get_plugin_metadata()["version"]
    
    def get_current_config(self) -> Dict[str, Any]:  # Current configuration as dictionary
        "Return current configuration state."
        if not self.config:
            ...  # (truncated)
    
    def get_config_schema(self) -> Dict[str, Any]:  # JSON Schema for configuration
        "Return JSON Schema for UI generation."
        return dataclass_to_jsonschema(SQLiteGraphPluginConfig)
    
    def initialize(
            self,
            config: Optional[Any] = None  # Configuration dataclass, dict, or None
        ) -> None:
        "Initialize DB connection and schema."
    
    def execute(
            self,
            action: str = "get_schema",  # Action to perform
            **kwargs
        ) -> Dict[str, Any]:  # JSON-serializable result
        """
        Dispatch to the `@plugin_action`-tagged handler for `action` (SG-44).
        
        Handlers are discovered by walking the class MRO for methods carrying a
        `_plugin_action` tag (the same source `supported_actions` is built from
        via `collect_plugin_actions`). Replaces the prior hand-maintained
        if/elif chain.
        """
    
    def add_nodes(
            self,
            nodes: List[GraphNode]  # Nodes to create
        ) -> List[str]:  # Created node IDs
        "Bulk create nodes."
    
    def add_edges(
            self,
            edges: List[GraphEdge]  # Edges to create
        ) -> List[str]:  # Created edge IDs
        "Bulk create edges."
    
    def get_node(
            self,
            node_id: str  # UUID of node to retrieve
        ) -> Optional[GraphNode]:  # Node or None if not found
        "Get a single node by ID."
    
    def get_edge(
            self,
            edge_id: str  # UUID of edge to retrieve
        ) -> Optional[GraphEdge]:  # Edge or None if not found
        "Get a single edge by ID."
    
    def find_nodes_by_source(
            self,
            source_ref: SourceRef  # External resource reference
        ) -> List[GraphNode]:  # Nodes attached to this source
        "Find all nodes linked to a specific external resource."
    
    def find_nodes_by_label(
            self,
            label: str,  # Node label to search for
            limit: int = 100  # Max results
        ) -> List[GraphNode]:  # Matching nodes
        "Find nodes by label."
    
    def get_context(
            self,
            node_id: str,  # Starting node UUID
            depth: int = 1,  # Traversal depth (1 = immediate neighbors)
            filter_labels: Optional[List[str]] = None  # Only include nodes with these labels
        ) -> GraphContext:  # Subgraph containing node and its neighborhood
        "Get the neighborhood of a specific node."
    
    def update_node(
            self,
            node_id: str,  # UUID of node to update
            properties: Dict[str, Any]  # Properties to merge/update
        ) -> bool:  # True if successful
        "Partial update of node properties."
    
    def update_edge(
            self,
            edge_id: str,  # UUID of edge to update
            properties: Dict[str, Any]  # Properties to merge/update
        ) -> bool:  # True if successful
        "Partial update of edge properties."
    
    def delete_nodes(
            self,
            node_ids: List[str],  # UUIDs of nodes to delete
            cascade: bool = True  # Also delete connected edges
        ) -> int:  # Number of nodes deleted
        "Delete nodes (and optionally connected edges)."
    
    def delete_edges(
            self,
            edge_ids: List[str]  # UUIDs of edges to delete
        ) -> int:  # Number of edges deleted
        "Delete edges."
    
    def get_schema(self) -> Dict[str, Any]:  # Graph schema/ontology
        "Return the current ontology/schema of the graph."
        schema = {"node_labels": [], "edge_types": [], "counts": {}}
        ...  # (truncated)
    
    def import_graph(
            self,
            graph_data: GraphContext,  # Data to import
            merge_strategy: str = "overwrite"  # "overwrite", "skip", or "merge"
        ) -> Dict[str, int]:  # Import statistics {nodes_created, edges_created, merge_strategy}
        """
        Bulk import a GraphContext honoring merge_strategy (SG-41).
        
        On id-conflict: "skip" keeps the existing row untouched; "overwrite"
        replaces its mutable fields with the incoming values; "merge" unions
        properties (incoming wins per key) and unions node sources by identity.
        Brand-new ids are always inserted. The `nodes_created`/`edges_created`
        counts report rows written (inserted or updated).
        """
    
    def export_graph(
            self,
            filter_query: Optional[NodeQuery] = None  # Typed node filter; None = whole graph (stage 4; GraphQuery dissolved)
        ) -> GraphContext:  # Exported subgraph or full graph
        """
        Export the whole graph, or the subgraph selected by a typed node
        query (matching nodes + the edges among them).
        """
    
    def query(
            self,
            sql: str,  # A single read-only SELECT (or WITH ... SELECT) statement
            params: Optional[Union[List[Any], Dict[str, Any]]] = None  # Bound parameters (positional list or named dict)
        ) -> Dict[str, Any]:  # {"columns": [...], "rows": [[...]], "row_count": int}
        """
        Execute a single read-only SELECT and return its rows (SG-41).
        
        Guards reject empty input, multiple statements, and anything not starting
        with SELECT/WITH. The statement runs on a fresh read-only connection
        (URI `mode=ro`), so even a query that slips past the prefix guard cannot
        mutate the database. Bound `params` use SQLite's qmark placeholders.
        """
```
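The tag-based dispatch described in the `execute` docstring can be illustrated with a small, self-contained sketch. Everything here is an assumption made for illustration: `plugin_action` is a hypothetical stand-in for the real decorator (whose signature is not shown in this listing), and `DemoPlugin` is not the plugin class. It only shows the general idea of walking the MRO for methods that carry a `_plugin_action` attribute.

```python
from typing import Any, Callable, Dict


def plugin_action(name: str) -> Callable:
    """Hypothetical stand-in for the real `@plugin_action` decorator."""
    def tag(fn: Callable) -> Callable:
        fn._plugin_action = name  # The tag that dispatch looks for
        return fn
    return tag


class DemoPlugin:
    @plugin_action("get_schema")
    def get_schema(self) -> Dict[str, Any]:
        return {"node_labels": [], "edge_types": [], "counts": {}}

    @plugin_action("echo")
    def echo(self, **kwargs: Any) -> Dict[str, Any]:
        return {"echo": kwargs}

    def _handlers(self) -> Dict[str, str]:
        """Map action name -> method name by walking the class MRO."""
        found: Dict[str, str] = {}
        for klass in type(self).__mro__:
            for attr, value in vars(klass).items():
                action = getattr(value, "_plugin_action", None)
                # The first hit wins, so subclasses override base classes.
                if action is not None and action not in found:
                    found[action] = attr
        return found

    def execute(self, action: str = "get_schema", **kwargs: Any) -> Dict[str, Any]:
        handlers = self._handlers()
        if action not in handlers:
            raise ValueError(f"Unknown action: {action!r}")
        return getattr(self, handlers[action])(**kwargs)


plugin = DemoPlugin()
default_result = plugin.execute()
echo_result = plugin.execute("echo", depth=2)
```

With this shape, adding an action means tagging one method, and the list of supported actions can be derived from the same tags instead of being maintained by hand.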

### query_translation (`query_translation.ipynb`)

> The per-backend translation of the typed query expressions (pass-2
> Thread 5; stage 4): `NodeQuery`/`EdgeQuery` → parameterized SQLite SQL
> over the `nodes`/`edges` schema. THIS module is what makes the typed
> surface portable — the expression is domain- and backend-neutral;
> every backend tool owns a translation like this one (the ratified
> stage-4 split: backend owns translation; the adapter stays generic).
> Pure functions, unit-tested against an in-memory DB with the
> production schema — no plugin runtime needed.

#### Import

``` python
from cjm_graph_plugin_sqlite.query_translation import (
    NODE_FULL_COLUMNS,
    EDGE_FULL_COLUMNS,
    translate_node_query,
    translate_edge_query
)
```

#### Functions

``` python
def _json_path(
    prop: str,  # Property name or dotted path (e.g. "payload.document_id")
) -> str:  # SQL string literal for json_extract's path argument
    "Validate a dotted property path and render the JSON-path literal."
```
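A minimal sketch of what validating a dotted path and rendering it as a JSON-path literal might look like. The regular expression is an assumption (the actual `_PROP_PATH_RE` pattern is not shown in this listing), and `json_path_literal` is a hypothetical name.

```python
import re

# Assumed guard: dot-separated identifiers only. The real pattern may differ.
DEMO_PROP_PATH_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*")


def json_path_literal(prop: str) -> str:
    """Hypothetical sketch: validate the path, then render it as a SQL string literal."""
    if not DEMO_PROP_PATH_RE.fullmatch(prop):
        raise ValueError(f"invalid property path: {prop!r}")
    # Safe to interpolate: the guard admits no quotes, spaces, or operators.
    return f"'$.{prop}'"


literal = json_path_literal("payload.document_id")
```

Because the path is interpolated into the SQL text rather than bound, the guard is what keeps input such as `x') OR 1=1 --` out of the statement.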

``` python
def _escape_like(
    value: str,  # Raw substring the caller wants matched literally
) -> str:  # Value with LIKE wildcards escaped (ESCAPE '\\')
    "Escape `%` / `_` / `\` so `contains` matches literally, never as wildcards."
```
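The escaping idea can be sketched as below. `escape_like` is a hypothetical re-implementation written for illustration and may differ from the module's private helper; the demo table is likewise made up.

```python
import sqlite3


def escape_like(value: str) -> str:
    """Hypothetical sketch: escape the escape character first, then the wildcards."""
    return (
        value.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE t (name TEXT)")
con.executemany(
    "INSERT INTO t VALUES (?)",
    [("100%_done",), ("100 percent",), ("100x_done",)],
)

# Substring match: wrap the escaped needle in real wildcards.
pattern = f"%{escape_like('0%_d')}%"
matches = [
    r[0]
    for r in con.execute(
        "SELECT name FROM t WHERE name LIKE ? ESCAPE '\\' ORDER BY name",
        (pattern,),
    )
]
con.close()
```

Without the escaping, the needle `0%_d` would also match `100x_done`, because `%` and `_` would act as wildcards.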

``` python
def _predicate_sql(
    pred: PropertyPredicate,  # One typed property predicate
    props_col: str,  # SQL expression for the properties JSON column (e.g. "n.properties")
) -> Tuple[str, List[Any]]:  # (SQL fragment, bound params)
    "Compile one property predicate to a parameterized SQL fragment."
```
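To make the "paths are interpolated, values are bound" split concrete, here is a hedged sketch of compiling a predicate to a `json_extract` fragment. `DemoPredicate` and its field names (`prop`, `op`, `value`) are assumptions for illustration; the real `PropertyPredicate` fields and supported operators are not shown in this listing. The example also assumes SQLite's JSON functions are available in the local build.

```python
import re
import sqlite3
from dataclasses import dataclass
from typing import Any, List, Tuple

_PATH = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*")
_OPS = {"eq": "=", "gt": ">", "lt": "<"}  # Assumed operator names


@dataclass
class DemoPredicate:
    """Hypothetical stand-in for PropertyPredicate."""
    prop: str  # Dotted property path
    op: str  # One of the keys in _OPS
    value: Any  # Bound as a parameter, never interpolated


def predicate_sql(pred: DemoPredicate, props_col: str) -> Tuple[str, List[Any]]:
    if not _PATH.fullmatch(pred.prop):
        raise ValueError(f"invalid property path: {pred.prop!r}")
    if pred.op not in _OPS:
        raise ValueError(f"unsupported operator: {pred.op!r}")
    fragment = f"json_extract({props_col}, '$.{pred.prop}') {_OPS[pred.op]} ?"
    return fragment, [pred.value]


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE nodes (id TEXT PRIMARY KEY, properties TEXT)")
con.executemany(
    "INSERT INTO nodes VALUES (?, ?)",
    [("n1", '{"payload": {"year": 1999}}'), ("n2", '{"payload": {"year": 2021}}')],
)

fragment, params = predicate_sql(DemoPredicate("payload.year", "gt", 2000), "n.properties")
ids = [r[0] for r in con.execute(f"SELECT n.id FROM nodes n WHERE {fragment}", params)]
con.close()
```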

``` python
def _source_match_sql(
    sp: SourcePredicate,  # Provenance match (content-hash-primary per CR-19)
    sources_expr: str,  # SQL expression for the sources JSON column (e.g. "n.sources")
) -> Tuple[str, List[Any]]:  # (EXISTS fragment, bound params)
    """
    Compile a source predicate to an EXISTS over the sources array.
    
    Content-hash-primary (identity field = the stable query surface, C19).
    A locator constraint RAISES — unsupported in the typed translation
    (use `find_nodes_by_source` for locator equality; recurring need here
    is the promotion evidence the raw-escape posture wants recorded).
    """
```

``` python
def _relation_exists_sql(
    rel: RelationPredicate,  # One-hop relation constraint (+ far-end constraints)
    node_expr: str,  # SQL expression for the candidate node id (e.g. "n.id", "e.source_id")
) -> Tuple[str, List[Any]]:  # (EXISTS fragment, bound params)
    """
    Compile a relation predicate to a correlated EXISTS (no row multiplication).
    
    Far-end constraints (stage-4 promotions): `node_id` / `node_ids` pin the
    far node; `node_source` nests a provenance EXISTS on the far node.
    Subquery scoping keeps the fixed aliases (r / fn / src) collision-free.
    """
```
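The "no row multiplication" point can be seen by comparing a correlated `EXISTS` with a plain `JOIN` on a toy graph. The schema below is a trimmed-down assumption (only the columns needed for the demo), and the SQL is an illustration of the technique rather than the fragment this function emits.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript(
    """
    CREATE TABLE nodes (id TEXT PRIMARY KEY, label TEXT);
    CREATE TABLE edges (
        id TEXT PRIMARY KEY, source_id TEXT, target_id TEXT, relation_type TEXT
    );
    INSERT INTO nodes VALUES ('a', 'Person'), ('b', 'Doc'), ('c', 'Doc');
    INSERT INTO edges VALUES ('e1', 'a', 'b', 'WROTE'), ('e2', 'a', 'c', 'WROTE');
    """
)

# Correlated EXISTS: each candidate node is tested once, so it appears once.
exists_sql = """
SELECT n.id FROM nodes n
WHERE EXISTS (
    SELECT 1 FROM edges r
    WHERE r.source_id = n.id AND r.relation_type = ?
)
"""

# JOIN: one output row per matching edge, so node 'a' appears twice.
join_sql = """
SELECT n.id FROM nodes n
JOIN edges r ON r.source_id = n.id
WHERE r.relation_type = ?
"""

exists_ids = [r[0] for r in con.execute(exists_sql, ("WROTE",))]
join_ids = [r[0] for r in con.execute(join_sql, ("WROTE",))]
con.close()
```

Keeping the relation test inside a subquery also means counts and paging on the outer query stay correct without a `DISTINCT`.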

``` python
def _order_limit_sql(
    query,  # NodeQuery or EdgeQuery (order_by / limit / offset fields)
    props_col: str,  # Properties column expression for ORDER BY paths
    params: List[Any],  # Bound-params list (appended in place)
) -> str:  # ORDER BY / LIMIT / OFFSET tail
    "Compile the shared ordering + paging tail."
```
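A hedged sketch of a shared ordering and paging tail that appends to the bound-params list in place. `DemoQuery` is hypothetical: the `order_by`, `limit`, and `offset` field names come from the signature comment above, but their types (and the `descending` flag) are assumptions. One SQLite detail worth noting is that `OFFSET` requires a `LIMIT`, so an offset-only query uses `LIMIT -1`.

```python
import re
from dataclasses import dataclass
from typing import Any, List, Optional

_PATH = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*")


@dataclass
class DemoQuery:
    """Hypothetical stand-in for the paging fields of NodeQuery/EdgeQuery."""
    order_by: Optional[str] = None  # Assumed: a single dotted property path
    descending: bool = False  # Assumed flag
    limit: Optional[int] = None
    offset: Optional[int] = None


def order_limit_sql(query: DemoQuery, props_col: str, params: List[Any]) -> str:
    parts: List[str] = []
    if query.order_by is not None:
        if not _PATH.fullmatch(query.order_by):
            raise ValueError(f"invalid order_by path: {query.order_by!r}")
        direction = "DESC" if query.descending else "ASC"
        parts.append(f"ORDER BY json_extract({props_col}, '$.{query.order_by}') {direction}")
    if query.limit is not None:
        parts.append("LIMIT ?")
        params.append(int(query.limit))
    elif query.offset is not None:
        parts.append("LIMIT -1")  # SQLite needs a LIMIT before OFFSET
    if query.offset is not None:
        parts.append("OFFSET ?")
        params.append(int(query.offset))
    return " ".join(parts)


params: List[Any] = ["Person"]  # Params already bound by the WHERE clause
tail = order_limit_sql(
    DemoQuery(order_by="payload.year", limit=10, offset=5), "n.properties", params
)
```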

``` python
def translate_node_query(
    q: NodeQuery,  # Typed node query
) -> Tuple[str, List[Any], str, Optional[List[str]]]:  # (sql, params, mode, row keys)
    """
    Translate a `NodeQuery` to parameterized SQLite SQL.
    
    mode: "count" | "rows" | "full" (count > project > full, mirroring the
    result DTO's exactly-one-populated contract). For "rows", the returned
    keys list zips against each cursor row ("id" always first; "label"
    projects structurally; "sources" projects as the raw JSON column for the
    caller to parse).
    """
```
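For the "rows" mode, the docstring says the returned keys zip against each cursor row and that `sources` arrives as raw JSON. A caller-side sketch of that step might look like the following; the SQL, keys, and table here are hand-written stand-ins rather than real output of `translate_node_query`, and `rows_to_dicts` is a hypothetical helper.

```python
import json
import sqlite3
from typing import Any, Dict, List, Sequence


def rows_to_dicts(keys: List[str], rows: Sequence[Sequence[Any]]) -> List[Dict[str, Any]]:
    """Hypothetical helper: zip row keys onto cursor rows and parse `sources`."""
    out: List[Dict[str, Any]] = []
    for row in rows:
        item = dict(zip(keys, row))
        if isinstance(item.get("sources"), str):
            item["sources"] = json.loads(item["sources"])  # Raw JSON column
        out.append(item)
    return out


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE nodes (id TEXT PRIMARY KEY, label TEXT, sources TEXT)")
con.execute("INSERT INTO nodes VALUES ('n1', 'Person', '[]')")

# Stand-ins for the (sql, params, mode, keys) tuple a translation would return.
sql = "SELECT n.id, n.label, n.sources FROM nodes n WHERE n.label = ?"
params = ["Person"]
keys = ["id", "label", "sources"]

records = rows_to_dicts(keys, con.execute(sql, params).fetchall())
con.close()
```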

``` python
def translate_edge_query(
    q: EdgeQuery,  # Typed edge query
) -> Tuple[str, List[Any], str, Optional[List[str]]]:  # (sql, params, mode, row keys)
    """
    Translate an `EdgeQuery` to parameterized SQLite SQL.
    
    Same mode contract as `translate_node_query`. Projected rows always carry
    `id`/`source_id`/`target_id`; `relation_type` projects structurally.
    Endpoint constraints (`source_related`/`target_related` — the D13
    NEXT-chain count) compile to correlated EXISTS on the endpoint node.
    """
```

#### Variables

``` python
_PROP_PATH_RE  # Dotted-path guard (paths are interpolated; values are bound)
NODE_FULL_COLUMNS = 'n.id, n.label, n.properties, n.sources, n.created_at, n.updated_at'  # Matches _row_to_node order
EDGE_FULL_COLUMNS = 'e.id, e.source_id, e.target_id, e.relation_type, e.properties, e.created_at, e.updated_at'  # Matches _row_to_edge order
```
