Source code for espressomd.zn

#
# Copyright (C) 2024 The ESPResSo project
#
# This file is part of ESPResSo.
#
# ESPResSo is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ESPResSo is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import subprocess
import numpy as np
import zndraw.zndraw
import zndraw.utils
import zndraw.draw
import znsocket
import znjson
import espressomd
import secrets
import time
import urllib.parse
import typing as t
import scipy.spatial.transform


# Standard colors
color_dict = {"black": "#303030",
              "red": "#e6194B",
              "green": "#3cb44b",
              "yellow": "#ffe119",
              "blue": "#4363d8",
              "orange": "#f58231",
              "purple": "#911eb4",
              "cyan": "#42d4f4",
              "magenta": "#f032e6",
              "lime": "#bfef45",
              "brown": "#9A6324",
              "grey": "#a9a9a9",
              "white": "#f0f0f0"}


[docs] class EspressoConverter(znjson.ConverterBase): """ Converter for ESPResSo systems to ASEDict """ level = 100 representation = "ase.Atoms" instance = espressomd.system.System
[docs] def encode(self, system) -> zndraw.utils.ASEDict: self.system = system self.particles = self.system.part.all() self.num_particles = len(self.particles) self.params = system.visualizer_params self.numbers = self.num_particles * [1] if self.params["folded"] is True: self.positions = self.particles.pos_folded else: self.positions = self.particles.pos if self.params["colors"] is None: self.colors = self.get_default_colors() else: self.colors = self.set_colors(self.params["colors"]) if self.params["radii"] is None: self.radii = self.get_default_radii() else: self.radii = self.set_radii(self.params["radii"]) if self.params["bonds"] is True: bonds = self.get_bonds() else: bonds = [] arrays = { "colors": self.colors, "radii": self.radii, } cell = [[system.box_l[0], 0, 0], [0, system.box_l[1], 0], [0, 0, system.box_l[2]]] pbc = system.periodicity calc = None info = {} if self.params["vector_field"] is not None: vectors = self.params["vector_field"]() else: vectors = [] return zndraw.utils.ASEDict( numbers=self.numbers, positions=self.positions.tolist(), connectivity=bonds, arrays=arrays, info=info, calc=calc, pbc=pbc.tolist(), cell=cell, vectors=vectors, )
[docs] def decode(self, value): value = None return value
[docs] def get_default_colors(self): return [color_dict["white"]] * self.num_particles
[docs] def get_default_radii(self): return [0.5] * self.num_particles
[docs] def set_colors(self, colors): color_list = list() for p in self.particles: color = colors[p.type] # if color starts with #, assume it is a hex color if color.startswith("#"): color_list.append(color) else: if color not in color_dict: raise ValueError( f"Color {color} not found in color dictionary") color_list.append(color_dict[color]) return color_list
[docs] def set_radii(self, radii): radius_list = list() for p in self.particles: radius_list.append(radii[p.type]) return radius_list
[docs] def get_bonds(self): bonds = [] for p in self.particles: if not p.bonds: continue for bond in p.bonds: if len(bond) == 4: bonds.append([p.id, bond[1], 1]) bonds.append([p.id, bond[2], 1]) bonds.append([bond[2], bond[3], 1]) else: for bond_partner in bond[1:]: bonds.append([p.id, bond_partner, 1]) self.process_bonds(bonds) return bonds
[docs] def process_bonds(self, bonds): half_box_l = 0.5 * self.system.box_l num_part = len(self.positions) bonds_to_remove = [] bonds_to_add = [] for b in bonds: try: if self.params["folded"] is True: x_a = self.system.part.by_id(b[0]).pos_folded x_b = self.system.part.by_id(b[1]).pos_folded else: x_a = self.system.part.by_id(b[0]).pos x_b = self.system.part.by_id(b[1]).pos except Exception: bonds_to_remove.append(b) continue dx = x_b - x_a if np.all(np.abs(dx) < half_box_l): continue if self.params["folded"] is False: bonds_to_remove.append(b) continue d = self.cut_bond(x_a, dx) if d is np.inf: bonds_to_remove.append(b) continue s_a = x_a + 0.8 * dx s_b = x_b - 0.8 * dx bonds_to_remove.append(b) self.add_ghost_particle(pos=s_a, color=self.colors[b[0]]) bonds_to_add.append([b[0], num_part, 1]) self.add_ghost_particle(pos=s_b, color=self.colors[b[1]]) bonds_to_add.append([b[1], num_part + 1, 1]) num_part += 2 for b in bonds_to_remove: bonds.remove(b) bonds.extend(bonds_to_add)
[docs] def cut_bond(self, x_a, dx): if np.dot(dx, dx) < 1e-9: return np.inf shift = np.rint(dx / self.system.box_l) dx -= shift * self.system.box_l best_d = np.inf for i in range(3): if dx[i] == 0: continue elif dx[i] > 0: p0_i = self.system.box_l[i] else: p0_i = 0 d = (p0_i - x_a[i]) / dx[i] if d < best_d: best_d = d return best_d
[docs] def add_ghost_particle(self, pos, color): self.positions = np.vstack([self.positions, pos]) self.radii.append(1e-6 * min(self.radii)) self.colors.append(color) self.numbers.append(2)
znjson.config.register(EspressoConverter)
[docs] class LBField: """ Convert the ESPResSo lattice-Boltzmann field to a vector field for visualization. Samples the field at a given step size and offset over the lattice nodes. Parameters ---------- system : :class:`~espressomd.system.System` ESPResSo system step_x : :obj:`int`, optional Step size in x direction, by default 1 step_y : :obj:`int`, optional Step size in y direction, by default 1 step_z : :obj:`int`, optional Step size in z direction, by default 1 offset_x : :obj:`int`, optional Offset in x direction, by default 0 offset_y : :obj:`int`, optional Offset in y direction, by default 0 offset_z : :obj:`int`, optional Offset in z direction, by default 0 scale : :obj:`float`, optional Scale the velocity vectors, by default 1.0 arrow_config : :obj:`dict`, optional Configuration for the arrows, by default None and then uses the default configuration: 'colormap': [[-0.5, 0.9, 0.5], [-0.0, 0.9, 0.5]] HSL colormap for the arrows, where the first value is the minimum value and the second value is the maximum value. 'normalize': True Normalize the colormap to the maximum value each frame 'colorrange': [0, 1] Range of the colormap, only used if normalize is False 'scale_vector_thickness': True Scale the thickness of the arrows with the velocity 'opacity': 1.0 Opacity of the arrows """ def __init__(self, system: espressomd.system.System, step_x: int = 1, step_y: int = 1, step_z: int = 1, offset_x: int = 0, offset_y: int = 0, offset_z: int = 0, scale: float = 1.0, arrow_config: dict = None): self.step_x = step_x self.step_y = step_y self.step_z = step_z self.offset_x = offset_x self.offset_y = offset_y self.offset_z = offset_z self.scale = scale self.box = system.box_l if system.lb is None: raise ValueError("System does not have a lattice-Boltzmann solver") self.lbf = system.lb self.agrid = system.lb.agrid self.arrow_config = arrow_config def _build_grid(self): x = np.arange(self.offset_x + self.agrid / 2, self.box[0], self.step_x * self.agrid) y = np.arange(self.offset_y + self.agrid / 2, self.box[1], self.step_y * self.agrid) z = np.arange(self.offset_z + self.agrid / 2, self.box[2], self.step_z * self.agrid) origins = np.array(np.meshgrid(x, y, z)).T.reshape(-1, 3) return origins def _get_velocities(self): velocities = self.lbf[:, :, :].velocity velocities = velocities[::self.step_x, ::self.step_y, ::self.step_z] velocities = self.scale * velocities velocities = np.swapaxes(velocities, 0, 3) velocities = np.swapaxes(velocities, 2, 3) velocities = velocities.T.reshape(-1, 3) return velocities def __call__(self): origins = self._build_grid() velocities = self._get_velocities() velocities = origins + velocities vector_field = np.stack([origins, velocities], axis=1) return vector_field.tolist()
[docs] class VectorField: """ Give an array of origins and vectors to create a vector field for visualization. The vectorfield is updated every time it is called. Both origins and vectors must have the same shape. Both origins and vectors must be 3D numpy arrays in the shape of ``(n, m, 3)``, where ``n`` is the number of frames the field has and ``m`` is the number of vectors. The number of frames ``n`` must larger or equal to the number of times the update function will be called in the update loop. Parameters ---------- origins : (n, m, 3) array_like of :obj:`float` Array of origins for the vectors vectors : (n, m, 3) array_like of :obj:`float` Array of vectors scale : :obj:`float`, optional Scale the vectors, by default 1 arrow_config : :obj:`dict`, optional Configuration for the arrows, by default ``None`` and then uses the default configuration: 'colormap': [[-0.5, 0.9, 0.5], [-0.0, 0.9, 0.5]] HSL colormap for the arrows, where the first value is the minimum value and the second value is the maximum value. 'normalize': True Normalize the colormap to the maximum value each frame 'colorrange': [0, 1] Range of the colormap, only used if normalize is False 'scale_vector_thickness': True Scale the thickness of the arrows with the velocity 'opacity': 1.0 Opacity of the arrows """ def __init__(self, origins: np.ndarray, vectors: np.ndarray, scale: float = 1, arrow_config: dict = None): self.origins = origins self.vectors = vectors self.scale = scale self.frame_count = 0 self.arrow_config = arrow_config def __call__(self): if self.origins.shape != self.vectors.shape: raise ValueError("Origins and vectors must have the same shape") origins = self.origins[self.frame_count] vectors = origins + self.vectors[self.frame_count] self.frame_count += 1 origins = origins.reshape(-1, 3) vectors = vectors.reshape(-1, 3) vectors = self.scale * vectors vector_field = np.stack([origins, vectors], axis=1) return vector_field.tolist()
[docs] class Visualizer(): """ Visualizer for ESPResSo simulations using ZnDraw. Main component of the visualizer is the ZnDraw server, which is started as a subprocess. The ZnDraw client is used to communicate with the server and send the visualized data. The visualized data is encoded using the :class:`EspressoConverter`, which converts the ESPResSo system to an typed dict. The visualizer uploads a new frame to the server every time the update method is called. Parameters ---------- system : :class:`~espressomd.system.System` ESPResSo system to visualize port : :obj:`int`, optional Port for the ZnDraw server, by default 1234, if taken, the next available port is used token : :obj:`str`, optional Token for the ZnDraw server, by default a random token is generated folded : :obj:`bool`, optional Fold the positions of the particles into the simulation box, by default True colors : :obj:`dict`, optional Dictionary containing color type mapping for the particles, by default all particles are white radii : :obj:`dict`, optional Dictionary containing radii type mapping for the particles, by default all particles have a radius of 0.5 bonds : :obj:`bool`, optional Draw bonds between particles, by default ``False`` jupyter : :obj:`bool`, optional Show the visualizer in a Jupyter notebook, by default True vector_field : :class:`~espressomd.zn.VectorField` or :class:`~espressomd.zn.LBField`, optional Vector field to visualize, by default ``None`` """ SERVER_PORT = None SOCKET_PORT = None def __init__(self, system: espressomd.system.System = None, port: int = 1234, token: str = None, folded: bool = True, colors: dict = None, radii: dict = None, bonds: bool = False, jupyter: bool = True, vector_field: t.Union[VectorField, LBField] = None, ): self.system = system self.params = { "folded": folded, "colors": colors, "radii": radii, "bonds": bonds, "vector_field": vector_field, } self.url = "http://127.0.0.1" self.frame_count = 0 if token is None: self.token = secrets.token_hex(4) else: self.token = token # A server is started in a subprocess, and we have to wait for it if self.SERVER_PORT is None: print("Starting ZnDraw server, this may take a few seconds") self.port = port self._start_server() time.sleep(10) self._start_zndraw() time.sleep(1) if vector_field is not None: self.arrow_config = {'colormap': [[-0.5, 0.9, 0.5], [-0.0, 0.9, 0.5]], 'normalize': True, 'colorrange': [0, 1], 'scale_vector_thickness': True, 'opacity': 1.0} if vector_field.arrow_config is not None: for key, value in vector_field.arrow_config.items(): if key not in self.arrow_config: raise ValueError(f"Invalid key {key} in arrow_config") self.arrow_config[key] = value if self.params["bonds"] and not self.params["folded"]: print( "Warning: Unfolded positions may result in incorrect bond visualization") if jupyter: self._show_jupyter() else: raise NotImplementedError( "Only Jupyter notebook is supported at the moment") def _start_server(self): """ Start the ZnDraw server through a subprocess """ self.socket_port = zndraw.utils.get_port(default=6374) Visualizer.SERVER_PORT = self.port Visualizer.SOCKET_PORT = self.socket_port self.server = subprocess.Popen(["zndraw", "--no-browser", f"--port={self.port}", f"--storage-port={self.socket_port}"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) def _start_zndraw(self): """ Start the ZnDraw client and connect to the server """ config = zndraw.zndraw.TimeoutConfig( connection=10, modifier=0.25, between_calls=0.1, emit_retries=3, call_retries=1, connect_retries=3, ) while True: try: self.r = znsocket.Client( address=f"{self.url}:{self.SOCKET_PORT}") break except BaseException: time.sleep(0.5) url = f"{self.url}:{self.SERVER_PORT}" self.zndraw = zndraw.zndraw.ZnDrawLocal( r=self.r, url=url, token=self.token, timeout=config) parsed_url = urllib.parse.urlparse( f"{self.zndraw.url}/token/{self.zndraw.token}") self.address = parsed_url._replace(scheme="http").geturl() def _show_jupyter(self): """ Show the visualizer in a Jupyter notebook """ from IPython.display import IFrame, display print(f"Showing ZnDraw at {self.address}") display(IFrame(src=self.address, width="100%", height="700px"))
[docs] def update(self): """ Update the visualizer with the current state of the system """ self.system.visualizer_params = self.params data = znjson.dumps( self.system, cls=znjson.ZnEncoder.from_converters( [EspressoConverter]) ) # Catch when the server is initializing an empty frame # len(self.zndraw) is a expensive socket call, so we try to avoid it if self.frame_count != 0 or len(self.zndraw) == 0: self.zndraw.append(data) else: self.zndraw.__setitem__(0, data) if self.frame_count == 0: self.zndraw.socket.sleep(1) x = self.system.box_l[0] / 2 y = self.system.box_l[1] / 2 z = self.system.box_l[2] / 2 z_dist = max([1.5 * y, 1.5 * x, 1.5 * z]) self.zndraw.camera = {'position': [ x, y, z_dist], 'target': [x, y, z]} self.zndraw.config.scene.frame_update = False if self.params["vector_field"] is not None: for key, value in self.arrow_config.items(): setattr(self.zndraw.config.arrows, key, value) self.frame_count += 1
[docs] def draw_constraints(self, shapes: list): """ Draw constraints on the visualizer """ if not isinstance(shapes, list): raise ValueError("Constraints must be given in a list") objects = [] for shape in shapes: shape_type = shape.__class__.__name__ mat = zndraw.draw.Material(color="#b0b0b0", opacity=0.8) if shape_type == "Cylinder": center = shape.center axis = shape.axis length = shape.length radius = shape.radius rotation_angles = zndraw.utils.direction_to_euler( axis, roll=np.pi / 2) objects.append(zndraw.draw.Cylinder(position=center, rotation=rotation_angles, radius_bottom=radius, radius_top=radius, height=length, material=mat)) elif shape_type == "Wall": dist = shape.dist normal = np.array(shape.normal) position = dist * normal helper = WallIntersection( plane_normal=normal, plane_point=position, box_l=self.system.box_l) corners = helper.get_intersections() base_position = np.copy(corners[0]) corners -= base_position # Rotate plane to align with z-axis, Custom2DShape only works # in the xy-plane unit_z = np.array([0, 0, 1]) r, _ = scipy.spatial.transform.Rotation.align_vectors( [unit_z], [normal]) rotated_corners = r.apply(corners) # Sort corners in a clockwise order, except the first corner angles = np.arctan2( rotated_corners[1:, 1], rotated_corners[1:, 0]) sorted_indices = np.argsort(angles) sorted_corners = rotated_corners[1:][sorted_indices] sorted_corners = np.vstack( [rotated_corners[0], sorted_corners])[:, :2] r, _ = scipy.spatial.transform.Rotation.align_vectors( [normal], [unit_z]) euler_angles = r.as_euler("xyz") # invert the z-axis, unsure why this is needed, maybe # different coordinate systems euler_angles[2] *= -1. objects.append(zndraw.draw.Custom2DShape( position=base_position, rotation=euler_angles, points=sorted_corners, material=mat)) elif shape_type == "Sphere": center = shape.center radius = shape.radius objects.append( zndraw.draw.Sphere(position=center, radius=radius, material=mat)) elif shape_type == "Rhomboid": a = shape.a b = shape.b c = shape.c corner = shape.corner objects.append( zndraw.draw.Rhomboid(position=corner, vectorA=a, vectorB=b, vectorC=c, material=mat)) elif shape_type == "Ellipsoid": center = shape.center a = shape.a b = shape.b objects.append(zndraw.draw.Ellipsoid(position=center, a=a, b=b, c=b, material=mat)) else: raise NotImplementedError( f"Shape of type {shape_type} isn't available in ZnDraw") self.zndraw.geometries = objects
[docs] class WallIntersection: """ Simple helper to calculate all Box edges that intersect with a plane. """ def __init__(self, plane_point, plane_normal, box_l): self.plane_point = plane_point self.plane_normal = plane_normal / np.linalg.norm(plane_normal) self.box_l = box_l # Create 8 vertices of the bounding box self.vertices = np.array([ [0, 0, 0], [0, 0, box_l[2]], [0, box_l[1], 0], [0, box_l[1], box_l[2]], [box_l[0], 0, 0], [box_l[0], 0, box_l[2]], [box_l[0], box_l[1], 0], [box_l[0], box_l[1], box_l[2]] ]) self.edges = [ (self.vertices[0], self.vertices[1]), (self.vertices[0], self.vertices[2]), (self.vertices[0], self.vertices[4]), (self.vertices[1], self.vertices[3]), (self.vertices[1], self.vertices[5]), (self.vertices[2], self.vertices[3]), (self.vertices[2], self.vertices[6]), (self.vertices[3], self.vertices[7]), (self.vertices[4], self.vertices[5]), (self.vertices[4], self.vertices[6]), (self.vertices[5], self.vertices[7]), (self.vertices[6], self.vertices[7]) ]
[docs] def plane_intersection_with_line(self, line_point1, line_point2): # Calculate the intersection point of a line and a plane line_dir = line_point2 - line_point1 denom = np.dot(self.plane_normal, line_dir) if np.abs(denom) > 1e-6: # Avoid division by zero t = np.dot(self.plane_normal, (self.plane_point - line_point1)) / denom if 0 <= t <= 1: return line_point1 + t * line_dir return None
[docs] def get_intersections(self): intersections = [] for edge in self.edges: intersection = self.plane_intersection_with_line(edge[0], edge[1]) if intersection is not None: intersections.append(intersection) return np.array(intersections)