#!/usr/bin/env python
import array
import os
import struct
from scm.plams.core.errors import PlamsError
import numpy
from scm.plams.mol.molecule import Molecule
from scm.plams.trajectories.trajectoryfile import TrajectoryFile
__all__ = ["DCDTrajectoryFile"]
[docs]class DCDTrajectoryFile(TrajectoryFile):
"""
Class representing a DCD file containing a molecular trajectory
An instance of this class has the following attributes:
* ``file_object`` -- A Python :py:class:`file` object, referring to the actual DCD file
* ``position`` -- The frame to which the cursor is currently pointing in the DCD file
* ``mode`` -- Designates whether the file is in read or write mode ('rb' or 'wb')
* ``ntap`` -- The number of atoms in the molecular system (needs to be constant throughout)
An |DCDTrajectoryFile| object behaves very similar to a regular file object.
It has read and write methods (:meth:`read_next` and :meth:`write_next`)
that read and write from/to the position of the cursor in the ``file_object`` attribute.
If the file is in read mode, an additional method :meth:`read_frame` can be used that moves
the cursor to any frame in the file and reads from there.
The amount of information stored in memory is kept to a minimum, as only information from the current frame
is ever stored.
Reading and writing to and from the files can be done as follows::
>>> from scm.plams import DCDTrajectoryFile
>>> dcd = DCDTrajectoryFile('old.dcd')
>>> mol = dcd.get_plamsmol()
>>> dcdout = DCDTrajectoryFile('new.dcd',mode='wb',ntap=dcd.ntap)
>>> for i in range(dcd.get_length()) :
>>> crd,cell = dcd.read_frame(i,molecule=mol)
>>> dcdout.write_next(molecule=mol)
The above script reads information from the DCD file old.dcd into the |Molecule| object ``mol``
in a step-by-step manner.
The |Molecule| object is then passed to the :meth:`write_next` method of the new |DCDTrajectoryFile|
object corresponding to the new dcd file ``new.dcd``.
The exact same result can also be achieved by iterating over the instance as a callable
>>> dcd = DCDTrajectoryFile('old.dcd')
>>> mol = dcd.get_plamsmol()
>>> dcdout = DCDTrajectoryFile('new.dcd',mode='wb',ntap=dcd.ntap)
>>> for crd,cell in dcd(mol) :
>>> dcdout.write_next(molecule=mol)
This procedure requires all coordinate information to be passed to and from the |Molecule| object
for each frame, which can be time-consuming.
It is therefore also possible to bypass the |Molecule| object when reading through the frames::
>>> dcd = DCDTrajectoryFile('old.dcd')
>>> dcdout = DCDTrajectoryFile('new.dcd',mode='wb',ntap=dcd.ntap)
>>> for crd,cell in dcd :
>>> dcdout.write_next(coords=crd,cell=cell)
"""
[docs] def __init__(self, filename=None, mode="rb", fileobject=None, ntap=None):
"""
Initiates a DCDTrajectoryFile object
* ``filename`` -- The path to the DCD file
* ``mode`` -- The mode in which to open the DCD file ('rb' or 'wb')
* ``fileobject`` -- Optionally, a file object can be passed instead (filename needs to be set to None)
* ``ntap`` -- If the file is in write mode, the number of atoms needs to be passed here
"""
TrajectoryFile.__init__(self, filename, mode, fileobject, ntap)
# DCD specific attributes
self.celldata = False # Whether the cell data is in the file (only set in read-mode)
self.byte_order = "@" # native endian
self.timesteps = (0, 0, 0) # (nsteps,startstep,steps_between_saves)
self.delta = 2.0 # The timestep (if stored)
self.namnf = 0 # The number of free atoms (if stored)
self.stepsize = 0 # The number of bytes used to store a single conformation
self.intsize = struct.calcsize("i") # The number of bytes in an integer
self.floatsize = struct.calcsize("f") # The number of bytes in a float
# Skip to the trajectory part of the file
if self.mode == "rb":
self._read_header()
elif self.mode == "ab":
self._move_cursor_to_append_pos()
# elif self.mode == 'wb' :
# self._write_header()
[docs] def set_byteorder(self, byteorder):
"""
Specifies wether the file to be read from or written to is little endian or big endian
* ``byteorder`` -- Can be either '@', '<', or '>', denoting native endian, little endian, or big endian
"""
self.byte_order = byteorder
def _read_variable(self, size):
"""
Reads a variable from a binary file
:note::
Assumes that the variable has been written on a machine
with the same bitsizes for the data
"""
if size == "i":
varsize = self.intsize
if size == "f":
varsize = self.floatsize
else:
varsize = struct.calcsize(size)
var_char = self.file_object.read(varsize)
fmt = self.byte_order + size
var = struct.unpack(fmt, var_char)[0]
return var
def _write_variable(self, var, size):
"""
Writes a variable to a binary file
"""
var_char = struct.pack(size, var)
self.file_object.write(var_char)
def _read_header(self):
"""
Reads in the header information in the binary DCD file
"""
# This should be an int32
input_integer = self._read_variable("i")
if input_integer != 84:
print(input_integer)
print("BAD DCD FORMAT")
raise PlamsError
# Read CORD string
car4 = self.file_object.read(4).decode()
if car4 != "CORD":
print("BAD DCD FORMAT")
raise PlamsError
# Here we read the data of the timesteps
# NSET: The number of sets of coordinates
# ISTART: The starting timestep
# NSAVC: The number of timesteps between dcd saves
timesteps = array.array("i")
timesteps.fromfile(self.file_object, 3)
self.timesteps = tuple(timesteps)
# print 'timesteps: ',self.timesteps
# 5 blanc integers
dummies = array.array("i")
dummies.fromfile(self.file_object, 5)
# NAMNF: The number of free atoms
self.namnf = self._read_variable("i")
# DELTA: The timestep
self._read_variable("f")
# 10 blanc integers
dummies = array.array("i")
dummies.fromfile(self.file_object, 10)
# The end size of the first block
input_integer = self._read_variable("i")
if input_integer != 84:
print("BAD DCD FORMAT")
raise PlamsError
############################################################
# The size of the next block
input_integer = self._read_variable("i")
if (input_integer - 4) % 80 == 0:
# NTITLE: The number of 80 char title strings
ntitle = self._read_variable("i")
for _ in range(ntitle):
self.file_object.read(80).decode()
# The end size of this block
input_integer = self._read_variable("i")
else:
print("BAD DCD FORMAT")
raise PlamsError
############################################################
# This should be an int32
input_integer = self._read_variable("i")
if input_integer != 4:
print("BAD DCD FORMAT")
raise PlamsError
# Read in the number of atoms
self.ntap = self._read_variable("i")
# print 'ntap: ',self.ntap
if self.coords.shape == (0, 3):
self.coords = numpy.zeros((self.ntap, 3))
# This should be an int32
input_integer = self._read_variable("i")
if input_integer != 4:
print("BAD DCD FORMAT")
raise PlamsError
if self.namnf != 0:
# This should be an int32
input_integer = self._read_variable("i")
if input_integer != (self.ntap - self.namnf) * 4:
print("BAD DCD FORMAT")
raise PlamsError
# This should be an int32
dummies = array.array("i")
dummies.fromfile(self.file_object, (self.ntap - self.namnf))
# print 'list',list(dummies)
# This should be an int32
input_integer = self._read_variable("i")
if input_integer != (self.ntap - self.namnf) * 4:
print("BAD DCD FORMAT")
raise PlamsError
self.stepsize += 3 * self.ntap * self.floatsize
self.stepsize += 5 * self.intsize
if len(self.elements) != self.ntap:
self.elements = ["H"] * self.ntap
[docs] def read_next(self, molecule=None, read=True):
"""
Reads coordinates and lattice info from the current position of the cursor and returns it
* ``molecule`` -- |Molecule| object in which the new data needs to be stored
* ``read`` -- If set to False the cursor will move to the next frame without reading
"""
# Check for end of file
# This should be an int32
input_integer_char = self.file_object.read(self.intsize)
if len(input_integer_char) > 0:
if self.firsttime:
self.celldata = False
input_integer = struct.unpack(self.byte_order + "i", input_integer_char)[0]
if input_integer == 48:
self.celldata = True
else:
return None, None
if not read and not self.firsttime:
return self._move_cursor_without_reading()
# Read cell data
cell = numpy.zeros((3, 3))
if self.celldata:
cell_array = array.array("d")
try:
cell_array.fromfile(self.file_object, 6)
if self.byte_order != "@":
cell_array.byteswap()
except EOFError:
return None, None
cell[numpy.tril_indices(3)] = cell_array
self.file_object.read(self.floatsize)
# Coords
if self.firsttime:
input_integer = self._read_variable("i")
else:
self.file_object.read(self.intsize)
if self.firsttime:
if input_integer != 4 * self.ntap:
print("BAD DCD FORMAT")
raise PlamsError
xcoords = array.array("f")
try:
xcoords.fromfile(self.file_object, self.ntap)
# xcoords = numpy.fromfile(self.file,dtype='f',count=self.ntap)
if self.byte_order != "@":
xcoords.byteswap()
except EOFError:
return None, None
for i in range(2):
if self.firsttime:
input_integer = self._read_variable("i")
if input_integer != 4 * self.ntap:
print("BAD DCD FORMAT")
raise PlamsError
else:
self.file_object.read(self.intsize)
ycoords = array.array("f")
try:
ycoords.fromfile(self.file_object, self.ntap)
if self.byte_order != "@":
ycoords.byteswap()
except EOFError:
return None, None
for i in range(2):
if self.firsttime:
input_integer = self._read_variable("i")
if input_integer != 4 * self.ntap:
print("BAD DCD FORMAT")
raise PlamsError
else:
self.file_object.read(self.intsize)
zcoords = array.array("f")
try:
zcoords.fromfile(self.file_object, self.ntap)
if self.byte_order != "@":
zcoords.byteswap()
except EOFError:
return None, None
if self.firsttime:
input_integer = self._read_variable("i")
if input_integer != 4 * self.ntap:
print("BAD DCD FORMAT")
raise PlamsError
else:
self.file_object.read(self.intsize)
# Check that self.coords has not been changed
if not self.coords.shape == (self.ntap, 3):
raise PlamsError("The coordinate array has been changed outside the class")
self.coords[:, 0] = xcoords
self.coords[:, 1] = ycoords
self.coords[:, 2] = zcoords
if self.firsttime:
if self.celldata:
self.stepsize += 6 * struct.calcsize("d")
self.stepsize += self.floatsize
self.stepsize += self.intsize
self.firsttime = False
if isinstance(molecule, Molecule):
self._set_plamsmol(self.coords, cell, molecule)
self.position += 1
return self.coords, cell
def _is_endoffile(self):
"""
If the end of file is reached, return coords and cell as None
"""
test = self.file_object.read(self.stepsize)
return len(test) < self.stepsize
def _write_header(self, ntap=0):
"""
Write the header of the DCD file
"""
self.ntap = ntap
self.coords = numpy.zeros((self.ntap, 3))
# This should be an int32
self._write_variable(84, "i")
# Read CORD string
self.file_object.write("CORD".encode())
# Here we read the data of the timesteps
# NSET: The number of sets of coordinates
# ISTART: The starting timestep
# NSAVC: The number of timesteps between dcd saves
timesteps = array.array("i", [0, 1, 1])
self.file_object.write(timesteps)
# 5 blanc integers
dummies = array.array("i", [0, 0, 0, 0, 0])
self.file_object.write(dummies)
# NAMNF: The number of free atoms
self._write_variable(0, "i")
# DELTA: The timestep
self._write_variable(self.delta, "f")
# 10 blanc integers
dummies = array.array("i")
dummies.append(1)
for i in range(8):
dummies.append(0)
dummies.append(24)
self.file_object.write(dummies)
# The end size of the first block
self._write_variable(84, "i")
############################################################
# The size of the next block
self._write_variable(84, "i")
ntitle = 1
self._write_variable(ntitle, "i")
title = "REMARK Created with py_md"
nlet = len(title)
for i in range(80 - nlet):
title += " "
self.file_object.write(title.encode())
self._write_variable(84, "i")
############################################################
# This should be an int32
self._write_variable(4, "i")
# Read in the number of atoms
self._write_variable(self.ntap, "i")
# This should be an int32
self._write_variable(4, "i")
[docs] def write_next(self, coords=None, molecule=None, cell=[0.0, 0.0, 0.0], conect=None):
"""
Write frame to next position in trajectory file
* ``coords`` -- A list or numpy array of (``ntap``,3) containing the system coordinates
* ``molecule`` -- A molecule object to read the molecular data from
* ``cell`` -- A set of lattice vectors or cell diameters
* ``conect`` -- A dictionary containing connectivity info (not used)
.. note::
Either ``coords`` or ``molecule`` are mandatory arguments
"""
if isinstance(molecule, Molecule):
coords, cell = self._read_plamsmol(molecule)[:2]
cell = self._convert_cell(cell)
# If this is the first step, write the header
if self.position == 0:
self._write_header(len(coords))
if self.ntap != len(coords):
print("BAD coordinate list")
return
# Invert the coords list [xcoords,ycoords,zcoords]
# coords = numpy.array(coords).transpose()
# This should be an int32
self._write_variable(48, "i")
cell = array.array("d", cell)
self.file_object.write(cell)
self._write_variable(0, "f")
# Coords
self._write_variable(4 * self.ntap, "i")
xcoords = array.array("f", coords[:, 0])
self.file_object.write(xcoords)
for i in range(2):
self._write_variable(4 * self.ntap, "i")
ycoords = array.array("f", coords[:, 1])
self.file_object.write(ycoords)
for i in range(2):
self._write_variable(4 * self.ntap, "i")
zcoords = array.array("f", coords[:, 2])
self.file_object.write(zcoords)
self._write_variable(4 * self.ntap, "i")
self.position += 1
def _convert_cell(self, cell):
"""
Check format of cell, and convert to required lower triangle
"""
if cell is None:
cell = numpy.zeros((3, 3))[numpy.tril_indices(3)]
elif len(cell) == 3 and isinstance(cell[0], float) or isinstance(cell[0], numpy.float64):
cell = [cell[0], 0.0, cell[1], 0.0, 0.0, cell[2]]
elif len(cell) < 3:
newcell = numpy.zeros((3, 3))
newcell[: len(cell)] = cell
cell = newcell[numpy.tril_indices(3)]
else:
cell = numpy.array(cell)
cell = cell[numpy.tril_indices(3)]
return cell
def _rewind_to_first_frame(self):
"""
Rewind the file to the first frame
"""
self.file_object.seek(0)
self.firsttime = True
self.position = 0
self.stepsize = 0
self._read_header()
def _rewind_n_frames(self, nframes):
"""
Rewind the file by nframes frames
"""
whence = os.SEEK_CUR
stepsize = self.stepsize + self.intsize
self.file_object.seek(-nframes * stepsize, whence)
self.position = self.position - nframes