Coverage for fastblocks / _health_integration.py: 0%
210 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-19 12:49 -0800
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-19 12:49 -0800
1"""ACB HealthService integration for FastBlocks.
3This module bridges FastBlocks components with ACB's comprehensive health monitoring system.
4It registers FastBlocks-specific health checks while maintaining existing MCP health checks.
6Author: lesleslie <les@wedgwoodwebworks.com>
7Created: 2025-10-01
8"""
10import typing as t
11from contextlib import suppress
12from uuid import UUID
14from acb.adapters import AdapterStatus
15from acb.depends import Inject, depends
# Optional ACB health imports (graceful degradation if not available)
try:
    from acb.services.health import (
        HealthCheckMixin,
        HealthCheckResult,
        HealthStatus,
    )

    # Flag consulted by the health checks below before they touch ACB types.
    acb_health_available = True
except ImportError:
    # ACB health service is not importable: bind placeholders so the class
    # definitions below still load; the checks then return None.
    acb_health_available = False
    HealthCheckMixin = object  # Fallback base class
    HealthCheckResult = None
    # NOTE(review): HealthCheckType is bound only on this fallback path; the
    # import above does not bring it in, so the name is undefined when ACB
    # health IS available. Nothing in the visible part of this module reads
    # it at runtime (it only appears in comments) - confirm before relying
    # on it elsewhere.
    HealthCheckType = None
    HealthStatus = None
class FastBlocksHealthCheck(HealthCheckMixin):  # type: ignore[misc]
    """Common base for every FastBlocks health check.

    Stores the injected configuration together with an identifier and a
    display name for the component, and supplies a default check that
    reports the component as healthy. Subclasses override
    ``_perform_health_check`` with a real probe.
    """

    @depends.inject  # type: ignore[misc]  # ACB untyped decorator
    def __init__(
        self,
        config: Inject[t.Any],
        component_id: str | None = None,
        component_name: str | None = None,
    ) -> None:
        """Initialise the check.

        Args:
            config: Configuration object supplied through ``depends.inject``.
            component_id: Identifier for the component; when empty or
                omitted the lower-cased class name is used.
            component_name: Display name for the component; when empty or
                omitted the class name is used.
        """
        # The mixin initialiser is only invoked when the ACB health package
        # was importable.
        if acb_health_available:
            super().__init__()
        self.config = config

        class_name = self.__class__.__name__
        self._component_id: str = (
            component_id if component_id else class_name.lower()
        )
        self._component_name: str = (
            component_name if component_name else class_name
        )

    @property
    def component_id(self) -> str:
        """Identifier of the component this check covers."""
        return self._component_id

    @property
    def component_name(self) -> str:
        """Display name of the component this check covers."""
        return self._component_name

    async def _perform_health_check(
        self,
        check_type: t.Any,  # HealthCheckType when available
    ) -> t.Any:  # HealthCheckResult when available
        """Report the component as healthy (default implementation).

        Args:
            check_type: Passed through unchanged into the result.

        Returns:
            A ``HealthCheckResult`` with ``HealthStatus.HEALTHY``, or
            ``None`` when the ACB health package is unavailable.
        """
        if acb_health_available:
            return HealthCheckResult(
                component_id=self.component_id,
                component_name=self.component_name,
                status=HealthStatus.HEALTHY,
                check_type=check_type,
                message=f"{self.component_name} is operational",
            )
        return None
class TemplatesHealthCheck(FastBlocksHealthCheck):
    """Health check for the FastBlocks template system.

    Looks up the ``templates`` adapter through ``depends``, inspects its
    ``app`` / ``app.env.loader`` attributes and additionally records whether
    a ``cache`` dependency can be resolved.
    """

    def __init__(self) -> None:
        """Register the check under the id ``templates``."""
        super().__init__(
            component_id="templates",
            component_name="Template System",
        )

    def _check_template_adapter_status(
        self, templates: t.Any, details: dict[str, t.Any]
    ) -> tuple[t.Any, str]:
        """Inspect the templates adapter and update ``details``.

        Args:
            templates: The resolved templates adapter (must not be ``None``).
            details: Mutable mapping that receives ``jinja_env_initialized``
                and, when a loader is present, ``loader_available``.

        Returns:
            ``(status, message)``: DEGRADED when ``app`` is missing or
            ``None`` or when no loader is configured, HEALTHY otherwise.
        """
        app = getattr(templates, "app", None)
        if app is None:
            return HealthStatus.DEGRADED, "Template app not initialized"

        # Flag tracks presence of the app object (unchanged from before).
        details["jinja_env_initialized"] = True

        # Check template directory accessibility.
        # Guarded lookups: an ``env`` that is None (or lacks ``loader``) is a
        # configuration gap, not a crash, so report it as DEGRADED instead of
        # letting AttributeError bubble up as UNHEALTHY.
        env = getattr(app, "env", None)
        if env is not None and getattr(env, "loader", None):
            details["loader_available"] = True
            return HealthStatus.HEALTHY, "Template system operational"
        return HealthStatus.DEGRADED, "Template loader not configured"

    async def _check_cache_availability(self, details: dict[str, t.Any]) -> None:
        """Record in ``details["cache_available"]`` whether a cache resolves.

        Any exception raised while resolving the dependency is treated as
        "not available"; it never propagates.
        """
        try:
            cache = await depends.get("cache")
            details["cache_available"] = cache is not None
        except Exception:
            details["cache_available"] = False

    async def _perform_health_check(
        self,
        check_type: t.Any,
    ) -> t.Any:
        """Check template system health.

        Args:
            check_type: Passed through unchanged into the result.

        Returns:
            A ``HealthCheckResult`` describing the template system, or
            ``None`` when the ACB health package is unavailable.
        """
        if not acb_health_available:
            return None

        details: dict[str, t.Any] = {}
        status = HealthStatus.HEALTHY
        message = "Template system operational"

        try:
            # Try to get templates adapter
            templates = await depends.get("templates")

            if templates is None:
                status = HealthStatus.DEGRADED
                message = "Templates adapter not initialized"
            else:
                status, message = self._check_template_adapter_status(
                    templates, details
                )

            # Check cache availability
            await self._check_cache_availability(details)

        except Exception as e:
            # Anything unexpected (e.g. dependency resolution blowing up)
            # marks the component UNHEALTHY and keeps the error text.
            status = HealthStatus.UNHEALTHY
            message = f"Template health check failed: {e}"
            details["error"] = str(e)

        return HealthCheckResult(
            component_id=self.component_id,
            component_name=self.component_name,
            status=status,
            check_type=check_type,
            message=message,
            details=details,
        )
class CacheHealthCheck(FastBlocksHealthCheck):
    """Health check for the FastBlocks cache system.

    Resolves the ``cache`` adapter through ``depends``, performs a
    set / get / delete round trip with a probe key and, when the adapter
    exposes ``get_stats``, records its hit ratio.
    """

    def __init__(self) -> None:
        """Register the check under the id ``cache``."""
        super().__init__(
            component_id="cache",
            component_name="Cache System",
        )

    async def _test_cache_operations(
        self, cache: t.Any, details: dict[str, t.Any]
    ) -> tuple[t.Any, str]:  # Returns (status, message)
        """Run a write/read round trip against ``cache``.

        Args:
            cache: The resolved cache adapter; must provide awaitable
                ``set``, ``get`` and ``delete``.
            details: Mutable mapping that receives ``write_test``,
                ``read_test`` and, on an exception, ``operation_error``.

        Returns:
            ``(status, message)``: HEALTHY when the value read back matches
            what was written, DEGRADED otherwise (including on exceptions).
        """
        test_key = "__fastblocks_health_check__"
        test_value = "health_check_ok"

        try:
            # Test set operation
            await cache.set(test_key, test_value, ttl=10)
            details["write_test"] = "passed"

            # Test get operation
            retrieved = await cache.get(test_key)
            if retrieved == test_value:
                details["read_test"] = "passed"
                await cache.delete(test_key)
                return HealthStatus.HEALTHY, "Cache system operational"

            details["read_test"] = "failed"
            # Best-effort cleanup so a failed verification does not leave the
            # probe key behind; the read failure stays the reported problem.
            with suppress(Exception):
                await cache.delete(test_key)
            return HealthStatus.DEGRADED, "Cache read verification failed"

        except Exception as e:
            details["operation_error"] = str(e)
            return HealthStatus.DEGRADED, f"Cache operations failed: {e}"

    async def _collect_cache_stats(
        self, cache: t.Any, details: dict[str, t.Any]
    ) -> None:
        """Store ``cache_hit_ratio`` in ``details`` when stats are exposed.

        Errors while fetching stats are suppressed: the statistics are
        informational and must not affect the reported status.
        """
        if hasattr(cache, "get_stats"):
            with suppress(Exception):  # Stats not critical
                stats = await cache.get_stats()
                if hasattr(stats, "hit_ratio"):
                    details["cache_hit_ratio"] = stats.hit_ratio

    async def _perform_health_check(
        self,
        check_type: t.Any,
    ) -> t.Any:
        """Check cache system health.

        Args:
            check_type: Passed through unchanged into the result.

        Returns:
            A ``HealthCheckResult`` describing the cache, or ``None`` when
            the ACB health package is unavailable.
        """
        if not acb_health_available:
            return None

        details: dict[str, t.Any] = {}
        status = HealthStatus.HEALTHY
        message = "Cache system operational"

        try:
            # Try to get cache adapter
            cache = await depends.get("cache")

            if cache is None:
                status = HealthStatus.DEGRADED
                message = "Cache adapter not available (degraded mode)"
                details["reason"] = "cache_disabled_or_not_configured"
            else:
                # Test cache operations
                status, message = await self._test_cache_operations(cache, details)

                # Collect stats if available
                await self._collect_cache_stats(cache, details)

        except Exception as e:
            # Failure to resolve the adapter itself is reported as UNHEALTHY.
            status = HealthStatus.UNHEALTHY
            message = f"Cache health check failed: {e}"
            details["error"] = str(e)

        return HealthCheckResult(
            component_id=self.component_id,
            component_name=self.component_name,
            status=status,
            check_type=check_type,
            message=message,
            details=details,
        )
class RoutesHealthCheck(FastBlocksHealthCheck):
    """Health check covering the FastBlocks routing system.

    Resolves the ``routes`` adapter through ``depends`` and reports how many
    routes it currently holds.
    """

    def __init__(self) -> None:
        """Register the check under the id ``routes``."""
        super().__init__(
            component_id="routes",
            component_name="Routing System",
        )

    def _check_routes_adapter(
        self, routes: t.Any, details: dict[str, t.Any]
    ) -> tuple[t.Any, str]:  # Returns (status, message)
        """Count the adapter's routes and record the total in ``details``.

        Args:
            routes: The resolved routes adapter (must not be ``None``).
            details: Mutable mapping that receives ``route_count``.

        Returns:
            ``(status, message)``: DEGRADED when the adapter has no
            ``routes`` attribute or the collection is empty, HEALTHY
            otherwise.
        """
        if not hasattr(routes, "routes"):
            return HealthStatus.DEGRADED, "Routes collection not available"

        registered = routes.routes
        total = len(registered) if registered else 0
        details["route_count"] = total

        if total:
            return HealthStatus.HEALTHY, f"{total} routes registered"
        return HealthStatus.DEGRADED, "No routes registered"

    async def _perform_health_check(
        self,
        check_type: t.Any,
    ) -> t.Any:
        """Check routing system health.

        Args:
            check_type: Passed through unchanged into the result.

        Returns:
            A ``HealthCheckResult`` describing the routing system, or
            ``None`` when the ACB health package is unavailable.
        """
        if not acb_health_available:
            return None

        details: dict[str, t.Any] = {}

        try:
            # Resolve the adapter; a missing adapter is a degraded state.
            routes = await depends.get("routes")
            if routes is not None:
                status, message = self._check_routes_adapter(routes, details)
            else:
                status = HealthStatus.DEGRADED
                message = "Routes adapter not initialized"
        except Exception as exc:
            # Unexpected failures are reported as UNHEALTHY with the error.
            status = HealthStatus.UNHEALTHY
            message = f"Routes health check failed: {exc}"
            details["error"] = str(exc)

        return HealthCheckResult(
            component_id=self.component_id,
            component_name=self.component_name,
            status=status,
            check_type=check_type,
            message=message,
            details=details,
        )
class DatabaseHealthCheck(FastBlocksHealthCheck):
    """Health check covering database connectivity.

    Resolves the ``sql`` adapter through ``depends`` and issues a
    ``SELECT 1`` statement to confirm the connection works.
    """

    def __init__(self) -> None:
        """Register the check under the id ``database``."""
        super().__init__(
            component_id="database",
            component_name="Database",
        )

    async def _run_connectivity_probe(
        self, sql: t.Any, details: dict[str, t.Any]
    ) -> tuple[t.Any, str]:
        """Execute the ping query and update ``details``.

        Args:
            sql: The resolved sql adapter (must not be ``None``).
            details: Mutable mapping that receives ``connectivity_test``
                and, when obtainable, ``connection_info``; on failure also
                ``error``.

        Returns:
            ``(status, message)``: HEALTHY when the query succeeds,
            UNHEALTHY when it raises.
        """
        try:
            # Most databases support SELECT 1 as a ping query
            await sql.execute("SELECT 1")
            details["connectivity_test"] = "passed"

            # Connection details are optional extras; failures are ignored.
            if hasattr(sql, "get_connection_info"):
                with suppress(Exception):  # Connection info not critical
                    info = await sql.get_connection_info()
                    details["connection_info"] = info
        except Exception as exc:
            failure = f"Database query failed: {exc}"
            details["connectivity_test"] = "failed"
            details["error"] = str(exc)
            return HealthStatus.UNHEALTHY, failure
        return HealthStatus.HEALTHY, "Database operational"

    async def _perform_health_check(
        self,
        check_type: t.Any,
    ) -> t.Any:
        """Check database health.

        Args:
            check_type: Passed through unchanged into the result.

        Returns:
            A ``HealthCheckResult`` describing the database, or ``None``
            when the ACB health package is unavailable.
        """
        if not acb_health_available:
            return None

        details: dict[str, t.Any] = {}

        try:
            # Resolve the adapter; a missing adapter is a degraded state.
            sql = await depends.get("sql")
            if sql is not None:
                status, message = await self._run_connectivity_probe(sql, details)
            else:
                status = HealthStatus.DEGRADED
                message = "Database adapter not configured"
                details["reason"] = "sql_adapter_not_available"
        except Exception as exc:
            # Failure outside the query itself is reported as DEGRADED.
            status = HealthStatus.DEGRADED
            message = f"Database health check failed: {exc}"
            details["error"] = str(exc)

        return HealthCheckResult(
            component_id=self.component_id,
            component_name=self.component_name,
            status=status,
            check_type=check_type,
            message=message,
            details=details,
        )
async def register_fastblocks_health_checks() -> bool:
    """Register the FastBlocks health checks with the ACB HealthService.

    The templates, cache, routes and database checks are instantiated and
    registered one after another, in that order.

    Returns:
        True when every check was registered; False when the ACB health
        package is unavailable, no ``health_service`` dependency resolves,
        or any step raises.
    """
    if not acb_health_available:
        return False

    check_classes = (
        TemplatesHealthCheck,
        CacheHealthCheck,
        RoutesHealthCheck,
        DatabaseHealthCheck,
    )

    try:
        # Look the service up in the dependency registry.
        service = await depends.get("health_service")
        if service is None:
            return False

        for check_cls in check_classes:
            await service.register_component(check_cls())
    except Exception:
        # Graceful degradation if registration fails
        return False

    return True
391def _determine_overall_health_status(results: dict[str, t.Any]) -> str:
392 """Determine overall health status from individual component results."""
393 statuses = [r.get("status", "unknown") for r in results.values()]
395 if "unhealthy" in statuses or "critical" in statuses:
396 return "unhealthy"
397 elif "degraded" in statuses:
398 return "degraded"
399 elif all(s == "healthy" for s in statuses):
400 return "healthy"
401 return "unknown"
404async def _get_component_health_results(health_service: t.Any) -> dict[str, t.Any]:
405 """Get health results for all components."""
406 component_ids = ["templates", "cache", "routes", "database"]
407 results = {}
409 for component_id in component_ids:
410 try:
411 result = await health_service.get_component_health(component_id)
412 if result:
413 results[component_id] = result.to_dict()
414 except Exception:
415 results[component_id] = {
416 "status": "unknown",
417 "message": "Health check failed",
418 }
420 return results
async def get_fastblocks_health_summary() -> dict[str, t.Any]:
    """Build a health summary covering all FastBlocks components.

    Returns:
        Dictionary with the keys ``status`` (overall status string),
        ``message`` and ``components`` (per-component results). When the
        health service is unavailable or an error occurs, ``components`` is
        empty and ``status`` is ``"unknown"`` or ``"error"`` respectively.
    """

    def _without_components(status: str, message: str) -> dict[str, t.Any]:
        # Shape shared by every early-exit / failure response.
        return {"status": status, "message": message, "components": {}}

    if not acb_health_available:
        return _without_components("unknown", "ACB HealthService not available")

    try:
        service = await depends.get("health_service")
        if service is None:
            return _without_components(
                "unknown", "ACB HealthService not initialized"
            )

        # Gather per-component results, then fold them into one status.
        components = await _get_component_health_results(service)
        overall = _determine_overall_health_status(components)
    except Exception as exc:
        return _without_components("error", f"Health check failed: {exc}")

    return {
        "status": overall,
        "message": f"FastBlocks health status: {overall}",
        "components": components,
    }
# Module metadata for ACB discovery
MODULE_ID = UUID("01937d88-0000-7000-8000-000000000001")
MODULE_STATUS = AdapterStatus.STABLE

# Health checks are NOT registered as a side effect of importing this module:
# no registration call is made at module level here.
# Note: Registration happens during application startup via depends.set()
# (per the original author's note; the call site is not visible in this
# module - confirm against the application startup code).
# This ensures proper async context is available