Coverage for kwave/reconstruction/time_reversal.py: 97%
50 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-24 12:06 -0700
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-24 12:06 -0700
1"""
2Time reversal reconstruction for photoacoustic imaging.
4This class handles time reversal reconstruction of initial pressure distribution
5from sensor data. It supports both 2D and 3D simulations and automatically
6applies compensation for half-plane recording.
8Example:
9 >>> tr = TimeReversal(kgrid, medium, sensor)
10 >>> p0_recon = tr(kspaceFirstOrder2D, simulation_options, execution_options)
11"""
13from typing import Any, Callable, Dict
15import numpy as np
17from kwave.kgrid import kWaveGrid
18from kwave.kmedium import kWaveMedium
19from kwave.ksensor import kSensor
20from kwave.ksource import kSource
21from kwave.options import SimulationExecutionOptions, SimulationOptions
24class TimeReversal:
25 """
26 Time reversal reconstruction for photoacoustic imaging.
28 This class handles time reversal reconstruction of initial pressure distribution
29 from sensor data. It supports both 2D and 3D simulations and automatically
30 applies compensation for half-plane recording.
32 Args:
33 kgrid: Computational grid for the simulation
34 medium: Medium properties for wave propagation
35 sensor: Sensor object containing the sensor mask
36 compensation_factor: Factor to compensate for half-plane recording (default: 2.0)
38 Raises:
39 ValueError: If inputs are invalid for time reversal
41 Note:
42 Future versions may support:
43 - GPU acceleration via use_gpu parameter
44 - Differentiable operations via differentiable parameter
45 - Custom boundary conditions via boundary_condition parameter
46 - Elastic wave propagation via elastic parameter
47 """
49 def __init__(self, kgrid: kWaveGrid, medium: kWaveMedium, sensor: kSensor, compensation_factor: float = 2.0) -> None:
50 """
51 Initialize time reversal reconstruction.
53 Args:
54 kgrid: Computational grid for the simulation
55 medium: Medium properties for wave propagation
56 sensor: Sensor object containing the sensor mask
57 compensation_factor: Factor to compensate for half-plane recording (default: 2.0)
59 Raises:
60 ValueError: If inputs are invalid for time reversal
61 """
62 self.kgrid = kgrid
63 self.medium = medium
64 self.sensor = sensor
65 self.compensation_factor = compensation_factor
66 self._source = None
67 self._new_sensor = None
69 # Validate inputs
70 if sensor.mask is None:
71 raise ValueError("Sensor mask must be set for time reversal. Use sensor.mask = ...")
73 # Check for valid time array
74 if kgrid.t_array is None: 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true
75 raise ValueError("t_array must be explicitly set for time reversal")
76 if isinstance(kgrid.t_array, str):
77 if kgrid.t_array == "auto":
78 raise ValueError("t_array must be explicitly set for time reversal")
79 else:
80 raise ValueError(f"Invalid t_array value: {kgrid.t_array}")
82 # Validate compensation factor
83 if compensation_factor <= 0:
84 raise ValueError("compensation_factor must be positive")
86 # Validate sensor mask has at least one active point
87 if not np.any(sensor.mask):
88 raise ValueError("Sensor mask must have at least one active point")
90 # Validate sensor mask shape matches grid dimensions
91 if not np.array_equal(sensor.mask.shape, kgrid.N):
92 raise ValueError(f"Sensor mask shape {sensor.mask.shape} does not match grid dimensions {kgrid.N}")
94 def __call__(
95 self, simulation_function: Callable, simulation_options: SimulationOptions, execution_options: SimulationExecutionOptions
96 ) -> np.ndarray:
97 """
98 Run time reversal reconstruction.
100 Args:
101 simulation_function: Function to run the simulation (e.g., kspaceFirstOrder2D)
102 simulation_options: Options for the simulation
103 execution_options: Options for execution
105 Returns:
106 Reconstructed initial pressure distribution
108 Raises:
109 ValueError: If simulation_function, simulation_options, or execution_options are None,
110 or if sensor does not have recorded pressure data
111 """
112 if simulation_function is None:
113 raise ValueError("simulation_function must be provided")
114 if simulation_options is None:
115 raise ValueError("simulation_options must be provided")
116 if execution_options is None:
117 raise ValueError("execution_options must be provided")
119 # Validate sensor has recorded pressure data
120 if not hasattr(self.sensor, "recorded_pressure") or self.sensor.recorded_pressure is None:
121 raise ValueError("Sensor must have recorded pressure data. Run a forward simulation first.")
123 # Create source and sensor for reconstruction
124 self._source = kSource()
125 self._source.p_mask = self.sensor.mask # Use sensor mask as source mask
126 self._source.p = np.flip(self.sensor.recorded_pressure, axis=1) # Time-reverse the recorded pressure
127 self._source.p_mode = "dirichlet" # Use dirichlet boundary condition
129 self._new_sensor = kSensor(mask=self.sensor.mask)
131 # Run reconstruction
132 result = simulation_function(self.kgrid, self._source, self._new_sensor, self.medium, simulation_options, execution_options)
134 # Process result
135 if isinstance(result, dict):
136 p0_recon = result["p_final"]
137 else:
138 p0_recon = result
140 # Apply compensation factor and positivity condition
141 p0_recon = self.compensation_factor * p0_recon
142 p0_recon[p0_recon < 0] = 0 # Apply positivity condition
144 return p0_recon