Source code for espressomd.MDA_ESP

# Copyright (C) 2010-2022 The ESPResSo project
#
# This file is part of ESPResSo.
#
# ESPResSo is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ESPResSo is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
This modules exposes ESPResSo's coordinates and particle attributes
to MDAnalysis without the need to save any information to files.

The main class is :class:`Stream`, which is used to initialize the stream of
data to MDAnalysis' readers. These are the topology reader :class:`ESPParser`
and the coordinates reader :class:`ESPReader`.

A minimal working example is the following:

>>> import espressomd
>>> import espressomd.MDA_ESP
>>> import MDAnalysis as mda
>>> # system setup
>>> system = espressomd.System(box_l=[10., 10., 10.])
>>> system.time_step = 1.
>>> system.cell_system.skin = 1.
>>> system.part.add(pos=[1., 2., 3.])
>>> # set up the stream
>>> eos = espressomd.MDA_ESP.Stream(system)
>>> # feed Universe with a topology and coordinates
>>> u = mda.Universe(eos.topology, eos.trajectory)
>>> print(u)
<Universe with 1 atoms>

"""

try:
    import cStringIO as StringIO
    StringIO = StringIO.StringIO
except ImportError:
    from io import StringIO

import numpy as np
import MDAnalysis

import setuptools
assert setuptools.version.pkg_resources.packaging.specifiers.SpecifierSet('>=1.0.0,<2.0.0').contains(MDAnalysis.__version__), \
    f'MDAnalysis version {MDAnalysis.__version__} is not supported'

from MDAnalysis.lib import util
from MDAnalysis.coordinates.core import triclinic_box
from MDAnalysis.coordinates.core import triclinic_vectors
from MDAnalysis.lib.util import NamedStream
from MDAnalysis.topology.base import TopologyReaderBase
from MDAnalysis.coordinates import base
from MDAnalysis.coordinates.base import SingleFrameReaderBase
from MDAnalysis.core.topology import Topology

from MDAnalysis.core.topologyattrs import (
    Atomnames, Atomids, Atomtypes, Masses,
    Resids, Resnums, Segids, Resnames, AltLocs,
    ICodes, Occupancies, Tempfactors, Charges, Bonds, Angles, Dihedrals
)


[docs]class Stream: """ Create an object that provides a MDAnalysis topology and a coordinate reader >>> eos = espressomd.MDA_ESP.Stream(system) >>> u = mda.Universe(eos.topology, eos.trajectory) Parameters ---------- system : :obj:`espressomd.system.System` """ def __init__(self, system): self.topology = ESPParser(None, espresso=system).parse() self.system = system @property def trajectory(self): """ Particles' coordinates at the current time Returns ------- stream : :class:`MDAnalysis.lib.util.NamedStream` A stream in the format that can be parsed by :class:`ESPReader` """ # time _xyz = str(self.system.time) + '\n' # number of particles _xyz += str(len(self.system.part)) + '\n' # box edges _xyz += str(self.system.box_l) + '\n' # configuration for _p in self.system.part: _xyz += str(_p.pos) + '\n' for _p in self.system.part: _xyz += str(_p.v) + '\n' for _p in self.system.part: _xyz += str(_p.f) + '\n' return NamedStream(StringIO(_xyz), "__.ESP")
[docs]class ESPParser(TopologyReaderBase): """ An MDAnalysis reader of ESPResSo's topology. """ format = 'ESP' def __init__(self, filename, **kwargs): # pylint: disable=unused-argument self.kwargs = kwargs
[docs] def parse(self): """ Access ESPResSo data and return the topology object. Returns ------- top : :class:`MDAnalysis.core.topology.Topology` a topology object """ espresso = self.kwargs['espresso'] names = [] atomtypes = [] masses = [] charges = [] bonds = [] angles = [] dihedrals = [] for p in espresso.part: names.append("A" + repr(p.type)) atomtypes.append("T" + repr(p.type)) masses.append(p.mass) charges.append(p.q) for bond in p.bonds: partner_ids = bond[1:] n_partner = len(partner_ids) if n_partner == 1: bonds.append((p.id, partner_ids[0])) elif n_partner == 2: angles.append((partner_ids[0], p.id, partner_ids[1])) elif n_partner == 3: dihedrals.append( (partner_ids[0], p.id, partner_ids[1], partner_ids[2])) else: continue natoms = len(espresso.part) attrs = [Atomnames(np.array(names, dtype=object)), Atomids(np.arange(natoms) + 1), Atomtypes(np.array(atomtypes, dtype=object)), Masses(masses), Resids(np.array([1])), Resnums(np.array([1])), Segids(np.array(['System'], dtype=object)), AltLocs(np.array([' '] * natoms, dtype=object)), Resnames(np.array(['R'], dtype=object)), Occupancies(np.zeros(natoms)), Tempfactors(np.zeros(natoms)), ICodes(np.array([' '], dtype=object)), Charges(np.array(charges)), Bonds(bonds), Angles(angles), Dihedrals(dihedrals) ] top = Topology(natoms, 1, 1, attrs=attrs) return top
[docs]class Timestep(base.Timestep): _ts_order_x = [0, 3, 4] _ts_order_y = [5, 1, 6] _ts_order_z = [7, 8, 2] def _init_unitcell(self): return np.zeros(9, dtype=np.float32) @property def dimensions(self): # This information now stored as _ts_order_x/y/z to keep DRY x = self._unitcell[self._ts_order_x] y = self._unitcell[self._ts_order_y] z = self._unitcell[self._ts_order_z] # this ordering is correct! (checked it, OB) return triclinic_box(x, y, z) @dimensions.setter def dimensions(self, box): x, y, _ = triclinic_vectors(box) np.put(self._unitcell, self._ts_order_x, x) np.put(self._unitcell, self._ts_order_y, y)
[docs]class ESPReader(SingleFrameReaderBase): """ An MDAnalysis single frame reader for the stream provided by :class:`Stream`. """ format = 'ESP' units = {'time': None, 'length': 'nm', 'velocity': 'nm/ps'} _Timestep = Timestep def _read_first_frame(self): with util.openany(self.filename, 'rt') as espfile: n_atoms = 1 for pos, line in enumerate(espfile, start=-3): if pos == -3: time = float(line[1:-1]) elif pos == -2: n_atoms = int(line) self.n_atoms = n_atoms positions = np.zeros( self.n_atoms * 3, dtype=np.float32).reshape(self.n_atoms, 3) velocities = np.zeros( self.n_atoms * 3, dtype=np.float32).reshape(self.n_atoms, 3) forces = np.zeros( self.n_atoms * 3, dtype=np.float32).reshape(self.n_atoms, 3) self.ts = ts = self._Timestep( self.n_atoms, **self._ts_kwargs) self.ts.time = time elif pos == -1: self.ts._unitcell[:3] = np.array( list(map(float, line[1:-2].split()))) elif pos < n_atoms: positions[pos] = np.array( list(map(float, line[1:-2].split()))) elif pos < 2 * n_atoms: velocities[pos - n_atoms] = np.array( list(map(float, line[1:-2].split()))) else: forces[pos - 2 * n_atoms] = np.array( list(map(float, line[1:-2].split()))) ts.positions = np.copy(positions) ts.velocities = np.copy(velocities) ts.forces = np.copy(forces)