Coverage for kwave/utils/signals.py: 6%

330 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2025-03-24 12:06 -0700

1import logging 

2from math import floor 

3 

4import numpy as np 

5import scipy 

6from beartype import beartype as typechecker 

7from beartype.typing import List, Optional, Tuple, Union 

8from jaxtyping import Bool, Int 

9from numpy.fft import fft, ifft, ifftshift 

10 

11import kwave.utils.typing as kt 

12 

13from .conversion import freq2wavenumber 

14from .data import scale_SI 

15from .mapgen import ndgrid 

16from .math import gaussian, sinc 

17from .matlab import matlab_mask, rem, unflatten_matlab_mask 

18from .matrix import broadcast_axis, num_dim 

19 

20 

21def add_noise(signal: np.ndarray, snr: float, mode="rms"): 

22 """ 

23 Add Gaussian noise to a signal. 

24 

25 Args: 

26 signal: input signal 

27 snr: desired signal snr (signal-to-noise ratio) in decibels after adding noise 

28 mode: 'rms' (default) or 'peak' 

29 

30 Returns: 

31 Signal with augmented with noise. This behaviour differs from the k-Wave MATLAB implementation in that the SNR is nor returned. 

32 

33 """ 

34 if mode == "rms": 

35 reference = np.sqrt(np.mean(signal**2)) 

36 elif mode == "peak": 

37 reference = np.max(signal) 

38 else: 

39 raise ValueError(f"Unknown parameter '{mode}' for input mode.") 

40 

41 # calculate the standard deviation of the Gaussian noise 

42 std_dev = reference / (10 ** (snr / 20)) 

43 

44 # calculate noise 

45 noise = std_dev * np.random.randn(*signal.shape) 

46 

47 # check the snr 

48 noise_rms = np.sqrt(np.mean(noise**2)) 

49 snr = 20.0 * np.log10(reference / noise_rms) 

50 

51 # add noise to the recorded sensor data 

52 signal = signal + noise 

53 

54 return signal 

55 

56 

57@typechecker 

58def get_win( 

59 N: Union[int, np.ndarray, Tuple[int, int], Tuple[int, int, int], List[Int[kt.ScalarLike, ""]]], 

60 # TODO: replace and refactor for scipy.signal.get_window 

61 # https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.get_window.html#scipy.signal.get_window 

62 type_: str, # TODO change this to enum in the future 

63 plot_win: bool = False, 

64 param: Optional[float] = None, 

65 rotation: bool = False, 

66 symmetric: Union[bool, Bool[np.ndarray, "N"]] = True, 

67 square: bool = False, 

68): 

69 """ 

70 

71 A frequency domain windowing function of specified type and dimensions. 

72 

73 Args: 

74 N: Number of samples, [Nx] for 1D, [Nx, Ny] for 2D, [Nx, Ny, Nz] for 3D. 

75 type_: Window type. Supported values: 'Bartlett', 'Bartlett-Hanning', 'Blackman', 'Blackman-Harris', 

76 'Blackman-Nuttall', 'Cosine', 'Flattop', 'Gaussian', 'HalfBand', 

77 'Hamming', 'Hanning', 'Kaiser', 'Lanczos', 'Nuttall', 

78 'Rectangular', 'Triangular', 'Tukey'. 

79 plot_win: Boolean to display the window (default = False). 

80 param: Control parameter for Tukey, Blackman, Gaussian, and Kaiser windows: taper ratio (Tukey), 

81 alpha (Blackman, Kaiser), standard deviation (Gaussian) 

82 (default = 0.5, 0.16, 3 respectively). 

83 rotation: Boolean to create windows via rotation or outer product (default = False). 

84 symmetric: Boolean to make the window symmetrical (default = True). 

85 Can also be a vector defining the symmetry in each matrix dimension. 

86 square: Boolean to force the window to be square (default = False). 

87 

88 Returns: 

89 A tuple of (win, cg) where win is the window and cg is the coherent gain of the window. 

90 """ 

91 

92 def cosine_series(n: int, N: int, coeffs: List[float]) -> np.ndarray: 

93 """ 

94 

95 Sub-function to calculate a summed filter cosine series. 

96 

97 Args: 

98 n: An integer representing the current index in the series. 

99 N: An integer representing the total number of terms in the series. 

100 coeffs: A list of floats representing the coefficients of the cosine terms. 

101 

102 Returns: 

103 A numpy ndarray containing the calculated series. 

104 

105 """ 

106 series = coeffs[0] 

107 for index in range(1, len(coeffs)): 

108 series = series + (-1) ** index * coeffs[index] * np.cos(index * 2 * np.pi * n / (N - 1)) 

109 return series.T 

110 

111 # Check if N is either `int` or `list of ints` 

112 # assert isinstance(N, int) or isinstance(N, list) or isinstance(N, np.ndarray) 

113 N = np.array(N, dtype=int) 

114 N = N if np.size(N) > 1 else N.item() 

115 

116 # Check if symmetric is either `bool` or `list of bools` 

117 # assert isinstance(symmetric, int) or isinstance(symmetric, list) 

118 symmetric = np.array(symmetric, dtype=bool) 

119 

120 # Set default value for `param` if type is one of the special ones 

121 assert not plot_win, NotImplementedError("Plotting is not implemented.") 

122 if type_ == "Tukey": 

123 if param is None: 

124 param = 0.5 

125 param = np.clip(param, a_min=0, a_max=1) 

126 elif type_ == "Blackman": 

127 if param is None: 

128 param = 0.16 

129 param = np.clip(param, a_min=0, a_max=1) 

130 elif type_ == "Gaussian": 

131 if param is None: 

132 param = 0.5 

133 param = np.clip(param, a_min=0, a_max=0.5) 

134 elif type_ == "Kaiser": 

135 if param is None: 

136 param = 3 

137 param = np.clip(param, a_min=0, a_max=100) 

138 

139 # if a non-symmetrical window is required, enlarge the window size (note, 

140 # this expands each dimension individually if symmetric is a vector) 

141 N = N + 1 * (1 - symmetric.astype(int)) 

142 

143 # if a square window is required, replace grid sizes with smallest size and 

144 # store a copy of the original size 

145 if square and (N.size != 1): 

146 N_orig = np.copy(N) 

147 L = min(N) 

148 N[:] = L 

149 

150 # create the window 

151 if N.size == 1: 

152 # TODO: what should this behaviour be if N is a list of ints? make windows of multiple lengths? 

153 n = np.arange(0, N) 

154 

155 # TODO: find failure cases in test suite when N is zero. 

156 # assert np.all(N) > 1, 'Signal length N must be greater than 1' 

157 

158 if type_ == "Bartlett": 

159 win = (2 / (N - 1) * ((N - 1) / 2 - abs(n - (N - 1) / 2))).T 

160 elif type_ == "Bartlett-Hanning": 

161 win = (0.62 - 0.48 * abs(n / (N - 1) - 1 / 2) - 0.38 * np.cos(2 * np.pi * n / (N - 1))).T 

162 elif type_ == "Blackman": 

163 win = cosine_series(n, N, [(1 - param) / 2, 0.5, param / 2]) 

164 elif type_ == "Blackman-Harris": 

165 win = cosine_series(n, N, [0.35875, 0.48829, 0.14128, 0.01168]) 

166 elif type_ == "Blackman-Nuttall": 

167 win = cosine_series(n, N, [0.3635819, 0.4891775, 0.1365995, 0.0106411]) 

168 elif type_ == "Cosine": 

169 win = (np.cos(np.pi * n / (N - 1) - np.pi / 2)).T 

170 elif type_ == "Flattop": 

171 win = cosine_series(n, N, [0.21557895, 0.41663158, 0.277263158, 0.083578947, 0.006947368]) 

172 elif type_ == "Gaussian": 

173 win = (np.exp(-0.5 * ((n - (N - 1) / 2) / (param * (N - 1) / 2)) ** 2)).T 

174 elif type_ == "HalfBand": 

175 win = np.ones(N) 

176 # why not to just round? => because rounding 0.5 introduces unexpected behaviour 

177 # round(0.5) should be 1 but it is 0 

178 ramp_length = round(N / 4 + 1e-8) 

179 ramp = ( 

180 1 / 2 

181 + 9 / 16 * np.cos(np.pi * np.arange(1, ramp_length + 1) / (2 * ramp_length)) 

182 - 1 / 16 * np.cos(3 * np.pi * np.arange(1, ramp_length + 1) / (2 * ramp_length)) 

183 ) 

184 if ramp_length > 0: 

185 win[0:ramp_length] = np.flip(ramp) 

186 win[-ramp_length:] = ramp 

187 elif type_ == "Hamming": 

188 win = (0.54 - 0.46 * np.cos(2 * np.pi * n / (N - 1))).T 

189 elif type_ == "Hanning": 

190 win = (0.5 - 0.5 * np.cos(2 * np.pi * n / (N - 1))).T 

191 elif type_ == "Kaiser": 

192 part_1 = scipy.special.iv(0, np.pi * param * np.sqrt(1 - (2 * n / (N - 1) - 1) ** 2)) 

193 part_2 = scipy.special.iv(0, np.pi * param) 

194 win = part_1 / part_2 

195 elif type_ == "Lanczos": 

196 win = 2 * np.pi * n / (N - 1) - np.pi 

197 win = sinc(win + 1e-12).T 

198 elif type_ == "Nuttall": 

199 win = cosine_series(n, N, [0.3635819, 0.4891775, 0.1365995, 0.0106411]) 

200 elif type_ == "Rectangular": 

201 win = np.ones(N) 

202 elif type_ == "Triangular": 

203 win = (2 / N * (N / 2 - abs(n - (N - 1) / 2))).T 

204 elif type_ == "Tukey": 

205 win = np.ones((N, 1)) 

206 index = np.arange(0, (N - 1) * param / 2 + 1e-8) 

207 param = param * N 

208 win[0 : len(index)] = 0.5 * (1 + np.cos(2 * np.pi / param * (index - param / 2)))[:, None] 

209 win[np.arange(-1, -len(index) - 1, -1)] = win[0 : len(index)] 

210 win = win.squeeze(axis=-1) 

211 else: 

212 raise ValueError(f"Unknown window type: {type_}") 

213 

214 # trim the window if required 

215 if not symmetric: 

216 N -= 1 

217 win = win[0:N] 

218 win = np.expand_dims(win, axis=-1) 

219 

220 # calculate the coherent gain 

221 cg = win.sum() / N 

222 elif N.size == 2: 

223 # create the 2D window 

224 if rotation: 

225 # create the window in one dimension using getWin recursively 

226 L = max(N) 

227 win_lin, _ = get_win(int(L), type_, param=param) 

228 win_lin = np.squeeze(win_lin) 

229 

230 # create the reference axis 

231 radius = (L - 1) / 2 

232 ll = np.linspace(-radius, radius, L) 

233 

234 # create the 2D window using rotation 

235 xx = np.linspace(-radius, radius, N[0]) 

236 yy = np.linspace(-radius, radius, N[1]) 

237 [x, y] = ndgrid(xx, yy) 

238 r = np.sqrt(x**2 + y**2) 

239 r[r > radius] = radius 

240 interp_func = scipy.interpolate.interp1d(ll, win_lin) 

241 win = interp_func(r) 

242 win[r <= radius] = interp_func(r[r <= radius]) 

243 

244 else: 

245 # create the window in each dimension using getWin recursively 

246 win_x, _ = get_win(int(N[0]), type_, param=param) 

247 win_y, _ = get_win(int(N[1]), type_, param=param) 

248 

249 # create the 2D window using the outer product 

250 win = (win_y * win_x.T).T 

251 

252 # trim the window if required 

253 N = N - 1 * (1 - np.array(symmetric).astype(int)) 

254 win = win[0 : N[0], 0 : N[1]] 

255 

256 # calculate the coherent gain 

257 cg = win.sum() / np.prod(N) 

258 elif N.size == 3: 

259 # create the 3D window 

260 if rotation: 

261 # create the window in one dimension using getWin recursively 

262 L = N.max() 

263 win_lin, _ = get_win(int(L), type_, param=param) 

264 

265 # create the reference axis 

266 radius = (L - 1) / 2 

267 ll = np.linspace(-radius, radius, L) 

268 

269 # create the 3D window using rotation 

270 xx = np.linspace(-radius, radius, N[0]) 

271 yy = np.linspace(-radius, radius, N[1]) 

272 zz = np.linspace(-radius, radius, N[2]) 

273 [x, y, z] = ndgrid(xx, yy, zz) 

274 r = np.sqrt(x**2 + y**2 + z**2) 

275 r[r > radius] = radius 

276 

277 win_lin = np.squeeze(win_lin) 

278 interp_func = scipy.interpolate.interp1d(ll, win_lin) 

279 win = interp_func(r) 

280 win[r <= radius] = interp_func(r[r <= radius]) 

281 

282 else: 

283 # create the window in each dimension using getWin recursively 

284 win_x, _ = get_win(int(N[0]), type_, param=param) 

285 win_y, _ = get_win(int(N[1]), type_, param=param) 

286 win_z, _ = get_win(int(N[2]), type_, param=param) 

287 

288 # create the 2D window using the outer product 

289 win_2D = win_x * win_z.T 

290 

291 # create the 3D window 

292 win = np.zeros((N[0], N[1], N[2])) 

293 for index in range(0, N[1]): 

294 win[:, index, :] = win_2D[:, :] * win_y[index] 

295 

296 # trim the window if required 

297 N = N - 1 * (1 - np.array(symmetric).astype(int)) 

298 win = win[0 : N[0], 0 : N[1], 0 : N[2]] 

299 

300 # calculate the coherent gain 

301 cg = win.sum() / np.prod(N) 

302 else: 

303 raise ValueError("Invalid input for N, only 1-, 2-, and 3-D windows are supported.") 

304 

305 # enlarge the window if required 

306 if square and (N.size != 1): 

307 L = N[0] 

308 win_sq = win 

309 win = np.zeros(N_orig) 

310 if N.size == 2: 

311 index1 = round((N[0] - L) / 2) 

312 index2 = round((N[1] - L) / 2) 

313 win[index1 : (index1 + L), index2 : (index2 + L)] = win_sq 

314 elif N.size == 3: 

315 index1 = floor((N_orig[0] - L) / 2) 

316 index2 = floor((N_orig[1] - L) / 2) 

317 index3 = floor((N_orig[2] - L) / 2) 

318 win[index1 : index1 + L, index2 : index2 + L, index3 : index3 + L] = win_sq 

319 

320 return win, cg 

321 

322 

323def tone_burst(sample_freq, signal_freq, num_cycles, envelope="Gaussian", plot_signal=False, signal_length=0, signal_offset=0): 

324 """ 

325 Create an enveloped single frequency tone burst. 

326 

327 Args: 

328 sample_freq: sampling frequency in Hz 

329 signal_freq: frequency of the tone burst signal in Hz 

330 num_cycles: number of sinusoidal oscillations 

331 envelope: Envelope used to taper the tone burst. Valid inputs are: 

332 - 'Gaussian' (the default) 

333 - 'Rectangular' 

334 - [num_ring_up_cycles, num_ring_down_cycles] 

335 The last option generates a continuous wave signal with a cosine taper of the specified length at the beginning and end. 

336 plot: Boolean controlling whether the created tone burst is plotted. 

337 signal_length: Signal length in number of samples. If longer than the tone burst length, the signal is appended with zeros. 

338 signal_offset: Signal offset before the tone burst starts in number of samples. 

339 If an array is given, a matrix of tone bursts is created where each row corresponds to 

340 a tone burst for each value of the 'SignalOffset'. 

341 

342 Returns: 

343 created tone burst 

344 

345 """ 

346 assert isinstance(signal_offset, int) or isinstance(signal_offset, np.ndarray), "signal_offset must be integer or array of integers" 

347 assert isinstance(signal_length, int), "signal_length must be integer" 

348 

349 # calculate the temporal spacing 

350 dt = 1 / sample_freq # [s] 

351 

352 # create the tone burst 

353 tone_length = num_cycles / signal_freq # [s] 

354 # We want to include the endpoint but only if it's divisible by the step-size. 

355 # Modulo operator is not stable, so multiple conditions included. 

356 # if ( (tone_length % dt) < 1e-18 or (np.abs(tone_length % dt - dt) < 1e-18) ): 

357 if rem(tone_length, dt) < 1e-18: 

358 tone_t = np.linspace(0, tone_length, int(tone_length / dt) + 1) 

359 else: 

360 tone_t = np.arange(0, tone_length, dt) 

361 

362 tone_burst = np.sin(2 * np.pi * signal_freq * tone_t) 

363 tone_index = np.round(signal_offset) 

364 

365 # check for ring up and ring down input 

366 if isinstance(envelope, list) or isinstance(envelope, np.ndarray): 

367 num_ring_up_cycles, num_ring_down_cycles = envelope 

368 

369 # check signal is long enough for ring up and down 

370 assert num_cycles >= ( 

371 num_ring_up_cycles + num_ring_down_cycles 

372 ), "Input num_cycles must be longer than num_ring_up_cycles + num_ring_down_cycles." 

373 

374 # get period 

375 period = 1 / signal_freq 

376 

377 # create x-axis for ramp between 0 and pi 

378 up_ramp_length_points = round(num_ring_up_cycles * period / dt) 

379 down_ramp_length_points = round(num_ring_down_cycles * period / dt) 

380 up_ramp_axis = np.arange(0, np.pi + 1e-8, np.pi / (up_ramp_length_points - 1)) 

381 down_ramp_axis = np.arange(0, np.pi + 1e-8, np.pi / (down_ramp_length_points - 1)) 

382 

383 # create ramp using a shifted cosine 

384 up_ramp = (-np.cos(up_ramp_axis) + 1) * 0.5 

385 down_ramp = (np.cos(down_ramp_axis) + 1) * 0.5 

386 

387 # apply the ramps 

388 tone_burst[0:up_ramp_length_points] = tone_burst[0:up_ramp_length_points] * up_ramp 

389 tone_burst[-down_ramp_length_points:] = tone_burst[-down_ramp_length_points:] * down_ramp 

390 

391 else: 

392 # create the envelope 

393 if envelope == "Gaussian": 

394 x_lim = 3 

395 window_x = np.arange(-x_lim, x_lim + 1e-8, 2 * x_lim / (len(tone_burst) - 1)) 

396 window = gaussian(window_x, 1, 0, 1) 

397 elif envelope == "Rectangular": 

398 window = np.ones_like(tone_burst) 

399 elif envelope == "RingUpDown": 

400 raise NotImplementedError("RingUpDown not yet implemented") 

401 else: 

402 raise ValueError(f"Unknown envelope {envelope}.") 

403 

404 # apply the envelope 

405 tone_burst = tone_burst * window 

406 

407 # force the ends to be zero by applying a second window 

408 if envelope == "Gaussian": 

409 tone_burst = tone_burst * np.squeeze(get_win(len(tone_burst), type_="Tukey", param=0.05)[0]) 

410 

411 # Convert tone_index and signal_offset to numpy arrays 

412 signal_offset = np.array(signal_offset) 

413 

414 # Determine the length of the signal array 

415 signal_length = max(signal_length, signal_offset.max() + len(tone_burst)) 

416 

417 # Create the signal array with the correct size 

418 signal = np.zeros((np.atleast_1d(signal_offset).size, signal_length)) 

419 

420 # Add the tone burst to the signal array 

421 tone_index = np.atleast_1d(tone_index) 

422 

423 if tone_index.size == 1: 

424 tone_index = int(np.squeeze(tone_index)) 

425 signal[:, tone_index : tone_index + len(tone_burst)] = tone_burst.T 

426 else: 

427 for i, idx in enumerate(tone_index): 

428 signal[i, int(idx) : int(idx) + len(tone_burst)] = tone_burst 

429 

430 # plot the signal if required 

431 if plot_signal: 

432 raise NotImplementedError 

433 

434 return signal 

435 

436 

437def reorder_sensor_data(kgrid, sensor, sensor_data: np.ndarray) -> np.ndarray: 

438 """ 

439 Reorders the sensor data based on the coordinates of the sensor points. 

440 

441 Args: 

442 kgrid: The k-Wave grid object. 

443 sensor: The k-Wave sensor object. 

444 sensor_data: The sensor data to be reordered. 

445 

446 Returns: 

447 np.ndarray of the reordered sensor data. 

448 

449 Raises: 

450 ValueError: If the simulation is not 2D or the sensor is not defined as a binary mask. 

451 """ 

452 # check simulation is 2D 

453 if kgrid.dim != 2: 

454 raise ValueError("The simulation must be 2D.") 

455 

456 # check sensor.mask is a binary mask 

457 if sensor.mask.dtype != bool and set(np.unique(sensor.mask).tolist()) != {0, 1}: 

458 raise ValueError("The sensor must be defined as a binary mask.") 

459 

460 # find the coordinates of the sensor points 

461 x_sensor = matlab_mask(kgrid.x, sensor.mask == 1) 

462 x_sensor = np.squeeze(x_sensor) 

463 y_sensor = matlab_mask(kgrid.y, sensor.mask == 1) 

464 y_sensor = np.squeeze(y_sensor) 

465 

466 # find the angle of each sensor point (from the centre) 

467 angle = np.arctan2(-x_sensor, -y_sensor) 

468 angle[angle < 0] = 2 * np.pi + angle[angle < 0] 

469 

470 # sort the sensor points in order of increasing angle 

471 indices_new = np.argsort(angle, kind="stable") 

472 

473 # reorder the measure time series so that adjacent time series correspond 

474 # to adjacent sensor points. 

475 reordered_sensor_data = sensor_data[indices_new] 

476 return reordered_sensor_data 

477 

478 

479def reorder_binary_sensor_data(sensor_data: np.ndarray, reorder_index: np.ndarray): 

480 """ 

481 Args: 

482 sensor_data: N x K 

483 reorder_index: N 

484 

485 Returns: 

486 reordered sensor data 

487 

488 """ 

489 reorder_index = np.squeeze(reorder_index) 

490 assert sensor_data.ndim == 2 

491 assert reorder_index.ndim == 1 

492 

493 return sensor_data[reorder_index.argsort()] 

494 

495 

496def calc_max_freq(max_spat_freq, c): 

497 filter_cutoff_freq = max_spat_freq * c / (2 * np.pi) 

498 return filter_cutoff_freq 

499 

500 

501def get_alpha_filter(kgrid, medium, filter_cutoff, taper_ratio=0.5): 

502 """ 

503 get_alpha_filter uses get_win to create a Tukey window via rotation to 

504 pass to the medium.alpha_filter. This parameter is used to regularise time 

505 reversal image reconstruction when absorption compensation is included. 

506 

507 Args: 

508 kgrid: simulation grid 

509 medium: simulation medium 

510 filter_cutoff: Any of the filter_cutoff inputs may be set to 'max' to set the cutoff frequency to 

511 the maximum frequency supported by the grid 

512 taper_ratio: The taper_ratio input is used to control the width of the transition region between 

513 the passband and stopband. The default value is 0.5, which corresponds to 

514 a transition region of 50% of the filter width. 

515 

516 Returns: 

517 alpha_filter 

518 

519 """ 

520 

521 dim = num_dim(kgrid.k) 

522 logging.log(logging.INFO, f" taper ratio: {taper_ratio}") 

523 # extract the maximum sound speed 

524 c = max(medium.sound_speed) 

525 

526 assert len(filter_cutoff) == dim, f"Input filter_cutoff must have {dim} elements for a {dim}D grid" 

527 

528 # parse cutoff freqs 

529 filter_size = [] 

530 for idx, freq in enumerate(filter_cutoff): 

531 if freq == "max": 

532 filter_cutoff[idx] = calc_max_freq(kgrid.k_max[idx], c) 

533 filter_size_local = kgrid.N[idx] 

534 else: 

535 filter_size_local, filter_cutoff[idx] = freq2wavenumber(kgrid.N[idx], kgrid.k_max[idx], filter_cutoff[idx], c, kgrid.k[idx]) 

536 filter_size.append(filter_size_local) 

537 

538 # create the alpha_filter 

539 filter_sec, _ = get_win(filter_size, "Tukey", param=taper_ratio, rotation=True) 

540 

541 # enlarge the alpha_filter to the size of the grid 

542 alpha_filter = np.zeros(kgrid.N) 

543 indexes = [round((kgrid.N[idx] - filter_size[idx]) / 2) for idx in range(len(filter_size))] 

544 

545 if dim == 1: 

546 alpha_filter[indexes[0] : indexes[0] + filter_size[0]] = np.squeeze(filter_sec) 

547 elif dim == 2: 

548 alpha_filter[indexes[0] : indexes[0] + filter_size[0], indexes[1] : indexes[1] + filter_size[1]] = filter_sec 

549 elif dim == 3: 

550 alpha_filter[ 

551 indexes[0] : indexes[0] + filter_size[0], indexes[1] : indexes[1] + filter_size[1], indexes[2] : indexes[2] + filter_size[2] 

552 ] = filter_sec 

553 

554 def dim_string(cutoff_vals): 

555 return "".join([(str(scale_SI(co)[0]) + " Hz by ") for co in cutoff_vals]) 

556 

557 # update the command line status 

558 logging.log(logging.INFO, " filter cutoff: " + dim_string(filter_cutoff)[:-4] + ".") 

559 

560 return alpha_filter 

561 

562 

563def get_wave_number(Nx, dx, dim): 

564 if Nx % 2 == 0: 

565 # even 

566 nx = np.arange(start=-Nx / 2, stop=Nx / 2) / Nx 

567 else: 

568 nx = np.arange(start=-(Nx - 1) / 2, stop=(Nx - 1) / 2 + 1) / Nx 

569 

570 kx = ifftshift((2 * np.pi / dx) * nx) 

571 

572 return kx 

573 

574 

575def gradient_spect(f: np.ndarray, dn: List[float], dim: Optional[Union[int, List[int]]] = None, deriv_order: int = 1) -> np.ndarray: 

576 """ 

577 gradient_spect calculates the gradient of an n-dimensional input matrix using the Fourier collocation spectral method. 

578 The gradient for singleton dimensions is returned as 0. 

579 

580 Args: 

581 f: A numpy ndarray representing the input matrix. 

582 dn: A list of floats representing the grid spacings in each dimension. 

583 dim: An optional integer or list of integers representing the dimensions along which to calculate the gradient. 

584 deriv_order: An integer representing the order of the derivative to calculate. Default is 1. 

585 

586 Returns: 

587 A numpy ndarray containing the calculated gradient. 

588 

589 """ 

590 

591 # get size of the input function 

592 sz = f.shape 

593 

594 # check if input is 1D or user defined input dimension is given 

595 if dim or len(sz) == 1: 

596 # check if a single dn value is given, if not, extract the required value 

597 if not (isinstance(dn, int) or isinstance(dn, float)): 

598 dn = dn[dim] 

599 

600 # get the grid size along the specified dimension, or the longest dimension if 1D 

601 if max(sz) == np.prod(sz): 

602 dim = np.argmax(sz) 

603 Nx = sz[dim] 

604 else: 

605 Nx = sz[dim] 

606 

607 # get the wave number 

608 kx = get_wave_number(Nx, dn, dim) 

609 

610 # calculate derivative and assign output 

611 grads = np.real(ifft((1j * kx) ** deriv_order * fft(f, axis=dim), axis=dim)) 

612 else: 

613 # logging.log(logging.WARN, "This implementation is not tested.") 

614 # get the wave number 

615 # kx = get_wave_number(sz(dim), dn[dim], dim) 

616 

617 assert len(dn) == len(sz), ValueError(f"{len(sz)} values for dn must be specified for a {len(sz)}-dimensional input matrix.") 

618 

619 grads = [] 

620 # calculate the gradient for each non-singleton dimension 

621 for dim in range(num_dim(f)): 

622 # get the wave number 

623 kx = get_wave_number(sz[dim], dn[dim], dim) 

624 # calculate derivative and assign output 

625 # TODO: replace this with numpy broadcasting 

626 kx = broadcast_axis(kx, num_dim(f), dim) 

627 grads.append(np.real(ifft((1j * kx) ** deriv_order * fft(f, axis=dim), axis=dim))) 

628 

629 return grads 

630 

631 

632def unmask_sensor_data(kgrid, sensor, sensor_data: np.ndarray) -> np.ndarray: 

633 # create an empty matrix 

634 if kgrid.k == 1: 

635 unmasked_sensor_data = np.zeros((kgrid.Nx, 1)) 

636 elif kgrid.k == 2: 

637 unmasked_sensor_data = np.zeros((kgrid.Nx, kgrid.Ny)) 

638 elif kgrid.k == 3: 

639 unmasked_sensor_data = np.zeros((kgrid.Nx, kgrid.Ny, kgrid.Nz)) 

640 else: 

641 raise NotImplementedError 

642 

643 # reorder input data 

644 flat_sensor_mask = (sensor.mask != 0).flatten("F") 

645 assignment_mask = unflatten_matlab_mask(unmasked_sensor_data, np.where(flat_sensor_mask)[0]) 

646 # unmasked_sensor_data.flatten('F')[flat_sensor_mask] = sensor_data.flatten() 

647 unmasked_sensor_data[assignment_mask] = sensor_data.flatten() 

648 # unmasked_sensor_data[unflatten_matlab_mask(unmasked_sensor_data, sensor.mask != 0)] = sensor_data 

649 return unmasked_sensor_data 

650 

651 

652def create_cw_signals(t_array: np.ndarray, freq: float, amp: np.ndarray, phase: np.ndarray, ramp_length: int = 4) -> np.ndarray: 

653 """ 

654 Generate a series of continuous wave (CW) signals based on the 1D or 2D input matrices `amp` and `phase`, where each signal 

655 is given by: 

656 

657 amp[i, j] .* sin(2 * pi * freq * t_array + phase[i, j]); 

658 

659 To avoid startup transients, a cosine tapered up-ramp is applied to the beginning of the signal. By default, the length 

660 of this ramp is four periods of the wave. The up-ramp can be turned off by setting the `ramp_length` to 0. 

661 

662 Examples: 

663 

664 # define sampling parameters 

665 f = 5e6 

666 T = 1/f 

667 Fs = 100e6 

668 dt = 1/Fs 

669 t_array = np.arange(0, 10*T, dt) 

670 

671 # define amplitude and phase 

672 amp = get_win(9, 'Gaussian') 

673 phase = np.arange(0, 2*pi, 9).T 

674 

675 # create signals and plot 

676 cw_signal = create_cw_signals(t_array, f, amp, phase) 

677 

678 Args: 

679 t_array: A numpy ndarray representing the time values. 

680 freq: A float representing the frequency of the signals. 

681 amp: A numpy ndarray representing the amplitudes of the signals. 

682 phase: A numpy ndarray representing the phases of the signals. 

683 ramp_length: An optional integer representing the length of the cosine up-ramp in periods of the wave. Default is 4. 

684 

685 Returns: 

686 A numpy ndarray containing the generated CW signals. 

687 """ 

688 

689 if len(phase) == 1: 

690 phase = phase * np.ones(amp.shape) 

691 

692 if amp.ndim > 1: 

693 N1, N2 = amp.shape 

694 else: 

695 N1, N2 = amp.shape[0], 1 

696 

697 # create input signals 

698 cw_signal = np.zeros((N1, N2, len(t_array))) 

699 

700 # create signal 

701 for index1 in range(N1): 

702 for index2 in range(N2): 

703 if amp.ndim > 1: 

704 cw_signal[index1, index2, :] = amp[index1, index2] * np.sin(2 * np.pi * freq * t_array + phase[index1, index2]) 

705 else: 

706 cw_signal[index1, index2, :] = amp[index1] * np.sin(2 * np.pi * freq * t_array + phase[index1]) 

707 

708 # apply ramp to avoid startup transients 

709 if ramp_length != 0: 

710 # get period and time step (assuming dt is constant) 

711 period = 1 / freq 

712 dt = t_array[1] - t_array[0] 

713 

714 # create x-axis for ramp between 0 and pi 

715 ramp_length_points = round(ramp_length * period / dt) 

716 ramp_axis = np.linspace(0, np.pi, ramp_length_points) 

717 

718 # create ramp using a shifted cosine 

719 ramp = (-np.cos(ramp_axis) + 1) * 0.5 

720 ramp = np.expand_dims(ramp, axis=(0, 1)) 

721 

722 # apply ramp to all signals simultaneously 

723 cw_signal[:, :, :ramp_length_points] = ramp * cw_signal[:, :, :ramp_length_points] 

724 

725 # remove singleton dimensions if cw_signal has more than two dimensions 

726 if cw_signal.ndim > 2: 

727 cw_signal = np.squeeze(cw_signal) 

728 

729 # if only a single amplitude and phase is given, force time to be the 

730 # second dimensions 

731 if amp.ndim == 1: 

732 cw_signal = np.reshape(cw_signal, (N1, -1)) 

733 

734 return cw_signal