Coverage for kwave/kWaveSimulation_helper/scale_source_terms_func.py: 12%
133 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-24 12:06 -0700
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-24 12:06 -0700
1import logging
2import math
4import numpy as np
6from kwave.kgrid import kWaveGrid
7from kwave.ksource import kSource
8from kwave.utils.dotdictionary import dotdict
11def scale_source_terms_func(
12 c0, dt, kgrid: kWaveGrid, source, p_source_pos_index, s_source_pos_index, u_source_pos_index, transducer_input_signal, flags: dotdict
13):
14 """
15 Subscript for the first-order k-Wave simulation functions to scale source terms to the correct units.
16 Args:
17 Returns:
19 """
20 dx, dy, dz = (
21 kgrid.dx,
22 kgrid.dy,
23 kgrid.dz,
24 )
26 # get the dimension size
27 N = kgrid.dim
29 if not check_conditions(flags.nonuniform_grid, flags.source_uy, flags.source_uz, flags.transducer_source):
30 return
32 # =========================================================================
33 # PRESSURE SOURCES
34 # =========================================================================
36 apply_pressure_source_correction(flags.source_p, flags.use_w_source_correction_p, source, dt)
38 scale_pressure_source(flags.source_p, source, kgrid, N, c0, dx, dt, p_source_pos_index, flags.nonuniform_grid)
40 # =========================================================================
41 # STRESS SOURCES
42 # =========================================================================
44 scale_stress_sources(source, c0, flags, dt, dx, N, s_source_pos_index)
46 # =========================================================================
47 # VELOCITY SOURCES
48 # =========================================================================
50 apply_velocity_source_corrections(flags.use_w_source_correction_u, flags.source_ux, flags.source_uy, flags.source_uz, source, dt)
52 scale_velocity_sources(flags, source, kgrid, c0, dt, dx, dy, dz, u_source_pos_index)
54 # =========================================================================
55 # TRANSDUCER SOURCE
56 # =========================================================================
57 transducer_input_signal = scale_transducer_source(flags.transducer_source, transducer_input_signal, c0, dt, dx, u_source_pos_index)
58 return transducer_input_signal
61def check_conditions(is_nonuniform_grid, is_source_uy, is_source_uz, is_transducer_source):
62 """
63 check for non-uniform grid and give error for source terms that haven't yet been implemented
64 Returns:
66 """
67 if is_nonuniform_grid and (is_source_uy or is_source_uz or is_transducer_source):
68 logging.log(logging.WARN, "source scaling not implemented for non-uniform grids with given source condition")
69 return False
70 return True
73def apply_pressure_source_correction(is_source_p, use_w_source_correction_p, source, dt):
74 """
75 apply k-space source correction expressed as a function of w
76 Args:
77 is_source_p:
78 use_w_source_correction_p:
79 source:
80 dt:
82 Returns:
84 """
85 if is_source_p and use_w_source_correction_p:
86 source.p = apply_source_correction(source.p, source.p_frequency_ref, dt)
89def scale_pressure_source(is_source_p, source, kgrid, N, c0, dx, dt, p_source_pos_index, is_nonuniform_grid):
90 """
91 scale the input pressure by 1/c0^2 (to convert to units of density), then
92 by 1/N (to split the input across the split density field). If the
93 pressure is injected as a mass source, also scale the pressure by
94 2*dt*c0/dx to account for the time step and convert to units of [kg/(m^3 s)]
95 Args:
96 is_source_p:
97 source:
98 kgrid:
99 N:
100 c0:
101 dx:
102 dt:
103 p_source_pos_index:
104 is_nonuniform_grid:
106 Returns:
108 """
109 if not is_source_p:
110 return
112 if source.p_mode == "dirichlet":
113 source.p = scale_pressure_source_dirichlet(source.p, c0, N, p_source_pos_index)
114 else:
115 if is_nonuniform_grid:
116 source.p = scale_pressure_source_nonuniform_grid(source.p, kgrid, c0, N, dt, p_source_pos_index)
118 else:
119 source.p = scale_pressure_source_uniform_grid(source.p, c0, N, dx, dt, p_source_pos_index)
122def scale_pressure_source_dirichlet(source_p, c0, N, p_source_pos_index):
123 if c0.size == 1:
124 # compute the scale parameter based on the homogeneous
125 # sound speed
126 source_p = source_p / (N * (c0**2))
128 else:
129 # compute the scale parameter separately for each source
130 # position based on the sound speed at that position
131 ind = range(source_p[:, 0].size)
132 mask = p_source_pos_index.flatten("F")[ind]
133 scale = 1.0 / (N * np.expand_dims(c0.ravel(order="F")[mask.ravel(order="F")] ** 2, axis=-1))
134 source_p[ind, :] *= scale
136 return source_p
139def scale_pressure_source_nonuniform_grid(source_p, kgrid, c0, N, dt, p_source_pos_index):
140 x = kgrid.x
141 xn = kgrid.xn
142 yn = kgrid.yn
143 zn = kgrid.zn
144 x_size, y_size, z_size = kgrid.size
146 # create empty matrix
147 grid_point_sep = np.zeros(x.size)
149 # compute averaged grid point separation map, the interior
150 # points are calculated using the average distance to all
151 # connected grid points (the edge values are not calculated
152 # assuming there are no source points in the PML)
153 if kgrid.dim == 1:
154 grid_point_sep[1:-1] = x_size * (xn[2:, 0] - xn[0:-2, 0]) / 2
155 elif kgrid.dim == 2:
156 grid_point_sep[1:-1, 1:-1] = x_size * (xn[2:, 1:-1] - xn[0:-2, 1:-1]) / 4 + y_size * (yn[1:-1, 2:] - yn[1:-1, 0:-2]) / 4
157 elif kgrid.dim == 3:
158 grid_point_sep[1:-1, 1:-1, 1:-1] = (
159 x_size * (xn[2:, 1:-1, 1:-1] - xn[0:-2, 1:-1, 1:-1]) / 6
160 + y_size * (yn[1:-1, 2:, 1:-1] - yn[1:-1, 0:-2, 1:-1]) / 6
161 + z_size * (zn[1:-1, 1:-1, 2:] - zn[1:-1, 1:-1, 0:-2]) / 6
162 )
164 # compute and apply scale parameter
165 for p_index in range(source_p.size[0]):
166 if c0.size == 1:
167 # compute the scale parameter based on the homogeneous sound speed
168 source_p[p_index, :] = source_p[p_index, :] * (2 * dt / (N * c0 * grid_point_sep[p_source_pos_index[p_index]]))
170 else:
171 # compute the scale parameter based on the sound speed at that position
172 source_p[p_index, :] = source_p[p_index, :] * (
173 2 * dt / (N * c0[p_source_pos_index[p_index]] * grid_point_sep[p_source_pos_index[p_index]])
174 )
175 return source_p
178def scale_pressure_source_uniform_grid(source_p, c0, N, dx, dt, p_source_pos_index):
179 if c0.size == 1:
180 # compute the scale parameter based on the homogeneous
181 # sound speed
182 source_p = source_p * (2 * dt / (N * c0 * dx))
184 else:
185 # compute the scale parameter separately for each source
186 # position based on the sound speed at that position
187 ind = range(source_p[:, 0].size)
188 mask = p_source_pos_index.flatten("F")[ind]
189 scale = (2.0 * dt) / (N * np.expand_dims(c0.ravel(order="F")[mask.ravel(order="F")], axis=-1) * dx)
190 source_p[ind, :] *= scale
191 return source_p
194def scale_stress_sources(source, c0, flags, dt, dx, N, s_source_pos_index):
195 """
196 scale the stress source by 1/N to divide amongst the split field
197 components, and if source.s_mode is not set to 'dirichlet', also scale by
198 2*dt*c0/dx to account for the time step and convert to units of
199 [kg/(m^3 s)] (note dx is used in all dimensions)
200 Args:
201 source:
202 c0:
203 flags:
204 dt:
205 dx:
206 N:
207 s_source_pos_index:
209 Returns:
211 """
212 source.sxx = scale_stress_source(source, c0, flags.source_sxx, flags.source_p0, source.sxx, dt, N, dx, s_source_pos_index)
213 source.syy = scale_stress_source(source, c0, flags.source_syy, flags.source_p0, source.syy, dt, N, dx, s_source_pos_index)
214 source.szz = scale_stress_source(source, c0, flags.source_szz, flags.source_p0, source.szz, dt, N, dx, s_source_pos_index)
215 source.sxy = scale_stress_source(source, c0, flags.source_sxy, True, source.sxy, dt, N, dx, s_source_pos_index)
216 source.sxz = scale_stress_source(source, c0, flags.source_sxz, True, source.sxz, dt, N, dx, s_source_pos_index)
217 source.syz = scale_stress_source(source, c0, flags.source_syz, True, source.syz, dt, N, dx, s_source_pos_index)
220def scale_stress_source(source, c0, is_source_exists, is_p0_exists, source_val, dt, N, dx, s_source_pos_index):
221 if is_source_exists:
222 if source.s_mode == "dirichlet" or is_p0_exists:
223 source_val = source_val / N
224 else:
225 if c0.size == 1:
226 # compute the scale parameter based on the homogeneous sound
227 # speed
228 source_val = source_val * (2 * dt * c0 / (N * dx))
230 else:
231 # compute the scale parameter separately for each source
232 # position based on the sound speed at that position
233 s_index = range(source_val.size[0])
234 source_val[s_index, :] = source_val[s_index, :] * (2 * dt * c0[s_source_pos_index[s_index]] / (N * dx))
235 return source_val
238def apply_velocity_source_corrections(
239 use_w_source_correction_u: bool, is_ux_exists: bool, is_uy_exists: bool, is_uz_exists: bool, source: kSource, dt: float
240):
241 """
242 apply k-space source correction expressed as a function of w
243 Args:
244 use_w_source_correction_u:
245 is_ux_exists:
246 is_uy_exists:
247 is_uz_exists:
248 source:
249 dt:
251 Returns:
253 """
254 if not use_w_source_correction_u:
255 return
257 if is_ux_exists:
258 source.ux = apply_source_correction(source.ux, source.u_frequency_ref, dt)
260 if is_uy_exists:
261 source.uy = apply_source_correction(source.uy, source.u_frequency_ref, dt)
263 if is_uz_exists:
264 source.uz = apply_source_correction(source.uz, source.u_frequency_ref, dt)
267def apply_source_correction(source_val, frequency_ref, dt):
268 return source_val * math.cos(2 * math.pi * frequency_ref * dt / 2)
271def scale_velocity_sources(flags, source, kgrid, c0, dt, dx, dy, dz, u_source_pos_index):
272 source.ux = scale_velocity_source_x(
273 flags.source_ux, source.u_mode, source.ux, kgrid, c0, dt, dx, u_source_pos_index, flags.nonuniform_grid
274 )
275 source.uy = scale_velocity_source(flags.source_uy, source.u_mode, source.uy, c0, dt, u_source_pos_index, dy)
276 source.uz = scale_velocity_source(flags.source_uz, source.u_mode, source.uz, c0, dt, u_source_pos_index, dz)
279def scale_velocity_source_x(is_source_ux, source_u_mode, source_val, kgrid, c0, dt, dx, u_source_pos_index, is_nonuniform_grid):
280 """
281 if source.u_mode is not set to 'dirichlet', scale the x-direction
282 velocity source terms by 2*dt*c0/dx to account for the time step and
283 convert to units of [m/s^2]
284 Returns:
286 """
287 if not is_source_ux or source_u_mode == "dirichlet":
288 return
290 if is_nonuniform_grid:
291 source_val = scale_velocity_source_nonuniform(is_source_ux, source_u_mode, kgrid, source_val, c0, dt, u_source_pos_index)
292 else:
293 source_val = scale_velocity_source(is_source_ux, source_u_mode, source_val, c0, dt, u_source_pos_index, dx)
294 return source_val
297def scale_velocity_source(is_source, source_u_mode, source_val, c0, dt, u_source_pos_index, d_direction):
298 """
299 if source.u_mode is not set to 'dirichlet', scale the d_direction
300 velocity source terms by 2*dt*c0/dz to account for the time step and
301 convert to units of [m/s^2]
302 Args:
303 is_source:
304 source_u_mode:
305 source_val:
306 c0:
307 dt:
308 u_source_pos_index:
309 d_direction:
311 Returns:
313 """
314 if not is_source or source_u_mode == "dirichlet":
315 return source_val
317 if c0.size == 1:
318 # compute the scale parameter based on the homogeneous sound speed
319 source_val = source_val * (2 * c0 * dt / d_direction)
320 else:
321 # compute the scale parameter separately for each source position
322 # based on the sound speed at that position
323 u_index = range(source_val.size[0])
324 source_val[u_index, :] = source_val[u_index, :] * (2 * c0[u_source_pos_index[u_index]] * dt / d_direction)
325 return source_val
328def scale_velocity_source_nonuniform(is_source, source_u_mode, kgrid, source_val, c0, dt, u_source_pos_index):
329 """
330 if source.u_mode is not set to 'dirichlet', scale the d_direction
331 velocity source terms by 2*dt*c0/dz to account for the time step and
332 convert to units of [m/s^2]
333 Args:
334 is_source:
335 source_u_mode:
336 kgrid:
337 source_val:
338 c0:
339 dt:
340 u_source_pos_index:
342 Returns:
344 """
345 if not is_source or source_u_mode == "dirichlet":
346 return source_val
348 # create empty matrix
349 x = kgrid.x
350 xn = kgrid.xn
351 x_size = kgrid.size[0]
352 grid_point_sep = np.zeros_like(x)
354 # compute averaged grid point separation map, the interior
355 # points are calculated using the average distance to all
356 # connected grid points (the edge values are not calculated
357 # assuming there are no source points in the PML)
358 grid_point_sep[1:-1, :, :] = x_size * (xn[2:, :, :] - xn[1:-2, :, :]) / 2
360 # compute and apply scale parameter
361 for u_index in range(source_val.size[0]):
362 if c0.size == 1:
363 # compute the scale parameter based on the homogeneous sound speed
364 source_val[u_index, :] = source_val[u_index, :] * (2 * c0 * dt / (grid_point_sep[u_source_pos_index[u_index]]))
365 else:
366 # compute the scale parameter based on the sound speed at that position
367 source_val[u_index, :] = source_val[u_index, :] * (
368 2 * c0[u_source_pos_index[u_index]] * dt / (grid_point_sep[u_source_pos_index[u_index]])
369 )
370 return source_val
373def scale_transducer_source(is_transducer_source, transducer_input_signal, c0, dt, dx, u_source_pos_index):
374 """
375 scale the transducer source term by 2*dt*c0/dx to account for the time
376 step and convert to units of [m/s^2]
377 Args:
378 is_transducer_source:
379 transducer_input_signal:
380 c0:
381 dt:
382 dx:
383 u_source_pos_index:
385 Returns:
387 """
388 if is_transducer_source:
389 if c0.size == 1:
390 transducer_input_signal = transducer_input_signal * (2 * c0 * dt / dx)
391 else:
392 # compute the scale parameter based on the average sound speed at the
393 # transducer positions (only one input signal is used to drive the transducer)
394 transducer_input_signal = transducer_input_signal * (2 * (np.mean(c0.flatten(order="F")[u_source_pos_index - 1])) * dt / dx)
395 return transducer_input_signal