Coverage for fastblocks / actions / query / parser.py: 14%
202 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-19 12:49 -0800
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-19 12:49 -0800
1"""Universal Query Parser for FastBlocks.
3Converts HTTP request query parameters into ACB universal database queries.
4Provides automatic filtering, pagination, sorting, and model lookup capabilities.
6Author: lesleslie <les@wedgwoodwebworks.com>
7Created: 2025-01-13
8"""
10import typing as t
11from contextlib import suppress
13from acb.debug import debug
14from acb.depends import Inject, depends
15from starlette.requests import Request
16from fastblocks.htmx import HtmxRequest
class UniversalQueryParser:
    """Turn a request's query parameters into an ACB universal query.

    Reserved parameters:

    * ``page`` / ``limit`` -- pagination (``limit`` is clamped to
      ``1..max_limit``).
    * ``order_by`` / ``order_dir`` -- sorting (``order_dir`` is ``asc`` or
      ``desc``; anything else becomes ``asc``).

    Every other parameter is treated as a filter.  ``field=value`` is an
    equality filter; ``field__<operator>=value`` selects one of the operators
    handled by :meth:`_apply_operator_filter`.
    """

    # Query parameters that control paging/sorting and are never filters.
    _RESERVED_PARAMS: t.ClassVar[tuple[str, ...]] = (
        "page",
        "limit",
        "order_by",
        "order_dir",
    )

    # Operators whose query-builder method is called as ``method(field, value)``
    # with the value passed through unchanged.
    _BINARY_OPERATORS: t.ClassVar[dict[str, str]] = {
        "equals": "where",
        "gt": "where_gt",
        "gte": "where_gte",
        "lt": "where_lt",
        "lte": "where_lte",
        "in": "where_in",
        "not": "where_not",
    }

    @depends.inject
    def __init__(
        self,
        request: HtmxRequest | Request,
        query: Inject[t.Any],
        model_class: t.Any = None,
        pattern: str = "advanced",
        default_limit: int = 10,
        max_limit: int = 100,
    ) -> None:
        """Store the request, the injected query interface and the options.

        Args:
            request: Incoming request; only its ``query_params`` are read.
            query: Universal query interface supplied by dependency injection.
            model_class: Model to query; without it no query is executed.
            pattern: Query pattern name ("simple", "repository",
                "specification" or "advanced").
            default_limit: Page size used when ``limit`` is absent or invalid.
            max_limit: Upper bound applied to the requested ``limit``.
        """
        self.request = request
        self.model_class = model_class
        self.pattern = pattern
        self.default_limit = default_limit
        self.max_limit = max_limit
        self.query = query

    @staticmethod
    def _coerce_int(raw: t.Any, fallback: int) -> int:
        """Return ``int(raw)``, or ``fallback`` when ``raw`` is not an integer."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            return fallback

    def _parse_pagination(self, params: dict[str, str]) -> tuple[int, int, int]:
        """Pop ``page``/``limit`` from ``params`` and return (page, limit, offset).

        Malformed values (e.g. ``?page=abc``) fall back to page 1 and
        ``default_limit`` instead of raising, because callers invoke this
        outside of any ``try`` block.
        """
        page = max(1, self._coerce_int(params.pop("page", 1), 1))
        requested = self._coerce_int(
            params.pop("limit", self.default_limit), self.default_limit
        )
        limit = min(self.max_limit, max(1, requested))
        offset = (page - 1) * limit
        debug(f"Pagination: page={page}, limit={limit}, offset={offset}")
        return page, limit, offset

    def _parse_sorting(self, params: dict[str, str]) -> tuple[str | None, str]:
        """Pop ``order_by``/``order_dir`` from ``params``.

        Returns:
            ``(order_by, order_dir)`` where ``order_dir`` is always "asc" or
            "desc"; unrecognised directions are replaced by "asc".
        """
        order_by = params.pop("order_by", None)
        order_dir = params.pop("order_dir", "asc").lower()
        if order_dir not in ("asc", "desc"):
            order_dir = "asc"
        debug(f"Sorting: order_by={order_by}, order_dir={order_dir}")
        return order_by, order_dir

    def _parse_filters(self, params: dict[str, str]) -> list[tuple[str, str, t.Any]]:
        """Convert the remaining parameters into ``(field, operator, value)``.

        A key containing ``__`` is split on its last ``__`` into field and
        operator; any other key is an "equals" filter.
        """
        filters: list[tuple[str, str, t.Any]] = []
        for key, value in params.items():
            if "__" in key:
                field, operator = key.rsplit("__", 1)
                filters.append(
                    (field, operator, self._process_operator_value(operator, value))
                )
            else:
                filters.append((key, "equals", self._process_simple_value(value)))
        debug(f"Filters: {filters}")
        return filters

    def _process_operator_value(self, operator: str, value: str) -> t.Any:
        """Coerce the raw string ``value`` according to ``operator``.

        ``null`` yields a bool, ``in`` a list of stripped strings, the
        comparison operators a number when possible; other operators keep
        the string as-is.
        """
        if operator == "null":
            return value.lower() in ("true", "1", "yes")
        if operator == "in":
            return [item.strip() for item in value.split(",")]
        if operator in ("gt", "gte", "lt", "lte"):
            return self._convert_to_number(value)
        return value

    def _process_simple_value(self, value: str) -> t.Any:
        """Coerce an equality value to bool, ``None``, a number, or leave it."""
        lowered = value.lower()
        if lowered in ("true", "false"):
            return lowered == "true"
        if lowered in ("null", "none"):
            return None
        return self._convert_to_number(value)

    def _convert_to_number(self, value: str) -> t.Any:
        """Return ``value`` as float (if it contains ".") or int, else unchanged."""
        with suppress(ValueError):
            return float(value) if "." in value else int(value)
        return value

    def _apply_operator_filter(
        self, query_builder: t.Any, field: str, operator: str, value: t.Any
    ) -> t.Any:
        """Apply a single filter based on operator.

        Unknown operators are logged and leave ``query_builder`` untouched.
        An ``AttributeError`` from a missing builder method propagates to the
        caller, which decides how to handle it.
        """
        method_name = self._BINARY_OPERATORS.get(operator)
        if method_name is not None:
            return getattr(query_builder, method_name)(field, value)
        if operator == "contains":
            return query_builder.where_like(field, f"%{value}%")
        if operator == "icontains":
            return query_builder.where_ilike(field, f"%{value}%")
        if operator == "null":
            # ``value`` is the bool produced by _process_operator_value.
            if value:
                return query_builder.where_null(field)
            return query_builder.where_not_null(field)
        debug(f"Unknown operator '{operator}' for field '{field}', skipping")
        return query_builder

    def _apply_filters(
        self, query_builder: t.Any, filters: list[tuple[str, str, t.Any]]
    ) -> t.Any:
        """Apply every filter in order, skipping ones the builder lacks."""
        for field, operator, value in filters:
            try:
                query_builder = self._apply_operator_filter(
                    query_builder, field, operator, value
                )
            except AttributeError as e:
                debug(
                    f"Query builder method not available for operator '{operator}': {e}"
                )

        return query_builder

    def _apply_sorting(
        self, query_builder: t.Any, order_by: str | None, order_dir: str
    ) -> t.Any:
        """Add ordering when ``order_by`` is set; no-op otherwise."""
        if not order_by:
            return query_builder
        try:
            if order_dir == "desc":
                return query_builder.order_by_desc(order_by)
            return query_builder.order_by(order_by)
        except AttributeError as e:
            debug(f"Query builder sorting method not available: {e}")
            return query_builder

    def _apply_pagination(self, query_builder: t.Any, offset: int, limit: int) -> t.Any:
        """Add offset/limit; return the builder unchanged if unsupported."""
        try:
            return query_builder.offset(offset).limit(limit)
        except AttributeError as e:
            debug(f"Query builder pagination method not available: {e}")
            return query_builder

    async def parse_and_execute(self) -> list[t.Any]:
        """Parse the request's query parameters and run the resulting query.

        Returns:
            The query results, or an empty list when the model class or the
            query interface is missing, no builder is available, or the query
            raises.
        """
        if not self._validate_query_requirements():
            return []
        # Work on a copy: the parse helpers pop the keys they consume.
        params = dict(getattr(self.request, "query_params", {}))
        debug(f"Original query params: {params}")
        _, limit, offset = self._parse_pagination(params)
        order_by, order_dir = self._parse_sorting(params)
        filters = self._parse_filters(params)
        try:
            query_builder = self._get_query_builder(filters)
            if query_builder is None:
                return []

            return await self._execute_query(
                query_builder, filters, order_by, order_dir, offset, limit
            )
        except Exception as e:
            # Deliberate best-effort boundary: a failed query yields no rows.
            debug(f"Query execution failed: {e}")
            return []

    def _validate_query_requirements(self) -> bool:
        """Return True when both a model class and a query interface exist."""
        if not self.model_class:
            debug("No model class provided for query parsing")
            return False
        if not self.query:
            debug("Universal query interface not available")
            return False
        return True

    def _get_query_builder(self, filters: list[tuple[str, str, t.Any]]) -> t.Any:
        """Return the builder for ``self.pattern``.

        "simple" gets its equality filters applied immediately; "repository"
        and "specification" log a notice and, like every other pattern, use
        the advanced builder.
        """
        if self.pattern == "simple":
            return self._handle_simple_pattern(filters)
        if self.pattern in ("repository", "specification"):
            debug(
                f"{self.pattern.title()} pattern not fully implemented, falling back to advanced"
            )
        return self.query.for_model(self.model_class).advanced

    def _handle_simple_pattern(self, filters: list[tuple[str, str, t.Any]]) -> t.Any:
        """Build a simple-pattern query with only the "equals" filters applied.

        Stops applying filters the first time the builder has no ``where``.
        """
        query_builder = self.query.for_model(self.model_class).simple
        for field, operator, value in filters:
            if operator != "equals":
                continue
            try:
                query_builder = query_builder.where(field, value)
            except AttributeError:
                debug("Simple query pattern doesn't support where clause")
                break
        return query_builder

    async def _execute_query(
        self,
        query_builder: t.Any,
        filters: list[tuple[str, str, t.Any]],
        order_by: str | None,
        order_dir: str,
        offset: int,
        limit: int,
    ) -> list[t.Any]:
        """Run the query and return all rows.

        For the "simple" pattern the builder is executed as-is (its filters
        were applied when it was created); otherwise filters, sorting and
        pagination are applied first.
        """
        if self.pattern == "simple":
            return t.cast(list[t.Any], await query_builder.all())

        query_builder = self._apply_filters(query_builder, filters)
        query_builder = self._apply_sorting(query_builder, order_by, order_dir)
        query_builder = self._apply_pagination(query_builder, offset, limit)

        debug(f"Executing query for model {self.model_class.__name__}")
        results = t.cast(list[t.Any], await query_builder.all())
        debug(f"Query returned {len(results)} results")

        return results

    async def get_count(self) -> int:
        """Count rows matching the request's filters, ignoring paging/sorting.

        Returns:
            The count from the advanced builder, or 0 when the model class or
            query interface is missing or the count query raises.
        """
        if not self.model_class or not self.query:
            return 0
        params = dict(getattr(self.request, "query_params", {}))
        for reserved in self._RESERVED_PARAMS:
            params.pop(reserved, None)
        filters = self._parse_filters(params)
        try:
            query_builder = self.query.for_model(self.model_class).advanced
            query_builder = self._apply_filters(query_builder, filters)

            return t.cast(int, await query_builder.count())
        except Exception as e:
            debug(f"Count query failed: {e}")
            return 0

    def get_pagination_info(self) -> dict[str, t.Any]:
        """Describe the current page for templates.

        ``next_page`` is always ``page + 1``: no total is computed here, so
        whether that page has rows is unknown.
        """
        params = dict(getattr(self.request, "query_params", {}))
        page, limit, offset = self._parse_pagination(params)
        has_prev = page > 1

        return {
            "page": page,
            "limit": limit,
            "offset": offset,
            "has_prev": has_prev,
            "prev_page": page - 1 if has_prev else None,
            "next_page": page + 1,
        }
async def get_model_for_query(model_name: str) -> t.Any | None:
    """Look up a model class by attribute name on the "models" dependency.

    Args:
        model_name: Attribute name to resolve. Callers may pass a value taken
            from the request's query string, so it is treated as untrusted.

    Returns:
        The attribute found on the models object, or ``None`` when the name
        is rejected, the models object is unavailable, the attribute does not
        exist, or the lookup raises.
    """
    # Reject private/dunder names so a request cannot resolve attributes such
    # as ``__class__`` that are not models.
    if (
        not isinstance(model_name, str)
        or not model_name
        or model_name.startswith("_")
    ):
        debug(f"Model '{model_name}' not found")
        return None

    try:
        models = await depends.get("models")
        if models:
            return getattr(models, model_name, None)
    except Exception as e:
        debug(f"Failed to get model '{model_name}': {e}")

    return None
async def create_query_context(
    request: HtmxRequest | Request,
    model_name: str | None = None,
    base_context: dict[str, t.Any] | None = None,
) -> dict[str, t.Any]:
    """Build a template context containing a query parser for one model.

    Args:
        request: Incoming request; its ``query_params`` supply the ``model``
            name when ``model_name`` is not given.
        model_name: Name of the model to query. Falls back to the ``model``
            query parameter when empty.
        base_context: Optional context to extend; it is copied, not mutated.

    Returns:
        A new dict. When a model is resolved it additionally holds
        ``<model_name>_parser``, ``<model_name>_pagination`` and a
        ``universal_query`` entry; otherwise it is just the copied base
        context.
    """
    context = dict(base_context) if base_context is not None else {}

    if not model_name:
        # NOTE(review): the "model" parameter stays in query_params, so the
        # parser presumably also sees it as an equality filter -- confirm.
        model_name = getattr(request, "query_params", {}).get("model")

    if not model_name:
        return context

    model_class = await get_model_for_query(model_name)
    if not model_class:
        debug(f"Model '{model_name}' not found")
        return context

    # Pass model_class by keyword: the parser's second positional parameter
    # is the injected ``query``, so a positional model would be bound to it
    # and ``model_class`` would stay None.
    parser = UniversalQueryParser(request, model_class=model_class)

    context.update(
        {
            f"{model_name}_parser": parser,
            f"{model_name}_pagination": parser.get_pagination_info(),
            "universal_query": {
                "model_name": model_name,
                "model_class": model_class,
                "parser": parser,
            },
        }
    )

    return context