Coverage for kwave/utils/signals.py: 6%
330 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-24 12:06 -0700
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-24 12:06 -0700
1import logging
2from math import floor
4import numpy as np
5import scipy
6from beartype import beartype as typechecker
7from beartype.typing import List, Optional, Tuple, Union
8from jaxtyping import Bool, Int
9from numpy.fft import fft, ifft, ifftshift
11import kwave.utils.typing as kt
13from .conversion import freq2wavenumber
14from .data import scale_SI
15from .mapgen import ndgrid
16from .math import gaussian, sinc
17from .matlab import matlab_mask, rem, unflatten_matlab_mask
18from .matrix import broadcast_axis, num_dim
21def add_noise(signal: np.ndarray, snr: float, mode="rms"):
22 """
23 Add Gaussian noise to a signal.
25 Args:
26 signal: input signal
27 snr: desired signal snr (signal-to-noise ratio) in decibels after adding noise
28 mode: 'rms' (default) or 'peak'
30 Returns:
31 Signal with augmented with noise. This behaviour differs from the k-Wave MATLAB implementation in that the SNR is nor returned.
33 """
34 if mode == "rms":
35 reference = np.sqrt(np.mean(signal**2))
36 elif mode == "peak":
37 reference = np.max(signal)
38 else:
39 raise ValueError(f"Unknown parameter '{mode}' for input mode.")
41 # calculate the standard deviation of the Gaussian noise
42 std_dev = reference / (10 ** (snr / 20))
44 # calculate noise
45 noise = std_dev * np.random.randn(*signal.shape)
47 # check the snr
48 noise_rms = np.sqrt(np.mean(noise**2))
49 snr = 20.0 * np.log10(reference / noise_rms)
51 # add noise to the recorded sensor data
52 signal = signal + noise
54 return signal
57@typechecker
58def get_win(
59 N: Union[int, np.ndarray, Tuple[int, int], Tuple[int, int, int], List[Int[kt.ScalarLike, ""]]],
60 # TODO: replace and refactor for scipy.signal.get_window
61 # https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.get_window.html#scipy.signal.get_window
62 type_: str, # TODO change this to enum in the future
63 plot_win: bool = False,
64 param: Optional[float] = None,
65 rotation: bool = False,
66 symmetric: Union[bool, Bool[np.ndarray, "N"]] = True,
67 square: bool = False,
68):
69 """
71 A frequency domain windowing function of specified type and dimensions.
73 Args:
74 N: Number of samples, [Nx] for 1D, [Nx, Ny] for 2D, [Nx, Ny, Nz] for 3D.
75 type_: Window type. Supported values: 'Bartlett', 'Bartlett-Hanning', 'Blackman', 'Blackman-Harris',
76 'Blackman-Nuttall', 'Cosine', 'Flattop', 'Gaussian', 'HalfBand',
77 'Hamming', 'Hanning', 'Kaiser', 'Lanczos', 'Nuttall',
78 'Rectangular', 'Triangular', 'Tukey'.
79 plot_win: Boolean to display the window (default = False).
80 param: Control parameter for Tukey, Blackman, Gaussian, and Kaiser windows: taper ratio (Tukey),
81 alpha (Blackman, Kaiser), standard deviation (Gaussian)
82 (default = 0.5, 0.16, 3 respectively).
83 rotation: Boolean to create windows via rotation or outer product (default = False).
84 symmetric: Boolean to make the window symmetrical (default = True).
85 Can also be a vector defining the symmetry in each matrix dimension.
86 square: Boolean to force the window to be square (default = False).
88 Returns:
89 A tuple of (win, cg) where win is the window and cg is the coherent gain of the window.
90 """
92 def cosine_series(n: int, N: int, coeffs: List[float]) -> np.ndarray:
93 """
95 Sub-function to calculate a summed filter cosine series.
97 Args:
98 n: An integer representing the current index in the series.
99 N: An integer representing the total number of terms in the series.
100 coeffs: A list of floats representing the coefficients of the cosine terms.
102 Returns:
103 A numpy ndarray containing the calculated series.
105 """
106 series = coeffs[0]
107 for index in range(1, len(coeffs)):
108 series = series + (-1) ** index * coeffs[index] * np.cos(index * 2 * np.pi * n / (N - 1))
109 return series.T
111 # Check if N is either `int` or `list of ints`
112 # assert isinstance(N, int) or isinstance(N, list) or isinstance(N, np.ndarray)
113 N = np.array(N, dtype=int)
114 N = N if np.size(N) > 1 else N.item()
116 # Check if symmetric is either `bool` or `list of bools`
117 # assert isinstance(symmetric, int) or isinstance(symmetric, list)
118 symmetric = np.array(symmetric, dtype=bool)
120 # Set default value for `param` if type is one of the special ones
121 assert not plot_win, NotImplementedError("Plotting is not implemented.")
122 if type_ == "Tukey":
123 if param is None:
124 param = 0.5
125 param = np.clip(param, a_min=0, a_max=1)
126 elif type_ == "Blackman":
127 if param is None:
128 param = 0.16
129 param = np.clip(param, a_min=0, a_max=1)
130 elif type_ == "Gaussian":
131 if param is None:
132 param = 0.5
133 param = np.clip(param, a_min=0, a_max=0.5)
134 elif type_ == "Kaiser":
135 if param is None:
136 param = 3
137 param = np.clip(param, a_min=0, a_max=100)
139 # if a non-symmetrical window is required, enlarge the window size (note,
140 # this expands each dimension individually if symmetric is a vector)
141 N = N + 1 * (1 - symmetric.astype(int))
143 # if a square window is required, replace grid sizes with smallest size and
144 # store a copy of the original size
145 if square and (N.size != 1):
146 N_orig = np.copy(N)
147 L = min(N)
148 N[:] = L
150 # create the window
151 if N.size == 1:
152 # TODO: what should this behaviour be if N is a list of ints? make windows of multiple lengths?
153 n = np.arange(0, N)
155 # TODO: find failure cases in test suite when N is zero.
156 # assert np.all(N) > 1, 'Signal length N must be greater than 1'
158 if type_ == "Bartlett":
159 win = (2 / (N - 1) * ((N - 1) / 2 - abs(n - (N - 1) / 2))).T
160 elif type_ == "Bartlett-Hanning":
161 win = (0.62 - 0.48 * abs(n / (N - 1) - 1 / 2) - 0.38 * np.cos(2 * np.pi * n / (N - 1))).T
162 elif type_ == "Blackman":
163 win = cosine_series(n, N, [(1 - param) / 2, 0.5, param / 2])
164 elif type_ == "Blackman-Harris":
165 win = cosine_series(n, N, [0.35875, 0.48829, 0.14128, 0.01168])
166 elif type_ == "Blackman-Nuttall":
167 win = cosine_series(n, N, [0.3635819, 0.4891775, 0.1365995, 0.0106411])
168 elif type_ == "Cosine":
169 win = (np.cos(np.pi * n / (N - 1) - np.pi / 2)).T
170 elif type_ == "Flattop":
171 win = cosine_series(n, N, [0.21557895, 0.41663158, 0.277263158, 0.083578947, 0.006947368])
172 elif type_ == "Gaussian":
173 win = (np.exp(-0.5 * ((n - (N - 1) / 2) / (param * (N - 1) / 2)) ** 2)).T
174 elif type_ == "HalfBand":
175 win = np.ones(N)
176 # why not to just round? => because rounding 0.5 introduces unexpected behaviour
177 # round(0.5) should be 1 but it is 0
178 ramp_length = round(N / 4 + 1e-8)
179 ramp = (
180 1 / 2
181 + 9 / 16 * np.cos(np.pi * np.arange(1, ramp_length + 1) / (2 * ramp_length))
182 - 1 / 16 * np.cos(3 * np.pi * np.arange(1, ramp_length + 1) / (2 * ramp_length))
183 )
184 if ramp_length > 0:
185 win[0:ramp_length] = np.flip(ramp)
186 win[-ramp_length:] = ramp
187 elif type_ == "Hamming":
188 win = (0.54 - 0.46 * np.cos(2 * np.pi * n / (N - 1))).T
189 elif type_ == "Hanning":
190 win = (0.5 - 0.5 * np.cos(2 * np.pi * n / (N - 1))).T
191 elif type_ == "Kaiser":
192 part_1 = scipy.special.iv(0, np.pi * param * np.sqrt(1 - (2 * n / (N - 1) - 1) ** 2))
193 part_2 = scipy.special.iv(0, np.pi * param)
194 win = part_1 / part_2
195 elif type_ == "Lanczos":
196 win = 2 * np.pi * n / (N - 1) - np.pi
197 win = sinc(win + 1e-12).T
198 elif type_ == "Nuttall":
199 win = cosine_series(n, N, [0.3635819, 0.4891775, 0.1365995, 0.0106411])
200 elif type_ == "Rectangular":
201 win = np.ones(N)
202 elif type_ == "Triangular":
203 win = (2 / N * (N / 2 - abs(n - (N - 1) / 2))).T
204 elif type_ == "Tukey":
205 win = np.ones((N, 1))
206 index = np.arange(0, (N - 1) * param / 2 + 1e-8)
207 param = param * N
208 win[0 : len(index)] = 0.5 * (1 + np.cos(2 * np.pi / param * (index - param / 2)))[:, None]
209 win[np.arange(-1, -len(index) - 1, -1)] = win[0 : len(index)]
210 win = win.squeeze(axis=-1)
211 else:
212 raise ValueError(f"Unknown window type: {type_}")
214 # trim the window if required
215 if not symmetric:
216 N -= 1
217 win = win[0:N]
218 win = np.expand_dims(win, axis=-1)
220 # calculate the coherent gain
221 cg = win.sum() / N
222 elif N.size == 2:
223 # create the 2D window
224 if rotation:
225 # create the window in one dimension using getWin recursively
226 L = max(N)
227 win_lin, _ = get_win(int(L), type_, param=param)
228 win_lin = np.squeeze(win_lin)
230 # create the reference axis
231 radius = (L - 1) / 2
232 ll = np.linspace(-radius, radius, L)
234 # create the 2D window using rotation
235 xx = np.linspace(-radius, radius, N[0])
236 yy = np.linspace(-radius, radius, N[1])
237 [x, y] = ndgrid(xx, yy)
238 r = np.sqrt(x**2 + y**2)
239 r[r > radius] = radius
240 interp_func = scipy.interpolate.interp1d(ll, win_lin)
241 win = interp_func(r)
242 win[r <= radius] = interp_func(r[r <= radius])
244 else:
245 # create the window in each dimension using getWin recursively
246 win_x, _ = get_win(int(N[0]), type_, param=param)
247 win_y, _ = get_win(int(N[1]), type_, param=param)
249 # create the 2D window using the outer product
250 win = (win_y * win_x.T).T
252 # trim the window if required
253 N = N - 1 * (1 - np.array(symmetric).astype(int))
254 win = win[0 : N[0], 0 : N[1]]
256 # calculate the coherent gain
257 cg = win.sum() / np.prod(N)
258 elif N.size == 3:
259 # create the 3D window
260 if rotation:
261 # create the window in one dimension using getWin recursively
262 L = N.max()
263 win_lin, _ = get_win(int(L), type_, param=param)
265 # create the reference axis
266 radius = (L - 1) / 2
267 ll = np.linspace(-radius, radius, L)
269 # create the 3D window using rotation
270 xx = np.linspace(-radius, radius, N[0])
271 yy = np.linspace(-radius, radius, N[1])
272 zz = np.linspace(-radius, radius, N[2])
273 [x, y, z] = ndgrid(xx, yy, zz)
274 r = np.sqrt(x**2 + y**2 + z**2)
275 r[r > radius] = radius
277 win_lin = np.squeeze(win_lin)
278 interp_func = scipy.interpolate.interp1d(ll, win_lin)
279 win = interp_func(r)
280 win[r <= radius] = interp_func(r[r <= radius])
282 else:
283 # create the window in each dimension using getWin recursively
284 win_x, _ = get_win(int(N[0]), type_, param=param)
285 win_y, _ = get_win(int(N[1]), type_, param=param)
286 win_z, _ = get_win(int(N[2]), type_, param=param)
288 # create the 2D window using the outer product
289 win_2D = win_x * win_z.T
291 # create the 3D window
292 win = np.zeros((N[0], N[1], N[2]))
293 for index in range(0, N[1]):
294 win[:, index, :] = win_2D[:, :] * win_y[index]
296 # trim the window if required
297 N = N - 1 * (1 - np.array(symmetric).astype(int))
298 win = win[0 : N[0], 0 : N[1], 0 : N[2]]
300 # calculate the coherent gain
301 cg = win.sum() / np.prod(N)
302 else:
303 raise ValueError("Invalid input for N, only 1-, 2-, and 3-D windows are supported.")
305 # enlarge the window if required
306 if square and (N.size != 1):
307 L = N[0]
308 win_sq = win
309 win = np.zeros(N_orig)
310 if N.size == 2:
311 index1 = round((N[0] - L) / 2)
312 index2 = round((N[1] - L) / 2)
313 win[index1 : (index1 + L), index2 : (index2 + L)] = win_sq
314 elif N.size == 3:
315 index1 = floor((N_orig[0] - L) / 2)
316 index2 = floor((N_orig[1] - L) / 2)
317 index3 = floor((N_orig[2] - L) / 2)
318 win[index1 : index1 + L, index2 : index2 + L, index3 : index3 + L] = win_sq
320 return win, cg
323def tone_burst(sample_freq, signal_freq, num_cycles, envelope="Gaussian", plot_signal=False, signal_length=0, signal_offset=0):
324 """
325 Create an enveloped single frequency tone burst.
327 Args:
328 sample_freq: sampling frequency in Hz
329 signal_freq: frequency of the tone burst signal in Hz
330 num_cycles: number of sinusoidal oscillations
331 envelope: Envelope used to taper the tone burst. Valid inputs are:
332 - 'Gaussian' (the default)
333 - 'Rectangular'
334 - [num_ring_up_cycles, num_ring_down_cycles]
335 The last option generates a continuous wave signal with a cosine taper of the specified length at the beginning and end.
336 plot: Boolean controlling whether the created tone burst is plotted.
337 signal_length: Signal length in number of samples. If longer than the tone burst length, the signal is appended with zeros.
338 signal_offset: Signal offset before the tone burst starts in number of samples.
339 If an array is given, a matrix of tone bursts is created where each row corresponds to
340 a tone burst for each value of the 'SignalOffset'.
342 Returns:
343 created tone burst
345 """
346 assert isinstance(signal_offset, int) or isinstance(signal_offset, np.ndarray), "signal_offset must be integer or array of integers"
347 assert isinstance(signal_length, int), "signal_length must be integer"
349 # calculate the temporal spacing
350 dt = 1 / sample_freq # [s]
352 # create the tone burst
353 tone_length = num_cycles / signal_freq # [s]
354 # We want to include the endpoint but only if it's divisible by the step-size.
355 # Modulo operator is not stable, so multiple conditions included.
356 # if ( (tone_length % dt) < 1e-18 or (np.abs(tone_length % dt - dt) < 1e-18) ):
357 if rem(tone_length, dt) < 1e-18:
358 tone_t = np.linspace(0, tone_length, int(tone_length / dt) + 1)
359 else:
360 tone_t = np.arange(0, tone_length, dt)
362 tone_burst = np.sin(2 * np.pi * signal_freq * tone_t)
363 tone_index = np.round(signal_offset)
365 # check for ring up and ring down input
366 if isinstance(envelope, list) or isinstance(envelope, np.ndarray):
367 num_ring_up_cycles, num_ring_down_cycles = envelope
369 # check signal is long enough for ring up and down
370 assert num_cycles >= (
371 num_ring_up_cycles + num_ring_down_cycles
372 ), "Input num_cycles must be longer than num_ring_up_cycles + num_ring_down_cycles."
374 # get period
375 period = 1 / signal_freq
377 # create x-axis for ramp between 0 and pi
378 up_ramp_length_points = round(num_ring_up_cycles * period / dt)
379 down_ramp_length_points = round(num_ring_down_cycles * period / dt)
380 up_ramp_axis = np.arange(0, np.pi + 1e-8, np.pi / (up_ramp_length_points - 1))
381 down_ramp_axis = np.arange(0, np.pi + 1e-8, np.pi / (down_ramp_length_points - 1))
383 # create ramp using a shifted cosine
384 up_ramp = (-np.cos(up_ramp_axis) + 1) * 0.5
385 down_ramp = (np.cos(down_ramp_axis) + 1) * 0.5
387 # apply the ramps
388 tone_burst[0:up_ramp_length_points] = tone_burst[0:up_ramp_length_points] * up_ramp
389 tone_burst[-down_ramp_length_points:] = tone_burst[-down_ramp_length_points:] * down_ramp
391 else:
392 # create the envelope
393 if envelope == "Gaussian":
394 x_lim = 3
395 window_x = np.arange(-x_lim, x_lim + 1e-8, 2 * x_lim / (len(tone_burst) - 1))
396 window = gaussian(window_x, 1, 0, 1)
397 elif envelope == "Rectangular":
398 window = np.ones_like(tone_burst)
399 elif envelope == "RingUpDown":
400 raise NotImplementedError("RingUpDown not yet implemented")
401 else:
402 raise ValueError(f"Unknown envelope {envelope}.")
404 # apply the envelope
405 tone_burst = tone_burst * window
407 # force the ends to be zero by applying a second window
408 if envelope == "Gaussian":
409 tone_burst = tone_burst * np.squeeze(get_win(len(tone_burst), type_="Tukey", param=0.05)[0])
411 # Convert tone_index and signal_offset to numpy arrays
412 signal_offset = np.array(signal_offset)
414 # Determine the length of the signal array
415 signal_length = max(signal_length, signal_offset.max() + len(tone_burst))
417 # Create the signal array with the correct size
418 signal = np.zeros((np.atleast_1d(signal_offset).size, signal_length))
420 # Add the tone burst to the signal array
421 tone_index = np.atleast_1d(tone_index)
423 if tone_index.size == 1:
424 tone_index = int(np.squeeze(tone_index))
425 signal[:, tone_index : tone_index + len(tone_burst)] = tone_burst.T
426 else:
427 for i, idx in enumerate(tone_index):
428 signal[i, int(idx) : int(idx) + len(tone_burst)] = tone_burst
430 # plot the signal if required
431 if plot_signal:
432 raise NotImplementedError
434 return signal
437def reorder_sensor_data(kgrid, sensor, sensor_data: np.ndarray) -> np.ndarray:
438 """
439 Reorders the sensor data based on the coordinates of the sensor points.
441 Args:
442 kgrid: The k-Wave grid object.
443 sensor: The k-Wave sensor object.
444 sensor_data: The sensor data to be reordered.
446 Returns:
447 np.ndarray of the reordered sensor data.
449 Raises:
450 ValueError: If the simulation is not 2D or the sensor is not defined as a binary mask.
451 """
452 # check simulation is 2D
453 if kgrid.dim != 2:
454 raise ValueError("The simulation must be 2D.")
456 # check sensor.mask is a binary mask
457 if sensor.mask.dtype != bool and set(np.unique(sensor.mask).tolist()) != {0, 1}:
458 raise ValueError("The sensor must be defined as a binary mask.")
460 # find the coordinates of the sensor points
461 x_sensor = matlab_mask(kgrid.x, sensor.mask == 1)
462 x_sensor = np.squeeze(x_sensor)
463 y_sensor = matlab_mask(kgrid.y, sensor.mask == 1)
464 y_sensor = np.squeeze(y_sensor)
466 # find the angle of each sensor point (from the centre)
467 angle = np.arctan2(-x_sensor, -y_sensor)
468 angle[angle < 0] = 2 * np.pi + angle[angle < 0]
470 # sort the sensor points in order of increasing angle
471 indices_new = np.argsort(angle, kind="stable")
473 # reorder the measure time series so that adjacent time series correspond
474 # to adjacent sensor points.
475 reordered_sensor_data = sensor_data[indices_new]
476 return reordered_sensor_data
479def reorder_binary_sensor_data(sensor_data: np.ndarray, reorder_index: np.ndarray):
480 """
481 Args:
482 sensor_data: N x K
483 reorder_index: N
485 Returns:
486 reordered sensor data
488 """
489 reorder_index = np.squeeze(reorder_index)
490 assert sensor_data.ndim == 2
491 assert reorder_index.ndim == 1
493 return sensor_data[reorder_index.argsort()]
496def calc_max_freq(max_spat_freq, c):
497 filter_cutoff_freq = max_spat_freq * c / (2 * np.pi)
498 return filter_cutoff_freq
501def get_alpha_filter(kgrid, medium, filter_cutoff, taper_ratio=0.5):
502 """
503 get_alpha_filter uses get_win to create a Tukey window via rotation to
504 pass to the medium.alpha_filter. This parameter is used to regularise time
505 reversal image reconstruction when absorption compensation is included.
507 Args:
508 kgrid: simulation grid
509 medium: simulation medium
510 filter_cutoff: Any of the filter_cutoff inputs may be set to 'max' to set the cutoff frequency to
511 the maximum frequency supported by the grid
512 taper_ratio: The taper_ratio input is used to control the width of the transition region between
513 the passband and stopband. The default value is 0.5, which corresponds to
514 a transition region of 50% of the filter width.
516 Returns:
517 alpha_filter
519 """
521 dim = num_dim(kgrid.k)
522 logging.log(logging.INFO, f" taper ratio: {taper_ratio}")
523 # extract the maximum sound speed
524 c = max(medium.sound_speed)
526 assert len(filter_cutoff) == dim, f"Input filter_cutoff must have {dim} elements for a {dim}D grid"
528 # parse cutoff freqs
529 filter_size = []
530 for idx, freq in enumerate(filter_cutoff):
531 if freq == "max":
532 filter_cutoff[idx] = calc_max_freq(kgrid.k_max[idx], c)
533 filter_size_local = kgrid.N[idx]
534 else:
535 filter_size_local, filter_cutoff[idx] = freq2wavenumber(kgrid.N[idx], kgrid.k_max[idx], filter_cutoff[idx], c, kgrid.k[idx])
536 filter_size.append(filter_size_local)
538 # create the alpha_filter
539 filter_sec, _ = get_win(filter_size, "Tukey", param=taper_ratio, rotation=True)
541 # enlarge the alpha_filter to the size of the grid
542 alpha_filter = np.zeros(kgrid.N)
543 indexes = [round((kgrid.N[idx] - filter_size[idx]) / 2) for idx in range(len(filter_size))]
545 if dim == 1:
546 alpha_filter[indexes[0] : indexes[0] + filter_size[0]] = np.squeeze(filter_sec)
547 elif dim == 2:
548 alpha_filter[indexes[0] : indexes[0] + filter_size[0], indexes[1] : indexes[1] + filter_size[1]] = filter_sec
549 elif dim == 3:
550 alpha_filter[
551 indexes[0] : indexes[0] + filter_size[0], indexes[1] : indexes[1] + filter_size[1], indexes[2] : indexes[2] + filter_size[2]
552 ] = filter_sec
554 def dim_string(cutoff_vals):
555 return "".join([(str(scale_SI(co)[0]) + " Hz by ") for co in cutoff_vals])
557 # update the command line status
558 logging.log(logging.INFO, " filter cutoff: " + dim_string(filter_cutoff)[:-4] + ".")
560 return alpha_filter
563def get_wave_number(Nx, dx, dim):
564 if Nx % 2 == 0:
565 # even
566 nx = np.arange(start=-Nx / 2, stop=Nx / 2) / Nx
567 else:
568 nx = np.arange(start=-(Nx - 1) / 2, stop=(Nx - 1) / 2 + 1) / Nx
570 kx = ifftshift((2 * np.pi / dx) * nx)
572 return kx
575def gradient_spect(f: np.ndarray, dn: List[float], dim: Optional[Union[int, List[int]]] = None, deriv_order: int = 1) -> np.ndarray:
576 """
577 gradient_spect calculates the gradient of an n-dimensional input matrix using the Fourier collocation spectral method.
578 The gradient for singleton dimensions is returned as 0.
580 Args:
581 f: A numpy ndarray representing the input matrix.
582 dn: A list of floats representing the grid spacings in each dimension.
583 dim: An optional integer or list of integers representing the dimensions along which to calculate the gradient.
584 deriv_order: An integer representing the order of the derivative to calculate. Default is 1.
586 Returns:
587 A numpy ndarray containing the calculated gradient.
589 """
591 # get size of the input function
592 sz = f.shape
594 # check if input is 1D or user defined input dimension is given
595 if dim or len(sz) == 1:
596 # check if a single dn value is given, if not, extract the required value
597 if not (isinstance(dn, int) or isinstance(dn, float)):
598 dn = dn[dim]
600 # get the grid size along the specified dimension, or the longest dimension if 1D
601 if max(sz) == np.prod(sz):
602 dim = np.argmax(sz)
603 Nx = sz[dim]
604 else:
605 Nx = sz[dim]
607 # get the wave number
608 kx = get_wave_number(Nx, dn, dim)
610 # calculate derivative and assign output
611 grads = np.real(ifft((1j * kx) ** deriv_order * fft(f, axis=dim), axis=dim))
612 else:
613 # logging.log(logging.WARN, "This implementation is not tested.")
614 # get the wave number
615 # kx = get_wave_number(sz(dim), dn[dim], dim)
617 assert len(dn) == len(sz), ValueError(f"{len(sz)} values for dn must be specified for a {len(sz)}-dimensional input matrix.")
619 grads = []
620 # calculate the gradient for each non-singleton dimension
621 for dim in range(num_dim(f)):
622 # get the wave number
623 kx = get_wave_number(sz[dim], dn[dim], dim)
624 # calculate derivative and assign output
625 # TODO: replace this with numpy broadcasting
626 kx = broadcast_axis(kx, num_dim(f), dim)
627 grads.append(np.real(ifft((1j * kx) ** deriv_order * fft(f, axis=dim), axis=dim)))
629 return grads
632def unmask_sensor_data(kgrid, sensor, sensor_data: np.ndarray) -> np.ndarray:
633 # create an empty matrix
634 if kgrid.k == 1:
635 unmasked_sensor_data = np.zeros((kgrid.Nx, 1))
636 elif kgrid.k == 2:
637 unmasked_sensor_data = np.zeros((kgrid.Nx, kgrid.Ny))
638 elif kgrid.k == 3:
639 unmasked_sensor_data = np.zeros((kgrid.Nx, kgrid.Ny, kgrid.Nz))
640 else:
641 raise NotImplementedError
643 # reorder input data
644 flat_sensor_mask = (sensor.mask != 0).flatten("F")
645 assignment_mask = unflatten_matlab_mask(unmasked_sensor_data, np.where(flat_sensor_mask)[0])
646 # unmasked_sensor_data.flatten('F')[flat_sensor_mask] = sensor_data.flatten()
647 unmasked_sensor_data[assignment_mask] = sensor_data.flatten()
648 # unmasked_sensor_data[unflatten_matlab_mask(unmasked_sensor_data, sensor.mask != 0)] = sensor_data
649 return unmasked_sensor_data
652def create_cw_signals(t_array: np.ndarray, freq: float, amp: np.ndarray, phase: np.ndarray, ramp_length: int = 4) -> np.ndarray:
653 """
654 Generate a series of continuous wave (CW) signals based on the 1D or 2D input matrices `amp` and `phase`, where each signal
655 is given by:
657 amp[i, j] .* sin(2 * pi * freq * t_array + phase[i, j]);
659 To avoid startup transients, a cosine tapered up-ramp is applied to the beginning of the signal. By default, the length
660 of this ramp is four periods of the wave. The up-ramp can be turned off by setting the `ramp_length` to 0.
662 Examples:
664 # define sampling parameters
665 f = 5e6
666 T = 1/f
667 Fs = 100e6
668 dt = 1/Fs
669 t_array = np.arange(0, 10*T, dt)
671 # define amplitude and phase
672 amp = get_win(9, 'Gaussian')
673 phase = np.arange(0, 2*pi, 9).T
675 # create signals and plot
676 cw_signal = create_cw_signals(t_array, f, amp, phase)
678 Args:
679 t_array: A numpy ndarray representing the time values.
680 freq: A float representing the frequency of the signals.
681 amp: A numpy ndarray representing the amplitudes of the signals.
682 phase: A numpy ndarray representing the phases of the signals.
683 ramp_length: An optional integer representing the length of the cosine up-ramp in periods of the wave. Default is 4.
685 Returns:
686 A numpy ndarray containing the generated CW signals.
687 """
689 if len(phase) == 1:
690 phase = phase * np.ones(amp.shape)
692 if amp.ndim > 1:
693 N1, N2 = amp.shape
694 else:
695 N1, N2 = amp.shape[0], 1
697 # create input signals
698 cw_signal = np.zeros((N1, N2, len(t_array)))
700 # create signal
701 for index1 in range(N1):
702 for index2 in range(N2):
703 if amp.ndim > 1:
704 cw_signal[index1, index2, :] = amp[index1, index2] * np.sin(2 * np.pi * freq * t_array + phase[index1, index2])
705 else:
706 cw_signal[index1, index2, :] = amp[index1] * np.sin(2 * np.pi * freq * t_array + phase[index1])
708 # apply ramp to avoid startup transients
709 if ramp_length != 0:
710 # get period and time step (assuming dt is constant)
711 period = 1 / freq
712 dt = t_array[1] - t_array[0]
714 # create x-axis for ramp between 0 and pi
715 ramp_length_points = round(ramp_length * period / dt)
716 ramp_axis = np.linspace(0, np.pi, ramp_length_points)
718 # create ramp using a shifted cosine
719 ramp = (-np.cos(ramp_axis) + 1) * 0.5
720 ramp = np.expand_dims(ramp, axis=(0, 1))
722 # apply ramp to all signals simultaneously
723 cw_signal[:, :, :ramp_length_points] = ramp * cw_signal[:, :, :ramp_length_points]
725 # remove singleton dimensions if cw_signal has more than two dimensions
726 if cw_signal.ndim > 2:
727 cw_signal = np.squeeze(cw_signal)
729 # if only a single amplitude and phase is given, force time to be the
730 # second dimensions
731 if amp.ndim == 1:
732 cw_signal = np.reshape(cw_signal, (N1, -1))
734 return cw_signal