#
# Copyright (C) 2013-2023 The ESPResSo project
#
# This file is part of ESPResSo.
#
# ESPResSo is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ESPResSo is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import itertools
import numpy as np
from . import utils
from .detail.walberla import VTKOutputBase, LatticeWalberla
from .script_interface import ScriptInterfaceHelper, script_interface_register, array_variant
import espressomd.detail.walberla
import espressomd.shapes
import espressomd.code_features
[docs]
class VelocityBounceBack:
"""
Hold velocity information for the velocity bounce back boundary
condition at a single node.
"""
def __init__(self, velocity):
utils.check_type_or_throw_except(
velocity, 3, float, "VelocityBounceBack velocity must be three floats")
self.velocity = velocity
[docs]
class HydrodynamicInteraction(ScriptInterfaceHelper):
"""
Base class for LB implementations.
"""
def __getitem__(self, key):
raise NotImplementedError("Derived classes must implement this method")
def __str__(self):
return f"{self.__class__.__name__}({self.get_params()})"
[docs]
def validate_params(self, params):
pass
[docs]
def valid_keys(self):
return {"agrid", "tau", "density", "ext_force_density",
"kinematic_viscosity", "lattice", "kT", "seed"}
[docs]
def required_keys(self):
return {"lattice", "density", "kinematic_viscosity", "tau"}
[docs]
def default_params(self):
return {"lattice": None, "seed": 0, "kT": 0.,
"ext_force_density": [0.0, 0.0, 0.0]}
[docs]
def mach_limit(self):
"""
The fluid velocity is limited to :math:`v_{\\mathrm{max}} = 0.20`
(see *quasi-incompressible limit* in :cite:`kruger17a`,
chapter 7, page 272), which corresponds to Mach 0.35.
The relative error in the fluid density between a compressible fluid
and an incompressible fluid at Mach 0.30 is less than 5% (see
*constant density assumption* in :cite:`kundu02a` chapter 16, page
663). Since the speed of sound is :math:`c_s = 1 / \\sqrt{3}` in LB
velocity units in a D3Q19 lattice, the velocity limit at Mach 0.30
is :math:`v_{\\mathrm{max}} = 0.30 / \\sqrt{3} \\approx 0.17`.
At Mach 0.35 the relative error is around 6% and
:math:`v_{\\mathrm{max}} = 0.35 / \\sqrt{3} \\approx 0.20`.
Returns
-------
v_max : :obj:`float`
The Mach limit expressed in LB velocity units.
"""
return 0.20
@classmethod
def _check_mach_limit(cls, velocities):
vel_max = cls.mach_limit(cls)
velocities = np.reshape(velocities, (-1, 3))
if np.any(np.linalg.norm(velocities, axis=1) > vel_max):
speed_of_sound = 1. / np.sqrt(3.)
mach_number = vel_max / speed_of_sound
raise ValueError(f"Slip velocity exceeds Mach {mach_number:.2f}")
@property
def pressure_tensor(self):
tensor = self.call_method("get_pressure_tensor")
return utils.array_locked(tensor)
@pressure_tensor.setter
def pressure_tensor(self, value):
raise RuntimeError(f"Property 'pressure_tensor' is read-only")
[docs]
@script_interface_register
class LBFluidWalberla(HydrodynamicInteraction,
espressomd.detail.walberla.LatticeModel):
"""
The lattice-Boltzmann method for hydrodynamics using waLBerla.
If argument ``lattice`` is not provided, one will be default
constructed if an argument ``agrid`` is provided.
Parameters
----------
lattice : :obj:`espressomd.lb.LatticeWalberla <espressomd.detail.walberla.LatticeWalberla>`
Lattice object. If not provided, a default one will be constructed
using the ``agrid`` parameter.
agrid : :obj:`float`
Lattice constant. The box size in every direction must be an integer
multiple of ``agrid``. Cannot be provided together with ``lattice``.
tau : :obj:`float`
LB time step, must be an integer multiple of the MD time step.
density : :obj:`float`
Fluid density.
kinematic_viscosity : :obj:`float`
Fluid kinematic viscosity.
ext_force_density : (3,) array_like of :obj:`float`, optional
Force density applied on the fluid.
kT : :obj:`float`, optional
Thermal energy of the simulated heat bath (for thermalized fluids).
Set it to 0 for an unthermalized fluid.
seed : :obj:`int`, optional
Initial counter value (or seed) of the philox RNG.
Required for a thermalized fluid. Must be positive.
single_precision : :obj:`bool`, optional
Use single-precision floating-point arithmetic.
Methods
-------
get_interpolated_velocity()
Get LB fluid velocity at specified position.
Parameters
----------
pos : (3,) array_like of :obj:`float`
The position at which velocity is requested.
Returns
-------
v : (3,) array_like :obj:`float`
The LB fluid velocity at ``pos``.
add_force_at_pos():
Adds a force to the fluid at given position.
Parameters
----------
pos : (3,) array_like of :obj:`float`
The position at which the force will be added.
force : (3,) array_like of :obj:`float`
The force vector which will be distributed at the position.
clear_boundaries()
Remove velocity bounce-back boundary conditions.
save_checkpoint()
Write LB node populations and boundary conditions to a file.
Parameters
----------
path : :obj:`str`
Destination file path.
binary : :obj:`bool`
Whether to write in binary or ASCII mode.
load_checkpoint()
Load LB node populations and boundary conditions from a file.
Parameters
----------
path : :obj:`str`
File path to read from.
binary : :obj:`bool`
Whether to read in binary or ASCII mode.
add_vtk_writer()
Attach a VTK writer.
Parameters
----------
vtk : :class:`espressomd.lb.VTKOutput`
VTK writer.
remove_vtk_writer()
Detach a VTK writer.
Parameters
----------
vtk : :class:`espressomd.lb.VTKOutput`
VTK writer.
clear_vtk_writers()
Detach all VTK writers.
"""
_so_name = "walberla::LBFluidCPU"
_so_features = ("WALBERLA",)
_so_creation_policy = "GLOBAL"
_so_bind_methods = (
"add_force_at_pos",
"clear_boundaries",
"get_interpolated_velocity",
"add_vtk_writer",
"remove_vtk_writer",
"clear_vtk_writers",
)
def __init__(self, *args, **kwargs):
if "sip" not in kwargs:
params = self.default_params()
params.update(kwargs)
self.validate_params(params)
super().__init__(*args, **params)
else:
super().__init__(**kwargs)
[docs]
def validate_params(self, params):
super().validate_params(params)
# construct default lattice if necessary
if params.get("lattice") is None:
if "agrid" not in params:
raise ValueError("missing argument 'lattice' or 'agrid'")
params["lattice"] = LatticeWalberla(
agrid=params.pop("agrid"), n_ghost_layers=1)
elif "agrid" in params:
raise ValueError("cannot provide both 'lattice' and 'agrid'")
utils.check_required_keys(self.required_keys(), params.keys())
utils.check_valid_keys(self.valid_keys(), params.keys())
[docs]
def default_params(self):
return {"single_precision": False, **super().default_params()}
[docs]
def valid_keys(self):
return {"single_precision", *super().valid_keys()}
def __getitem__(self, key):
if isinstance(key, (tuple, list, np.ndarray)) and len(key) == 3:
if any(isinstance(item, slice) for item in key):
return LBFluidSliceWalberla(parent_sip=self, slice_range=key)
else:
return LBFluidNodeWalberla(
parent_sip=self, index=np.array(key))
raise TypeError(
f"{key} is not a valid index. Should be a point on the "
"nodegrid e.g. lbf[0,0,0], or a slice e.g. lbf[:,0,0]")
[docs]
def add_boundary_from_shape(self, shape,
velocity=np.zeros(3, dtype=float),
boundary_type=VelocityBounceBack):
"""
Set velocity bounce-back boundary conditions from a shape.
Parameters
----------
shape : :obj:`espressomd.shapes.Shape`
Shape to rasterize.
velocity : (3,) or (L, M, N, 3) array_like of :obj:`float`, optional
Slip velocity. By default no-slip boundary conditions are used.
If a vector of 3 values, a uniform slip velocity is used,
otherwise ``L, M, N`` must be equal to the LB grid dimensions.
boundary_type : Union[:class:`~espressomd.lb.VelocityBounceBack`] (optional)
Type of the boundary condition.
"""
if not issubclass(boundary_type, VelocityBounceBack):
raise TypeError(
"Parameter 'boundary_type' must be a subclass of VelocityBounceBack")
utils.check_type_or_throw_except(
shape, 1, espressomd.shapes.Shape, "expected an espressomd.shapes.Shape")
if np.shape(velocity) not in [(3,), tuple(self.shape) + (3,)]:
raise ValueError(
f'Cannot process velocity value grid of shape {np.shape(velocity)}')
# range checks
lattice_speed = self.call_method("get_lattice_speed")
velocity = np.array(velocity, dtype=float).reshape((-1, 3))
velocity *= 1. / lattice_speed
self._check_mach_limit(velocity)
mask = self.get_shape_bitmask(shape=shape).astype(int)
self.call_method(
"add_boundary_from_shape",
raster=array_variant(mask.flatten()),
values=array_variant(velocity.flatten()))
[docs]
@script_interface_register
class LBFluidWalberlaGPU(LBFluidWalberla):
"""
Initialize the lattice-Boltzmann method for hydrodynamic flow using
waLBerla for the GPU. See :class:`HydrodynamicInteraction` for the
list of parameters.
"""
_so_name = "walberla::LBFluidGPU"
_so_creation_policy = "GLOBAL"
_so_features = ("WALBERLA", "CUDA")
[docs]
def default_params(self):
params = super().default_params()
params["single_precision"] = True
return params
[docs]
@script_interface_register
class LBFluidNodeWalberla(ScriptInterfaceHelper):
_so_name = "walberla::LBFluidNode"
_so_creation_policy = "GLOBAL"
[docs]
def required_keys(self):
return {"parent_sip", "index"}
def __reduce__(self):
raise NotImplementedError("Cannot serialize LB fluid node objects")
def __eq__(self, obj):
return isinstance(obj, LBFluidNodeWalberla) and self.index == obj.index
def __hash__(self):
return hash(self.index)
@property
def index(self):
return tuple(self._index)
@index.setter
def index(self, value):
raise RuntimeError("Parameter 'index' is read-only.")
@property
def density(self):
return self.call_method("get_density")
@density.setter
def density(self, value):
self.call_method("set_density", value=value)
@property
def population(self):
return utils.array_locked(self.call_method("get_population"))
@population.setter
def population(self, value):
self.call_method("set_population", value=value)
@property
def pressure_tensor(self):
tensor = self.call_method("get_pressure_tensor")
return utils.array_locked(tensor)
@pressure_tensor.setter
def pressure_tensor(self, value):
raise RuntimeError("Property 'pressure_tensor' is read-only.")
@property
def pressure_tensor_neq(self):
tensor = self.call_method("get_pressure_tensor_neq")
return utils.array_locked(tensor)
@pressure_tensor_neq.setter
def pressure_tensor_neq(self, value):
raise RuntimeError("Property 'pressure_tensor_neq' is read-only.")
@property
def is_boundary(self):
return self.call_method("get_is_boundary")
@is_boundary.setter
def is_boundary(self, value):
raise RuntimeError("Property 'is_boundary' is read-only.")
@property
def boundary(self):
"""
Returns
-------
:class:`~espressomd.lb.VelocityBounceBack`
If the node is a boundary node
None
If the node is not a boundary node
"""
velocity = self.call_method("get_velocity_at_boundary")
if velocity is not None:
return VelocityBounceBack(velocity)
return None
@boundary.setter
def boundary(self, value):
"""
Parameters
----------
value : :class:`~espressomd.lb.VelocityBounceBack` or ``None``
If value is :class:`~espressomd.lb.VelocityBounceBack`,
set the node to be a boundary node with the specified velocity.
If value is ``None``, the node will become a fluid node.
"""
if isinstance(value, VelocityBounceBack):
value = value.velocity
lattice_speed = self.call_method("get_lattice_speed")
HydrodynamicInteraction._check_mach_limit(
np.array(value) / lattice_speed)
elif value is not None:
raise TypeError(
"Parameter 'value' must be an instance of VelocityBounceBack or None")
self.call_method("set_velocity_at_boundary", value=value)
@property
def boundary_force(self):
return self.call_method("get_boundary_force")
@boundary_force.setter
def boundary_force(self, value):
raise RuntimeError("Property 'boundary_force' is read-only.")
@property
def velocity(self):
return self.call_method("get_velocity")
@velocity.setter
def velocity(self, value):
self.call_method("set_velocity", value=value)
@property
def last_applied_force(self):
return self.call_method("get_last_applied_force")
@last_applied_force.setter
def last_applied_force(self, value):
self.call_method("set_last_applied_force", value=value)
[docs]
@script_interface_register
class LBFluidSliceWalberla(ScriptInterfaceHelper):
_so_name = "walberla::LBFluidSlice"
_so_creation_policy = "GLOBAL"
[docs]
def required_keys(self):
return {"parent_sip", "slice_range"}
[docs]
def validate_params(self, params):
utils.check_required_keys(self.required_keys(), params.keys())
def __init__(self, *args, **kwargs):
if "sip" in kwargs:
super().__init__(**kwargs)
else:
self.validate_params(kwargs)
slice_range = kwargs.pop("slice_range")
grid_size = kwargs["parent_sip"].shape
extra_kwargs = espressomd.detail.walberla.get_slice_bounding_box(
slice_range, grid_size)
node = LBFluidNodeWalberla(index=np.array([0, 0, 0]), **kwargs)
super().__init__(*args, node_sip=node, **kwargs, **extra_kwargs)
def __iter__(self):
lower, upper = self.call_method("get_slice_ranges")
indices = [list(range(lower[i], upper[i])) for i in range(3)]
lb_sip = self.call_method("get_lb_sip")
for index in itertools.product(*indices):
yield LBFluidNodeWalberla(parent_sip=lb_sip, index=np.array(index))
def __reduce__(self):
raise NotImplementedError("Cannot serialize LB fluid slice objects")
def _getter(self, attr):
value_grid, shape = self.call_method(f"get_{attr}")
if attr == "velocity_at_boundary":
value_grid = [
None if x is None else VelocityBounceBack(x) for x in value_grid]
return utils.array_locked(np.reshape(value_grid, shape))
def _setter(self, attr, values):
dimensions = self.call_method("get_slice_size")
if 0 in dimensions:
raise AttributeError(
f"Cannot set properties of an empty '{self.__class__.__name__}' object")
values = np.copy(values)
value_shape = tuple(self.call_method("get_value_shape", name=attr))
target_shape = (*dimensions, *value_shape)
# broadcast if only one element was provided
if values.shape == value_shape or values.shape == () and value_shape == (1,):
values = np.full(target_shape, values)
def shape_squeeze(shape):
return tuple(x for x in shape if x != 1)
if shape_squeeze(values.shape) != shape_squeeze(target_shape):
target_shape = tuple([int(x) for x in target_shape])
raise ValueError(
f"Input-dimensions of '{attr}' array {values.shape} does not match slice dimensions {target_shape}")
self.call_method(f"set_{attr}", values=values.flatten())
@property
def density(self):
return self._getter("density",)
@density.setter
def density(self, value):
self._setter("density", value)
@property
def population(self):
return self._getter("population")
@population.setter
def population(self, value):
self._setter("population", value)
@property
def pressure_tensor(self):
return self._getter("pressure_tensor")
@pressure_tensor.setter
def pressure_tensor(self, value):
raise RuntimeError("Property 'pressure_tensor' is read-only.")
@property
def pressure_tensor_neq(self):
return self._getter("pressure_tensor_neq")
@pressure_tensor_neq.setter
def pressure_tensor_neq(self, value):
raise RuntimeError("Property 'pressure_tensor_neq' is read-only.")
@property
def is_boundary(self):
return self._getter("is_boundary")
@is_boundary.setter
def is_boundary(self, value):
raise RuntimeError("Property 'is_boundary' is read-only.")
@property
def boundary(self):
"""
Returns
-------
(N, M, L) array_like of :class:`~espressomd.lb.VelocityBounceBack`
If the nodes are boundary nodes
(N, M, L) array_like of ``None``
If the nodes are not boundary nodes
"""
return self._getter("velocity_at_boundary")
@boundary.setter
def boundary(self, values):
"""
Parameters
----------
values : (N, M, L) array_like of :class:`~espressomd.lb.VelocityBounceBack` or ``None``
If values are :class:`~espressomd.lb.VelocityBounceBack`,
set the nodes to be boundary nodes with the specified velocity.
If values are ``None``, the nodes will become fluid nodes.
"""
type_error_msg = "Parameter 'values' must be an array_like of VelocityBounceBack or None"
values = np.copy(values)
lattice_speed = self.call_method("get_lattice_speed")
if values.dtype != np.dtype("O"):
raise TypeError(type_error_msg)
for index in np.ndindex(*values.shape):
if values[index] is not None:
if not isinstance(values[index], VelocityBounceBack):
raise TypeError(type_error_msg)
HydrodynamicInteraction._check_mach_limit(
np.array(values[index].velocity) / lattice_speed)
values[index] = np.array(values[index].velocity)
self._setter("velocity_at_boundary", values=values)
@property
def boundary_force(self):
return self._getter("boundary_force")
@boundary_force.setter
def boundary_force(self, value):
raise RuntimeError("Property 'boundary_force' is read-only.")
@property
def velocity(self):
return self._getter("velocity")
@velocity.setter
def velocity(self, value):
self._setter("velocity", value)
@property
def last_applied_force(self):
return self._getter("last_applied_force")
@last_applied_force.setter
def last_applied_force(self, value):
self._setter("last_applied_force", value)
[docs]
@script_interface_register
class VTKOutput(VTKOutputBase):
"""
Create a VTK writer.
Files are written to ``<base_folder>/<identifier>/<prefix>_*.vtu``.
Summary is written to ``<base_folder>/<identifier>.pvd``.
Manual VTK callbacks can be called at any time to take a snapshot
of the current state of the LB fluid.
Automatic VTK callbacks can be disabled at any time and re-enabled later.
Please note that the internal VTK counter is no longer incremented when
an automatic callback is disabled, which means the number of LB steps
between two frames will not always be an integer multiple of ``delta_N``.
Parameters
----------
identifier : :obj:`str`
Name of the VTK writer.
observables : :obj:`list`, {'density', 'velocity_vector', 'pressure_tensor'}
List of observables to write to the VTK files.
delta_N : :obj:`int`
Write frequency. If this value is 0 (default), the object is a
manual VTK callback that must be triggered manually. Otherwise,
it is an automatic callback that is added to the time loop and
writes every ``delta_N`` LB steps.
base_folder : :obj:`str` (optional), default is 'vtk_out'
Path to the output VTK folder.
prefix : :obj:`str` (optional), default is 'simulation_step'
Prefix for VTK files.
"""
_so_name = "walberla::LBVTKHandle"
_so_creation_policy = "GLOBAL"
_so_bind_methods = ("enable", "disable", "write")
[docs]
def required_keys(self):
return self.valid_keys() - self.default_params().keys()
def __repr__(self):
class_id = f"{self.__class__.__module__}.{self.__class__.__name__}"
if self.delta_N:
write_when = f"every {self.delta_N} LB steps"
if not self.enabled:
write_when += " (disabled)"
else:
write_when = "on demand"
return f"<{class_id}: write to '{self.vtk_uid}' {write_when}>"
[docs]
def edge_detection(boundary_mask, periodicity):
"""
Find boundary nodes in contact with the fluid. Relies on a convolution
kernel constructed from the D3Q19 stencil.
Parameters
----------
boundary_mask : (N, M, L) array_like of :obj:`bool`
Bitmask for the rasterized boundary geometry.
periodicity : (3,) array_like of :obj:`bool`
Bitmask for the box periodicity.
Returns
-------
(N, 3) array_like of :obj:`int`
The indices of the boundary nodes at the interface with the fluid.
"""
import scipy.signal
import itertools
fluid_mask = np.logical_not(boundary_mask)
# edge kernel
edge = -np.ones((3, 3, 3))
for i, j, k in itertools.product((0, 2), (0, 2), (0, 2)):
edge[i, j, k] = 0
edge[1, 1, 1] = -np.sum(edge)
# periodic convolution
wrapped_mask = np.pad(fluid_mask.astype(int), 3 * [(2, 2)], mode="wrap")
if not periodicity[0]:
wrapped_mask[:2, :, :] = 0
wrapped_mask[-2:, :, :] = 0
if not periodicity[1]:
wrapped_mask[:, :2, :] = 0
wrapped_mask[:, -2:, :] = 0
if not periodicity[2]:
wrapped_mask[:, :, :2] = 0
wrapped_mask[:, :, -2:] = 0
convolution = scipy.signal.convolve(
wrapped_mask, edge, mode="same", method="direct")[2:-2, 2:-2, 2:-2]
convolution = np.multiply(convolution, boundary_mask)
return np.array(np.nonzero(convolution < 0)).T
[docs]
def calc_cylinder_tangential_vectors(center, agrid, offset, node_indices):
"""
Utility function to calculate a constant slip velocity tangential to the
surface of a cylinder.
Parameters
----------
center : (3,) array_like of :obj:`float`
Center of the cylinder.
agrid : :obj:`float`
LB agrid.
offset : :obj:`float`
LB offset.
node_indices : (N, 3) array_like of :obj:`int`
Indices of the boundary surface nodes.
Returns
-------
(N, 3) array_like of :obj:`float`
The unit vectors tangential to the surface of a cylinder.
"""
velocities = []
for ijk in node_indices:
p = (ijk + offset) * agrid
r = center - p
norm = np.linalg.norm(r[:2])
if norm < 1e-10:
velocities.append(np.zeros(3))
continue
angle_r = np.arccos(np.dot(r[:2] / norm, [1, 0]))
angle_v = angle_r - np.pi / 2
flip = np.sign(r[1])
slip_velocity = np.array([flip * np.cos(angle_v), np.sin(angle_v), 0.])
velocities.append(slip_velocity)
return np.array(velocities)