Coverage for fastblocks / actions / query / parser.py: 14%

202 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-19 12:49 -0800

1"""Universal Query Parser for FastBlocks. 

2 

3Converts HTTP request query parameters into ACB universal database queries. 

4Provides automatic filtering, pagination, sorting, and model lookup capabilities. 

5 

6Author: lesleslie <les@wedgwoodwebworks.com> 

7Created: 2025-01-13 

8""" 

9 

10import typing as t 

11from contextlib import suppress 

12 

13from acb.debug import debug 

14from acb.depends import Inject, depends 

15from starlette.requests import Request 

16from fastblocks.htmx import HtmxRequest 

17 

18 

19class UniversalQueryParser: 

20 @depends.inject 

21 def __init__( 

22 self, 

23 request: HtmxRequest | Request, 

24 query: Inject[t.Any], 

25 model_class: t.Any = None, 

26 pattern: str = "advanced", 

27 default_limit: int = 10, 

28 max_limit: int = 100, 

29 ) -> None: 

30 self.request = request 

31 self.model_class = model_class 

32 self.pattern = pattern 

33 self.default_limit = default_limit 

34 self.max_limit = max_limit 

35 self.query = query 

36 

37 def _parse_pagination(self, params: dict[str, str]) -> tuple[int, int, int]: 

38 page = max(1, int(params.pop("page", 1))) 

39 limit = min( 

40 self.max_limit, max(1, int(params.pop("limit", self.default_limit))) 

41 ) 

42 offset = (page - 1) * limit 

43 debug(f"Pagination: page={page}, limit={limit}, offset={offset}") 

44 return page, limit, offset 

45 

46 def _parse_sorting(self, params: dict[str, str]) -> tuple[str | None, str]: 

47 order_by = params.pop("order_by", None) 

48 order_dir = params.pop("order_dir", "asc").lower() 

49 if order_dir not in ("asc", "desc"): 

50 order_dir = "asc" 

51 debug(f"Sorting: order_by={order_by}, order_dir={order_dir}") 

52 return order_by, order_dir 

53 

54 def _parse_filters(self, params: dict[str, str]) -> list[tuple[str, str, t.Any]]: 

55 filters = [] 

56 for key, value in params.items(): 

57 if "__" in key: 

58 field, operator = key.rsplit("__", 1) 

59 processed_value = self._process_operator_value(operator, value) 

60 filters.append((field, operator, processed_value)) 

61 else: 

62 processed_value = self._process_simple_value(value) 

63 filters.append((key, "equals", processed_value)) 

64 debug(f"Filters: {filters}") 

65 return filters 

66 

67 def _process_operator_value(self, operator: str, value: str) -> t.Any: 

68 if operator == "null": 

69 return value.lower() in ("true", "1", "yes") 

70 elif operator == "in": 

71 return [v.strip() for v in value.split(",")] 

72 elif operator in ("gt", "gte", "lt", "lte"): 

73 return self._convert_to_number(value) 

74 return value 

75 

76 def _process_simple_value(self, value: str) -> t.Any: 

77 if value.lower() in ("true", "false"): 

78 return value.lower() == "true" 

79 elif value.lower() in ("null", "none"): 

80 return None 

81 return self._convert_to_number(value) 

82 

83 def _convert_to_number(self, value: str) -> t.Any: 

84 with suppress(ValueError): 

85 if "." in value: 

86 return float(value) 

87 

88 return int(value) 

89 return value 

90 

91 def _apply_operator_filter( 

92 self, query_builder: t.Any, field: str, operator: str, value: t.Any 

93 ) -> t.Any: 

94 """Apply a single filter based on operator.""" 

95 if operator == "equals": 

96 query_builder = query_builder.where(field, value) 

97 elif operator == "gt": 

98 query_builder = query_builder.where_gt(field, value) 

99 elif operator == "gte": 

100 query_builder = query_builder.where_gte(field, value) 

101 elif operator == "lt": 

102 query_builder = query_builder.where_lt(field, value) 

103 elif operator == "lte": 

104 query_builder = query_builder.where_lte(field, value) 

105 elif operator == "contains": 

106 query_builder = query_builder.where_like(field, f"%{value}%") 

107 elif operator == "icontains": 

108 query_builder = query_builder.where_ilike(field, f"%{value}%") 

109 elif operator == "in": 

110 query_builder = query_builder.where_in(field, value) 

111 elif operator == "not": 

112 query_builder = query_builder.where_not(field, value) 

113 elif operator == "null": 

114 if value: 

115 query_builder = query_builder.where_null(field) 

116 else: 

117 query_builder = query_builder.where_not_null(field) 

118 else: 

119 debug(f"Unknown operator '{operator}' for field '{field}', skipping") 

120 return query_builder 

121 

122 def _apply_filters( # noqa: C901 

123 self, query_builder: t.Any, filters: list[tuple[str, str, t.Any]] 

124 ) -> t.Any: 

125 for field, operator, value in filters: 

126 try: 

127 query_builder = self._apply_operator_filter( 

128 query_builder, field, operator, value 

129 ) 

130 except AttributeError as e: 

131 debug( 

132 f"Query builder method not available for operator '{operator}': {e}" 

133 ) 

134 

135 return query_builder 

136 

137 def _apply_sorting( 

138 self, query_builder: t.Any, order_by: str | None, order_dir: str 

139 ) -> t.Any: 

140 if order_by: 

141 try: 

142 if order_dir == "desc": 

143 query_builder = query_builder.order_by_desc(order_by) 

144 else: 

145 query_builder = query_builder.order_by(order_by) 

146 except AttributeError as e: 

147 debug(f"Query builder sorting method not available: {e}") 

148 

149 return query_builder 

150 

151 def _apply_pagination(self, query_builder: t.Any, offset: int, limit: int) -> t.Any: 

152 try: 

153 return query_builder.offset(offset).limit(limit) 

154 except AttributeError as e: 

155 debug(f"Query builder pagination method not available: {e}") 

156 return query_builder 

157 

158 async def parse_and_execute(self) -> list[t.Any]: 

159 if not self._validate_query_requirements(): 

160 return [] 

161 params = dict(getattr(self.request, "query_params", {})) 

162 debug(f"Original query params: {params}") 

163 _, limit, offset = self._parse_pagination(params) 

164 order_by, order_dir = self._parse_sorting(params) 

165 filters = self._parse_filters(params) 

166 try: 

167 query_builder = self._get_query_builder(filters) 

168 if query_builder is None: 

169 return [] 

170 

171 return await self._execute_query( 

172 query_builder, filters, order_by, order_dir, offset, limit 

173 ) 

174 except Exception as e: 

175 debug(f"Query execution failed: {e}") 

176 return [] 

177 

178 def _validate_query_requirements(self) -> bool: 

179 if not self.model_class: 

180 debug("No model class provided for query parsing") 

181 return False 

182 if not self.query: 

183 debug("Universal query interface not available") 

184 return False 

185 return True 

186 

187 def _get_query_builder(self, filters: list[tuple[str, str, t.Any]]) -> t.Any: 

188 if self.pattern == "simple": 

189 return self._handle_simple_pattern(filters) 

190 elif self.pattern in ("repository", "specification"): 

191 debug( 

192 f"{self.pattern.title()} pattern not fully implemented, falling back to advanced" 

193 ) 

194 return self.query.for_model(self.model_class).advanced 

195 return self.query.for_model(self.model_class).advanced 

196 

197 def _handle_simple_pattern(self, filters: list[tuple[str, str, t.Any]]) -> t.Any: 

198 query_builder = self.query.for_model(self.model_class).simple 

199 if filters: 

200 for field, operator, value in filters: 

201 if operator == "equals": 

202 try: 

203 query_builder = query_builder.where(field, value) 

204 except AttributeError: 

205 debug("Simple query pattern doesn't support where clause") 

206 break 

207 return query_builder 

208 

209 async def _execute_query( 

210 self, 

211 query_builder: t.Any, 

212 filters: list[tuple[str, str, t.Any]], 

213 order_by: str | None, 

214 order_dir: str, 

215 offset: int, 

216 limit: int, 

217 ) -> list[t.Any]: 

218 if self.pattern == "simple": 

219 return t.cast(list[t.Any], await query_builder.all()) 

220 

221 query_builder = self._apply_filters(query_builder, filters) 

222 query_builder = self._apply_sorting(query_builder, order_by, order_dir) 

223 query_builder = self._apply_pagination(query_builder, offset, limit) 

224 

225 debug(f"Executing query for model {self.model_class.__name__}") 

226 results = t.cast(list[t.Any], await query_builder.all()) 

227 debug(f"Query returned {len(results)} results") 

228 

229 return results 

230 

231 async def get_count(self) -> int: 

232 if not self.model_class or not self.query: 

233 return 0 

234 params = dict(getattr(self.request, "query_params", {})) 

235 params.pop("page", None) 

236 params.pop("limit", None) 

237 params.pop("order_by", None) 

238 params.pop("order_dir", None) 

239 filters = self._parse_filters(params) 

240 try: 

241 query_builder = self.query.for_model(self.model_class).advanced 

242 query_builder = self._apply_filters(query_builder, filters) 

243 

244 return t.cast(int, await query_builder.count()) 

245 except Exception as e: 

246 debug(f"Count query failed: {e}") 

247 return 0 

248 

249 def get_pagination_info(self) -> dict[str, t.Any]: 

250 params = dict(getattr(self.request, "query_params", {})) 

251 page, limit, offset = self._parse_pagination(params) 

252 

253 return { 

254 "page": page, 

255 "limit": limit, 

256 "offset": offset, 

257 "has_prev": page > 1, 

258 "prev_page": page - 1 if page > 1 else None, 

259 "next_page": page + 1, 

260 } 

261 

262 

263async def get_model_for_query(model_name: str) -> t.Any | None: 

264 try: 

265 models = await depends.get("models") 

266 if models and hasattr(models, model_name): 

267 return getattr(models, model_name) 

268 except Exception as e: 

269 debug(f"Failed to get model '{model_name}': {e}") 

270 

271 return None 

272 

273 

274async def create_query_context( 

275 request: HtmxRequest | Request, 

276 model_name: str | None = None, 

277 base_context: dict[str, t.Any] | None = None, 

278) -> dict[str, t.Any]: 

279 if base_context is None: 

280 base_context = {} 

281 

282 context = dict(base_context) 

283 

284 if not model_name: 

285 query_params = getattr(request, "query_params", {}) 

286 model_name = query_params.get("model") 

287 

288 if not model_name: 

289 return context 

290 

291 model_class = await get_model_for_query(model_name) 

292 if not model_class: 

293 debug(f"Model '{model_name}' not found") 

294 return context 

295 

296 parser = UniversalQueryParser(request, model_class) 

297 

298 context.update( 

299 { 

300 f"{model_name}_parser": parser, 

301 f"{model_name}_pagination": parser.get_pagination_info(), 

302 "universal_query": { 

303 "model_name": model_name, 

304 "model_class": model_class, 

305 "parser": parser, 

306 }, 

307 } 

308 ) 

309 

310 return context