"""Fast Fourier transforms (:mod:`fluiddyn.calcul.easypyfft`)
=============================================================
.. autofunction:: fftw_grid_size
Provides classes for performing fft in 1, 2, and 3 dimensions:
.. autoclass:: FFTP2D
:members:
.. autoclass:: FFTW2DReal2Complex
:members:
.. autoclass:: FFTW3DReal2Complex
:members:
.. autoclass:: FFTW1D
:members:
.. autoclass:: FFTW1DReal2Complex
:members:
"""
import os
from time import time
import numpy as np
from ..util.mpi import nb_proc, printby0
try:
import scipy.fftpack as fftp
except ImportError:
pass
if "OMP_NUM_THREADS" in os.environ:
nthreads = int(os.environ["OMP_NUM_THREADS"])
else:
nthreads = 1
[docs]
def fftw_grid_size(nk, bases=[2, 3, 5, 7, 11, 13], debug=False):
"""Find the closest multiple of prime powers greater than or equal to nk
using Mixed Integer Linear Programming (MILP). Useful while setting the
grid-size to be compatible with FFTW.
Parameters
----------
nk : int
Lower bound for the spectral grid size.
bases : array-like, optional
List of bases, typically prime numbers.
debug : bool, optional
Print useful messages.
Returns
-------
int
"""
if {2, 3, 5} == set(bases):
if debug:
print("Using scipy.fftpack.next_fast_len")
return fftp.next_fast_len(nk)
elif {2, 3, 5, 7, 11, 13} == set(bases):
try:
import pyfftw
return pyfftw.next_fast_len(nk)
except (ImportError, AttributeError):
pass
else:
if debug:
print("Using pyfftw.next_fast_len")
if not {2, 3, 5, 7, 11, 13}.issuperset(bases):
raise ValueError(
"FFTW only supports bases which are a subset of "
"{2, 3, 5, 7, 11, 13}."
)
import pulp
prob = pulp.LpProblem("FFTW Grid-size Problem")
bases = np.array(bases)
bases_order1 = bases[bases < 10]
bases_order2 = bases[bases >= 10]
exp_max = np.ceil(np.log2(nk))
exps = pulp.LpVariable.dicts(
"exponent_o1", bases_order1, 0, exp_max, cat=pulp.LpInteger
)
exps.update(
pulp.LpVariable.dicts(
"exponent_o2", bases_order2, 0, 1, cat=pulp.LpInteger
)
)
log_nk_new = pulp.LpVariable("log_grid_size", 0)
log_nk_new = pulp.lpDot(exps.values(), np.log(bases))
prob += log_nk_new # Target to be minimized
# Subject to:
prob += log_nk_new >= np.log(nk), "T1"
if {11, 13}.issubset(bases):
prob += exps[11] + exps[13] <= 1, "T2"
if debug:
print("bases =", bases)
print("exponents =", exps)
print("log_nk_new =", log_nk_new)
# prob.writeLP("FFTWGridSizeOptimizationModel.lp")
prob.solve()
if debug:
print("Status:", pulp.LpStatus[prob.status])
for v in prob.variables():
print(v.name, "=", v.varValue)
if pulp.LpStatus[prob.status] == "Infeasible":
raise ValueError(f"Not enough bases: {bases}")
exps_solution = [v.varValue for v in prob.variables()]
nk_new = np.prod(np.power(bases, exps_solution))
return int(nk_new)
class BaseFFT:
def run_tests(self):
arr = np.random.rand(*self.shapeX)
arr_fft = self.fft(arr)
arr = self.ifft(arr_fft)
arr_fft = self.fft(arr)
nrj = self.compute_energy_from_spatial(arr)
nrj_fft = self.compute_energy_from_Fourier(arr_fft)
assert np.allclose(nrj, nrj_fft), (nrj, nrj_fft, nb_proc * nrj_fft - nrj)
arr2_fft = np.zeros(self.shapeK, dtype=np.complex128)
self.fft_as_arg(arr, arr2_fft)
nrj2_fft = self.compute_energy_from_Fourier(arr2_fft)
assert np.allclose(nrj, nrj2_fft)
arr2 = np.empty(self.shapeX)
self.ifft_as_arg(arr_fft, arr2)
nrj2 = self.compute_energy_from_spatial(arr2)
assert np.allclose(nrj, nrj2)
def run_benchs(self, nb_time_execute=10):
arr = np.zeros(self.shapeX)
arr_fft = np.zeros(self.shapeK, dtype=np.complex128)
times = []
for i in range(nb_time_execute):
t_start = time()
self.fft_as_arg(arr, arr_fft)
times.append(time() - t_start)
time_fft = np.mean(times)
times = []
for i in range(nb_time_execute):
t_start = time()
self.ifft_as_arg(arr_fft, arr)
times.append(time() - t_start)
time_ifft = np.mean(times)
name = self.__class__.__name__
printby0(
"Internal bench (" + name + ")\n"
"time fft ({}): {:.6f} s\n".format(name, time_fft)
+ f"time ifft ({name}): {time_ifft:.6f} s"
)
return time_fft, time_ifft
def get_short_name(self):
return self.__class__.__name__.lower()
def compute_energy_from_X(self, fieldX):
return np.mean(fieldX**2 / 2.0)
def get_local_size_X(self):
return np.prod(self.shapeX)
def get_shapeK_seq(self):
return self.shapeK
get_shapeK_loc = get_shapeK_seq
def get_shapeX_seq(self):
return self.shapeX
get_shapeX_loc = get_shapeX_seq
[docs]
class FFTP2D(BaseFFT):
"""A class to use fftp"""
def __init__(self, nx, ny):
if nx % 2 != 0 or ny % 2 != 0:
raise ValueError("nx and ny should be even")
self.nx = nx
self.ny = ny
self.shapeX = (ny, nx)
self.nkx = int(float(nx) / 2 + 1)
self.shapeK = self.shapeK_seq = self.shapeK_loc = (ny, self.nkx)
self.coef_norm = nx * ny
self.fft2d = self.fft
self.ifft2d = self.ifft
def fft(self, ff):
if not (isinstance(ff[0, 0], float)):
print("Warning: not array of floats")
big_ff_fft = fftp.fft2(ff) / self.coef_norm
small_ff_fft = big_ff_fft[:, 0 : self.nkx]
return small_ff_fft
def ifft(self, small_ff_fft, ARG_IS_COMPLEX=False):
if not (isinstance(small_ff_fft[0, 0], complex)):
print("Warning: not array of complexes")
# print('small_ff_fft\n', small_ff_fft)
big_ff_fft = np.empty(self.shapeX, dtype=np.complex128)
big_ff_fft[:, 0 : self.nkx] = small_ff_fft
for iky in range(self.ny):
big_ff_fft[iky, self.nkx :] = small_ff_fft[
-iky, self.nkx - 2 : 0 : -1
].conj()
# print('big_ff_fft final\n', big_ff_fft)
result_ifft = fftp.ifft2(big_ff_fft * self.coef_norm)
if np.max(np.imag(result_ifft)) > 10 ** (-8):
print(
"ifft2: imaginary part of ifft not equal to zero,",
np.max(np.imag(result_ifft)),
)
return np.real(result_ifft)
def fft_as_arg(self, field, field_fft):
field_fft[:] = self.fft(field)
def ifft_as_arg(self, field_fft, field):
field[:] = self.ifft(field_fft)
def compute_energy_from_Fourier(self, ff_fft):
return (
np.sum(abs(ff_fft[:, 0]) ** 2 + abs(ff_fft[:, -1]) ** 2)
+ 2 * np.sum(abs(ff_fft[:, 1:-1]) ** 2)
) / 2
def compute_energy_from_spatial(self, ff):
return np.mean(abs(ff) ** 2) / 2
class BasePyFFT(BaseFFT):
def __init__(self, shapeX):
try:
import pyfftw
except ImportError as err:
raise ImportError(
"ImportError {0}. Instead fftpack can be used (?)", err
)
if isinstance(shapeX, int):
shapeX = [shapeX]
shapeK = list(shapeX)
shapeK[-1] = shapeK[-1] // 2 + 1
shapeK = tuple(shapeK)
self.shapeX = shapeX
self.shapeK = self.shapeK_seq = self.shapeK_loc = shapeK
self.empty_aligned = pyfftw.empty_aligned
self.arrayX = pyfftw.empty_aligned(shapeX, np.float64)
self.arrayK = pyfftw.empty_aligned(shapeK, np.complex128)
axes = tuple(range(len(shapeX)))
self.fftplan = pyfftw.FFTW(
input_array=self.arrayX,
output_array=self.arrayK,
axes=axes,
direction="FFTW_FORWARD",
threads=nthreads,
)
self.ifftplan = pyfftw.FFTW(
input_array=self.arrayK,
output_array=self.arrayX,
axes=axes,
direction="FFTW_BACKWARD",
threads=nthreads,
)
self.coef_norm = np.prod(shapeX)
self.inv_coef_norm = 1.0 / self.coef_norm
def fft(self, fieldX):
fieldK = self.empty_aligned(self.shapeK, np.complex128)
self.fftplan(input_array=fieldX, output_array=fieldK)
return fieldK / self.coef_norm
def ifft(self, fieldK):
fieldX = self.empty_aligned(self.shapeX, np.float64)
# This copy is needed because FFTW_DESTROY_INPUT is used.
# See pyfftw.readthedocs.io/en/latest/source/pyfftw/pyfftw.html
self.arrayK[:] = fieldK
self.ifftplan(
input_array=self.arrayK, output_array=fieldX, normalise_idft=False
)
return fieldX
def fft_as_arg(self, fieldX, fieldK):
self.fftplan(input_array=fieldX, output_array=fieldK)
fieldK *= self.inv_coef_norm
def ifft_as_arg(self, fieldK, fieldX):
# This copy is needed because FFTW_DESTROY_INPUT is used.
# See pyfftw.readthedocs.io/en/latest/source/pyfftw/pyfftw.html
# fieldK = fieldK.copy()
# self.ifftplan(input_array=fieldK, output_array=fieldX,
# this seems faster (but it could depend on the size)
self.arrayK[:] = fieldK
self.ifftplan(
input_array=self.arrayK, output_array=fieldX, normalise_idft=False
)
def ifft_as_arg_destroy(self, fieldK, fieldX):
self.ifftplan(
input_array=fieldK, output_array=fieldX, normalise_idft=False
)
def compute_energy_from_Fourier(self, ff_fft):
result = self.sum_wavenumbers(abs(ff_fft) ** 2) / 2
return result
compute_energy_from_K = compute_energy_from_Fourier
def compute_energy_from_spatial(self, ff):
return np.mean(abs(ff) ** 2) / 2
def project_fft_on_realX(self, ff_fft):
return self.fft(self.ifft(ff_fft))
def get_is_transposed(self):
return False
def create_arrayX(self, value=None):
"""Return a constant array in real space."""
shapeX = self.shapeX
field = self.empty_aligned(shapeX)
if value is not None:
field.fill(value)
return field
def create_arrayK(self, value=None):
"""Return a constant array in real space."""
shapeK = self.shapeK
field = self.empty_aligned(shapeK, dtype=np.complex128)
if value is not None:
field.fill(value)
return field
[docs]
class FFTW2DReal2Complex(BasePyFFT):
"""A class to use fftw"""
def __init__(self, nx, ny):
shapeX = (ny, nx)
super().__init__(shapeX)
self.fft2d = self.fft
self.ifft2d = self.ifft
def sum_wavenumbers(self, ff_fft):
if self.shapeX[1] % 2 == 0:
return (
np.sum(ff_fft[:, 0])
+ np.sum(ff_fft[:, -1])
+ 2 * np.sum(ff_fft[:, 1:-1])
)
else:
return np.sum(ff_fft[:, 0]) + 2 * np.sum(ff_fft[:, 1:])
def get_seq_indices_first_K(self):
return 0, 0
def get_seq_indices_first_X(self):
return 0, 0
[docs]
def get_x_adim_loc(self):
"""Get the coordinates of the points stored locally.
Returns
-------
x0loc : np.ndarray
x1loc : np.ndarray
The indices correspond to the index of the dimension in real space.
"""
nyseq, nxseq = self.get_shapeX_seq()
ix0_start, ix1_start = self.get_seq_indices_first_X()
nx0loc, nx1loc = self.get_shapeX_loc()
x0loc = np.array(range(ix0_start, ix0_start + nx0loc))
x1loc = np.array(range(ix1_start, ix1_start + nx1loc))
return x0loc, x1loc
[docs]
def get_k_adim_loc(self):
"""Get the non-dimensional wavenumbers stored locally.
Returns
-------
k0_adim_loc : np.ndarray
k1_adim_loc : np.ndarray
The indices correspond to the index of the dimension in spectral space.
"""
nyseq, nxseq = self.get_shapeX_seq()
kyseq = np.array(
list(range(nyseq // 2 + 1)) + list(range(-nyseq // 2 + 1, 0))
)
kxseq = np.array(range(nxseq // 2 + 1))
if self.get_is_transposed():
k0seq, k1seq = kxseq, kyseq
else:
k0seq, k1seq = kyseq, kxseq
ik0_start, ik1_start = self.get_seq_indices_first_K()
nk0loc, nk1loc = self.get_shapeK_loc()
k0_adim_loc = k0seq[ik0_start : ik0_start + nk0loc]
k1_adim_loc = k1seq[ik1_start : ik1_start + nk1loc]
return k0_adim_loc, k1_adim_loc
[docs]
class FFTW3DReal2Complex(BasePyFFT):
"""A class to use fftw"""
def __init__(self, nx, ny, nz):
shapeX = (nz, ny, nx)
super().__init__(shapeX)
self.fft3d = self.fft
self.ifft3d = self.ifft
def sum_wavenumbers(self, ff_fft):
if self.shapeX[2] % 2 == 0:
return (
np.sum(ff_fft[:, :, 0])
+ np.sum(ff_fft[:, :, -1])
+ 2 * np.sum(ff_fft[:, :, 1:-1])
)
else:
return np.sum(ff_fft[:, :, 0]) + 2 * np.sum(ff_fft[:, :, 1:])
def get_k_adim(self):
nK0, nK1, nK2 = self.shapeK
kz_adim_max = nK0 // 2
kz_adim_min = -((nK0 - 1) // 2)
ky_adim_max = nK1 // 2
ky_adim_min = -((nK1 - 1) // 2)
return (
np.r_[0 : kz_adim_max + 1, kz_adim_min:0],
np.r_[0 : ky_adim_max + 1, ky_adim_min:0],
np.arange(nK2),
)
# def get_k_adim_loc(self):
# return self.get_k_adim()
def get_dimX_K(self):
return 0, 1, 2
# def get_seq_indices_first_K(self):
# return 0, 0
# def compute_energy_from_spatial(self, ff):
# return np.mean(abs(ff)**2)/2
def project_fft_on_realX(self, ff_fft):
return self.fft2d(self.ifft2d(ff_fft))
def build_invariant_arrayX_from_2d_indices12X(self, o2d, arr2d):
nX0, nX1, nX2 = self.get_shapeX_seq()
nX0loc, nX1loc, nX2loc = self.get_shapeX_loc()
if (nX1, nX2) != o2d.get_shapeX_seq():
raise ValueError("Not the same physical shape...")
# check that the 2d fft is not with distributed memory...
if o2d.get_shapeX_loc() != o2d.get_shapeX_seq():
raise ValueError("2d fft is with distributed memory...")
ind0seq_first, ind1seq_first, _ = self.get_seq_indices_first_K()
if (nX1loc, nX2loc) == o2d.get_shapeX_loc():
arr3d_loc_2dslice = arr2d
else:
raise NotImplementedError
arr3d = np.empty([nX0loc, nX1loc, nX2loc])
for i0 in range(nX0loc):
arr3d[i0] = arr3d_loc_2dslice
return arr3d
def build_invariant_arrayK_from_2d_indices12X(self, o2d, arr2d):
nK0, nK1, nK2 = self.get_shapeK_seq()
ret = np.zeros((nK0,) + o2d.shapeK_seq, dtype=np.complex128)
ret[0] = arr2d
return ret
[docs]
def get_seq_indices_first_X(self):
"""Get the "sequential" indices of the first number in Real space."""
return 0, 0, 0
[docs]
def get_seq_indices_first_K(self):
"""Get the "sequential" indices of the first number in Fourier space."""
return 0, 0, 0
[docs]
def get_k_adim_loc(self):
"""Get the non-dimensional wavenumbers stored locally.
Returns
-------
k0_adim_loc : np.ndarray
k1_adim_loc : np.ndarray
k2_adim_loc : np.ndarray
The indices correspond to the index of the dimension in spectral space.
"""
nK0, nK1, nK2 = self.get_shapeK_seq()
nK0_loc, nK1_loc, nK2_loc = self.get_shapeK_loc()
d0, d1, d2 = self.get_dimX_K()
i0_start, i1_start, i2_start = self.get_seq_indices_first_K()
k0_adim = compute_k_adim_seq_3d(nK0, d0)
k0_adim_loc = k0_adim[i0_start : i0_start + nK0_loc]
k1_adim = compute_k_adim_seq_3d(nK1, d1)
k1_adim_loc = k1_adim[i1_start : i1_start + nK1_loc]
k2_adim_loc = compute_k_adim_seq_3d(nK2, d2)
return k0_adim_loc, k1_adim_loc, k2_adim_loc
def compute_k_adim_seq_3d(nk, axis):
"""Compute the adimensional wavenumber for an axis.
Parameters
----------
nk : int
Global size in Fourier space for the axis.
axis : int
Index of the axis in real space (0 for z, 1 for y and 2 for x).
"""
if axis == 2:
return np.arange(nk)
else:
k_adim_max = nk // 2
k_adim_min = -((nk - 1) // 2)
return np.r_[0 : k_adim_max + 1, k_adim_min:0]
[docs]
class FFTW1D(BasePyFFT):
"""A class to use fftw 1D"""
def __init__(self, n):
try:
import pyfftw
except ImportError as err:
raise ImportError("ImportError. Instead fftpack?", err)
if n % 2 != 0:
raise ValueError("n should be even")
shapeX = (n,)
shapeK = (n,)
self.shapeX = shapeX
self.shapeK = self.shapeK_seq = self.shapeK_loc = shapeK
self.arrayX = pyfftw.empty_aligned(shapeX, "complex128")
self.arrayK = pyfftw.empty_aligned(shapeK, "complex128")
self.fftplan = pyfftw.FFTW(
input_array=self.arrayX,
output_array=self.arrayK,
axes=(-1,),
direction="FFTW_FORWARD",
threads=nthreads,
)
self.ifftplan = pyfftw.FFTW(
input_array=self.arrayK,
output_array=self.arrayX,
axes=(-1,),
direction="FFTW_BACKWARD",
threads=nthreads,
)
self.coef_norm = n
self.inv_coef_norm = 1.0 / n
def fft(self, ff):
self.arrayX[:] = ff
self.fftplan()
return self.arrayK / self.coef_norm
def ifft(self, ff_fft):
self.arrayK[:] = ff_fft
self.ifftplan()
return self.arrayX.copy()
[docs]
class FFTW1DReal2Complex(BasePyFFT):
"""A class to use fftw 1D"""
def sum_wavenumbers(self, ff_fft):
if self.shapeX[0] % 2 == 0:
return ff_fft[0] + ff_fft[-1] + 2 * np.sum(ff_fft[1:-1])
else:
return ff_fft[0] + 2 * np.sum(ff_fft[1:])
def compute_energy_from_Fourier(self, ff_fft):
return (
abs(ff_fft[0]) ** 2
+ 2 * np.sum(abs(ff_fft[1:-1]) ** 2)
+ abs(ff_fft[-1]) ** 2
) / 2
def compute_energy_from_spatial(self, ff):
return np.mean(abs(ff) ** 2) / 2