Coverage for kwave/reconstruction/time_reversal.py: 97%

50 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-03-24 12:06 -0700

1""" 

2Time reversal reconstruction for photoacoustic imaging. 

3 

4This class handles time reversal reconstruction of initial pressure distribution 

5from sensor data. It supports both 2D and 3D simulations and automatically 

6applies compensation for half-plane recording. 

7 

8Example: 

9 >>> tr = TimeReversal(kgrid, medium, sensor) 

10 >>> p0_recon = tr(kspaceFirstOrder2D, simulation_options, execution_options) 

11""" 

12 

13from typing import Any, Callable, Dict 

14 

15import numpy as np 

16 

17from kwave.kgrid import kWaveGrid 

18from kwave.kmedium import kWaveMedium 

19from kwave.ksensor import kSensor 

20from kwave.ksource import kSource 

21from kwave.options import SimulationExecutionOptions, SimulationOptions 

22 

23 

24class TimeReversal: 

25 """ 

26 Time reversal reconstruction for photoacoustic imaging. 

27 

28 This class handles time reversal reconstruction of initial pressure distribution 

29 from sensor data. It supports both 2D and 3D simulations and automatically 

30 applies compensation for half-plane recording. 

31 

32 Args: 

33 kgrid: Computational grid for the simulation 

34 medium: Medium properties for wave propagation 

35 sensor: Sensor object containing the sensor mask 

36 compensation_factor: Factor to compensate for half-plane recording (default: 2.0) 

37 

38 Raises: 

39 ValueError: If inputs are invalid for time reversal 

40 

41 Note: 

42 Future versions may support: 

43 - GPU acceleration via use_gpu parameter 

44 - Differentiable operations via differentiable parameter 

45 - Custom boundary conditions via boundary_condition parameter 

46 - Elastic wave propagation via elastic parameter 

47 """ 

48 

49 def __init__(self, kgrid: kWaveGrid, medium: kWaveMedium, sensor: kSensor, compensation_factor: float = 2.0) -> None: 

50 """ 

51 Initialize time reversal reconstruction. 

52 

53 Args: 

54 kgrid: Computational grid for the simulation 

55 medium: Medium properties for wave propagation 

56 sensor: Sensor object containing the sensor mask 

57 compensation_factor: Factor to compensate for half-plane recording (default: 2.0) 

58 

59 Raises: 

60 ValueError: If inputs are invalid for time reversal 

61 """ 

62 self.kgrid = kgrid 

63 self.medium = medium 

64 self.sensor = sensor 

65 self.compensation_factor = compensation_factor 

66 self._source = None 

67 self._new_sensor = None 

68 

69 # Validate inputs 

70 if sensor.mask is None: 

71 raise ValueError("Sensor mask must be set for time reversal. Use sensor.mask = ...") 

72 

73 # Check for valid time array 

74 if kgrid.t_array is None: 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true

75 raise ValueError("t_array must be explicitly set for time reversal") 

76 if isinstance(kgrid.t_array, str): 

77 if kgrid.t_array == "auto": 

78 raise ValueError("t_array must be explicitly set for time reversal") 

79 else: 

80 raise ValueError(f"Invalid t_array value: {kgrid.t_array}") 

81 

82 # Validate compensation factor 

83 if compensation_factor <= 0: 

84 raise ValueError("compensation_factor must be positive") 

85 

86 # Validate sensor mask has at least one active point 

87 if not np.any(sensor.mask): 

88 raise ValueError("Sensor mask must have at least one active point") 

89 

90 # Validate sensor mask shape matches grid dimensions 

91 if not np.array_equal(sensor.mask.shape, kgrid.N): 

92 raise ValueError(f"Sensor mask shape {sensor.mask.shape} does not match grid dimensions {kgrid.N}") 

93 

94 def __call__( 

95 self, simulation_function: Callable, simulation_options: SimulationOptions, execution_options: SimulationExecutionOptions 

96 ) -> np.ndarray: 

97 """ 

98 Run time reversal reconstruction. 

99 

100 Args: 

101 simulation_function: Function to run the simulation (e.g., kspaceFirstOrder2D) 

102 simulation_options: Options for the simulation 

103 execution_options: Options for execution 

104 

105 Returns: 

106 Reconstructed initial pressure distribution 

107 

108 Raises: 

109 ValueError: If simulation_function, simulation_options, or execution_options are None, 

110 or if sensor does not have recorded pressure data 

111 """ 

112 if simulation_function is None: 

113 raise ValueError("simulation_function must be provided") 

114 if simulation_options is None: 

115 raise ValueError("simulation_options must be provided") 

116 if execution_options is None: 

117 raise ValueError("execution_options must be provided") 

118 

119 # Validate sensor has recorded pressure data 

120 if not hasattr(self.sensor, "recorded_pressure") or self.sensor.recorded_pressure is None: 

121 raise ValueError("Sensor must have recorded pressure data. Run a forward simulation first.") 

122 

123 # Create source and sensor for reconstruction 

124 self._source = kSource() 

125 self._source.p_mask = self.sensor.mask # Use sensor mask as source mask 

126 self._source.p = np.flip(self.sensor.recorded_pressure, axis=1) # Time-reverse the recorded pressure 

127 self._source.p_mode = "dirichlet" # Use dirichlet boundary condition 

128 

129 self._new_sensor = kSensor(mask=self.sensor.mask) 

130 

131 # Run reconstruction 

132 result = simulation_function(self.kgrid, self._source, self._new_sensor, self.medium, simulation_options, execution_options) 

133 

134 # Process result 

135 if isinstance(result, dict): 

136 p0_recon = result["p_final"] 

137 else: 

138 p0_recon = result 

139 

140 # Apply compensation factor and positivity condition 

141 p0_recon = self.compensation_factor * p0_recon 

142 p0_recon[p0_recon < 0] = 0 # Apply positivity condition 

143 

144 return p0_recon