import os
import numpy as np
from os.path import join as opj
from ...core.basejob import SingleJob
from ...core.errors import FileError, JobError, ResultsError, PTError
from ...core.functions import config, log, parse_heredoc
from ...core.private import sha256, UpdateSysPath
from ...core.results import Results
from ...core.settings import Settings
from ...mol.molecule import Molecule
from ...mol.atom import Atom
from ...tools.kftools import KFFile
from ...tools.units import Units
__all__ = ['AMSJob', 'AMSResults']
[docs]class AMSResults(Results):
"""A specialized |Results| subclass for accessing the results of |AMSJob|."""
def __init__(self, *args, **kwargs):
Results.__init__(self, *args, **kwargs)
self.rkfs = {}
[docs] def collect(self):
"""Collect files present in the job folder. Use parent method from |Results|, then create an instance of |KFFile| for each ``.rkf`` file present in the job folder. Collect these files in ``rkfs`` dictionary, with keys being filenames without ``.rkf`` extension.
The information about ``.rkf`` files generated by engines is taken from the main ``ams.rkf`` file.
This method is called automatically during the final part of the job execution and there is no need to call it manually.
"""
Results.collect(self)
rkfname = 'ams.rkf'
if rkfname in self.files:
main = KFFile(opj(self.job.path, rkfname))
n = main[('EngineResults','nEntries')]
for i in range(1, n+1):
title = main[('EngineResults','Title({})'.format(i))]
files = main[('EngineResults','Files({})'.format(i))].split('\x00')
if files[0].endswith('.rkf'):
key = files[0][:-4]
self.rkfs[key] = KFFile(opj(self.job.path, files[0]))
self.rkfs['ams'] = main
else:
log('WARNING: Main KF file {} not present in {}'.format(rkfname, self.job.path), 1)
[docs] def refresh(self):
"""Refresh the contents of ``files`` list.
Use the parent method from |Results|, then look at |KFFile| instances present in ``rkfs`` dictionary and check if they point to existing files. If not, try to reinstantiate them with current job path (that can happen while loading a pickled job after the entire job folder was moved).
"""
Results.refresh(self)
to_remove = []
for key,val in self.rkfs.items():
if not os.path.isfile(val.path):
if os.path.dirname(val.path) != self.job.path:
guessnewpath = opj(self.job.path, os.path.basename(val.path))
if os.path.isfile(guessnewpath):
self.rkfs[key] = KFFile(guessnewpath)
else:
to_remove.append(key)
else:
to_remove.append(key)
for i in to_remove:
del self.rkfs[i]
[docs] def engine_names(self):
"""Return a list of all names of engine specific ``.rkf`` files. The identifier of the main result file (``'ams'``) is not present in the returned list, only engine specific names are listed.
"""
self.refresh()
ret = list(self.rkfs.keys())
ret.remove('ams')
return ret
[docs] def rkfpath(self, file='ams'):
"""Return the absolute path of a chosen ``.rkf`` file.
The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
"""
return self._access_rkf(lambda x: x.path, file)
[docs] def readrkf(self, section, variable, file='ams'):
"""Read data from *section*/*variable* of a chosen ``.rkf`` file.
The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
The type of the returned value depends on the type of *variable* defined inside KF file. It can be: single int, list of ints, single float, list of floats, single boolean, list of booleans or string.
.. note::
If arguments *section* or *variable* are incorrect (not present in the chosen file), the returned value is ``None``. Please mind the fact that KF files are case sensitive.
"""
return self._access_rkf(lambda x: x.read(section, variable), file)
[docs] def read_rkf_section(self, section, file='ams'):
"""Return a dictionary with all variables from a given *section* of a chosen ``.rkf`` file.
The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
.. note::
If *section* is not present in the chosen file, the returned value is an empty dictionary. Please mind the fact that KF files are case sensitive.
"""
return self._access_rkf(lambda x: x.read_section(section), file)
[docs] def get_rkf_skeleton(self, file='ams'):
"""Return a dictionary with the structure of a chosen ``.rkf`` file. Each key corresponds to a section name with the value being a set of variable names present in that section.
The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
"""
return self._access_rkf(lambda x: x.get_skeleton(), file)
[docs] def get_molecule(self, section, file='ams'):
"""Return a |Molecule| instance stored in a given *section* of a chosen ``.rkf`` file.
The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
All data used by this method is taken from the chosen ``.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
"""
sectiondict = self.read_rkf_section(section, file)
if sectiondict:
return Molecule._mol_from_rkf_section(sectiondict)
[docs] def get_main_molecule(self):
"""Return a |Molecule| instance with the final coordinates.
All data used by this method is taken from ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
"""
return self.get_molecule('Molecule', 'ams')
[docs] def get_history_molecule(self, step):
"""Return a |Molecule| instance with coordinates taken from a particular *step* in the ``History`` section of ``ams.rkf`` file.
All data used by this method is taken from ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
"""
if 'ams' in self.rkfs:
main = self.rkfs['ams']
if 'History' not in main:
raise KeyError("'History' section not present in {}".format(main.path))
n = main.read('History', 'nEntries')
if step > n:
raise KeyError("Step {} not present in 'History' section of {}".format(step, main.path))
coords = main.read('History', f'Coords({step})')
coords = [coords[i:i+3] for i in range(0,len(coords),3)]
if ('History', f'SystemVersion({step})') in main:
version = main.read('History', f'SystemVersion({step})')
if 'SystemVersionHistory' in main:
if ('SystemVersionHistory', 'blockSize') in main:
blockSize = main.read('SystemVersionHistory', 'blockSize')
else:
blockSize = 1
block = (version - 1) // blockSize + 1
offset = (version - 1) % blockSize
system = main.read('SystemVersionHistory', f'SectionNum({block})', return_as_list=True)[offset]
else:
system = version
mol = self.get_molecule(f'ChemicalSystem({system})')
molsrc = f'ChemicalSystem({system})'
else:
mol = self.get_main_molecule()
molsrc = 'Molecule'
if len(mol) != len(coords):
raise ResultsError(f'Coordinates taken from "History%Coords({step})" have incompatible length with molecule from {molsrc} section')
for at, c in zip(mol, coords):
at.move_to(c, unit='bohr')
if ('History', f'LatticeVectors('+str(step)+')') in main:
lattice = Units.convert(main.read('History', f'LatticeVectors('+str(step)+')'), 'bohr', 'angstrom')
mol.lattice = [tuple(lattice[j:j+3]) for j in range(0,len(lattice),3)]
if all(('History', i) in main for i in [f'Bonds.Index({step})', f'Bonds.Atoms({step})', f'Bonds.Orders({step})']):
mol.bonds = []
index = main.read('History', f'Bonds.Index({step})')
atoms = main.read('History', f'Bonds.Atoms({step})')
orders = main.read('History', f'Bonds.Orders({step})')
for i in range(len(index)-1):
for j in range(index[i], index[i+1]):
mol.add_bond(mol[i+1], mol[atoms[j-1]], orders[j-1])
return mol
[docs] def get_history_variables(self, history_section='History') :
""" Return a set of keynames stored in the specified history section of the ``ams.rkf`` file.
The *history_section argument should be a string representing the name of the history section (``History`` or ``MDHistory``)*"""
if not 'ams' in self.rkfs: return
main = self.rkfs['ams']
keylist = [var for sec,var in main if sec==history_section]
# Now throw out all the last parts
return set([key.split('(')[0] for key in keylist if len(key.split('('))>1])
[docs] def get_history_property(self, varname, history_section='History') :
""" Return the values of *varname* in the history section *history_section*."""
if not 'ams' in self.rkfs: return
main = self.rkfs['ams']
nentries = main.read(history_section,'nEntries')
as_block = self._values_stored_as_blocks(main, varname, history_section)
if as_block :
nblocks = main.read(history_section,'nBlocks')
values = [main.read(history_section,f"{varname}({iblock})",return_as_list=True) for iblock in range(1,nblocks+1)]
values = [val for blockvals in values for val in blockvals if isinstance(blockvals,list)]
else :
values = [main.read(history_section,f"{varname}({step})") for step in range(1,nentries+1)]
return values
[docs] def get_property_at_step(self, step, varname, history_section='History') :
""" Return the value of *varname* in the history section *history_section at step *step*."""
if not 'ams' in self.rkfs: return
main = self.rkfs['ams']
as_block = self._values_stored_as_blocks(main, varname, history_section)
if as_block :
import numpy
blocksize = main.read(history_section,'blockSize')
iblock = int(numpy.ceil(step/blocksize))
value = main.read(history_section,f"{varname}({iblock})")[(step%blocksize)-1]
else :
value = main.read(history_section,f"{varname}({step})")
return value
def _values_stored_as_blocks(self, main, varname, history_section) :
"""Determines wether the values of varname in a trajectory rkf file are stored in blocks"""
nentries = main.read(history_section,'nEntries')
as_block = False
keylist = [var for sec,var in main if sec==history_section]
if 'nBlocks' in keylist :
if not f"{varname}({nentries})" in keylist :
as_block = True
return as_block
[docs] def get_engine_results(self, engine=None):
"""Return a dictionary with contents of ``AMSResults`` section from an engine results ``.rkf`` file.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return self._process_engine_results(lambda x: x.read_section('AMSResults'), engine)
[docs] def get_engine_properties(self, engine=None):
"""Return a dictionary with all the entries from ``Properties`` section from an engine results ``.rkf`` file.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
def properties(kf):
n = kf.read('Properties', 'nEntries')
ret = {}
for i in range(1, n+1):
tp = kf.read('Properties', 'Type({})'.format(i)).strip()
stp = kf.read('Properties', 'Subtype({})'.format(i)).strip()
val = kf.read('Properties', 'Value({})'.format(i))
key = stp if stp.endswith(tp) else ('{} {}'.format(stp, tp) if stp else tp)
ret[key] = val
return ret
return self._process_engine_results(properties, engine)
[docs] def get_energy(self, unit='au', engine=None):
"""Return final energy, expressed in *unit*.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return self._process_engine_results(lambda x: x.read('AMSResults', 'Energy'), engine) * Units.conversion_ratio('au', unit)
[docs] def get_gradients(self, energy_unit='au', dist_unit='au', engine=None):
"""Return the gradients of the final energy, expressed in *energy_unit* / *dist_unit*.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'Gradients'), engine)).reshape(-1,3) * Units.conversion_ratio('au', energy_unit) / Units.conversion_ratio('au', dist_unit)
[docs] def get_stresstensor(self, engine=None):
"""Return the final stress tensor, expressed in atomic units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'StressTensor'), engine)).reshape(len(self.get_input_molecule().lattice),-1)
[docs] def get_hessian(self, engine=None):
"""Return the Hessian matrix, i.e. the second derivative of the total energy with respect to the nuclear coordinates, expressed in atomic units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'Hessian'), engine)).reshape(3*len(self.get_input_molecule()),-1)
[docs] def get_elastictensor(self, engine=None):
"""Return the elastic tensor, expressed in atomic units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
et_flat = np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'ElasticTensor'), engine))
num_latvec = len(self.get_input_molecule().lattice)
if num_latvec == 1:
return et_flat.reshape(1,1)
elif num_latvec == 2:
return et_flat.reshape(3,3)
else:
return et_flat.reshape(6,6)
[docs] def get_frequencies(self, unit='cm^-1', engine=None):
"""Return a numpy array of vibrational frequencies, expressed in *unit*.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
freqs = self._process_engine_results(lambda x: x.read('Vibrations', 'Frequencies[cm-1]'), engine)
freqs = np.array(freqs) if isinstance(freqs,list) else np.array([freqs])
return freqs * Units.conversion_ratio('cm^-1', unit)
[docs] def get_charges(self, engine=None):
"""Return the atomic charges, expressed in atomic units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'Charges'), engine))
[docs] def get_dipolemoment(self, engine=None):
"""Return the electric dipole moment, expressed in atomic units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'DipoleMoment'), engine))
[docs] def get_dipolegradients(self, engine=None):
"""Return the nuclear gradients of the electric dipole moment, expressed in atomic units. This is a (3*numAtoms x 3) matrix.
"""
return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'DipoleGradients'), engine)).reshape(-1,3)
[docs] def get_timings(self):
"""Return a dictionary with timing statistics of the job execution. Returned dictionary contains keys cpu, system and elapsed. The values are corresponding timings, expressed in seconds (Jim Boelrijk).
"""
ret = {}
cpu = self.grep_output('Total cpu time:')
system = self.grep_output('Total system time:')
elapsed = self.grep_output('Total elapsed time:')
ret['elapsed'] = float(elapsed[0].split()[-1])
ret['system'] = float(system[0].split()[-1])
ret['cpu'] = float(cpu[0].split()[-1])
return ret
[docs] def recreate_molecule(self):
"""Recreate the input molecule for the corresponding job based on files present in the job folder. This method is used by |load_external|.
If ``ams.rkf`` is present in the job folder, extract data from the ``InputMolecule`` section.
"""
if 'ams' in self.rkfs:
return self.get_input_molecule()
return None
[docs] def recreate_settings(self):
"""Recreate the input |Settings| instance for the corresponding job based on files present in the job folder. This method is used by |load_external|.
If ``ams.rkf`` is present in the job folder, extract user input and parse it back to a |Settings| instance using ``scm.input_parser`` module. Remove the ``system`` branch from that instance.
"""
if 'ams' in self.rkfs:
user_input = self.readrkf('General', 'user input')
try:
from scm.input_parser import InputParser
with InputParser() as parser:
inp = parser.to_settings('ams', user_input)
except:
log('Failed to recreate input settings from {}'.format(self.rkfs['ams'].path, 5))
return None
s = Settings()
s.input = inp
del s.input.ams.system
s.soft_update(config.job)
return s
return None
[docs] def ok(self):
"""Check if the execution of the associated :attr:`job` was successful or not.
See :meth:`Job.ok<scm.plams.core.basejob.Job.ok>` for more information."""
return self.job.ok()
[docs] def get_errormsg(self):
"""Tries to get an an error message for a associated :attr:`job`. This method returns ``None`` if the associated job was successful.
See :meth:`Job.get_errormsg<scm.plams.core.basejob.Job.errormsg>` for more information."""
return self.job.get_errormsg()
@property
def name(self):
"""Retrun the :attr:`job.name` of the job associated with this results instance."""
return self.job.name
#=========================================================================
def _access_rkf(self, func, file='ams'):
"""A skeleton method for accessing any of the ``.rkf`` files produced by AMS.
The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
The *func* argument has to be a function to call on a chosen ``.rkf`` file. It should take one argument, an instance of |KFFile|.
"""
#Try unique engine:
if file == 'engine':
names = self.engine_names()
if len(names) == 1:
return func(self.rkfs[names[0]])
else:
raise ValueError("You cannot use 'engine' as 'file' argument if the engine results file is not unique. Please use the real name of the file you wish to read")
#Try:
if file in self.rkfs:
return func(self.rkfs[file])
#Try harder:
filename = file + '.rkf'
self.refresh()
if filename in self.files:
self.rkfs[file] = KFFile(opj(self.job.path, filename))
return func(self.rkfs[file])
#Surrender:
raise FileError('File {} not present in {}'.format(filename, self.job.path))
def _process_engine_results(self, func, engine=None):
"""A generic method skeleton for processing any engine results ``.rkf`` file. *func* should be a function that takes one argument (an instance of |KFFile|) and returns arbitrary data.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
names = self.engine_names()
if engine is not None:
if engine in names:
return func(self.rkfs[engine])
else:
raise FileError('File {}.rkf not present in {}'.format(engine, self.job.path))
else:
if len(names) == 1:
return func(self.rkfs[names[0]])
elif len(names) == 0:
raise FileError('There is no engine .rkf present in {}'.format(self.job.path))
else:
raise ValueError("You need to specify the 'engine' argument when there are multiple engine result files present in the job folder")
#===========================================================================
#===========================================================================
#===========================================================================
[docs]class AMSJob(SingleJob):
"""A class representing a single computation with AMS driver. The corresponding results type is |AMSResults|.
"""
_result_type = AMSResults
_command = 'ams'
[docs] def get_runscript(self):
"""Generate the runscript. Returned string is of the form::
unset AMS_SWITCH_LOGFILE_AND_STDOUT
AMS_JOBNAME=jobname AMS_RESULTSDIR=. $AMSBIN/ams [-n nproc] <jobname.in [>jobname.out]
``-n`` flag is added if ``settings.runscript.nproc`` exists. ``[>jobname.out]`` is used based on ``settings.runscript.stdout_redirect``. If ``settings.runscript.preamble_lines`` exists, those lines will be added to the runscript verbatim before the execution of AMS.
"""
ret = 'unset AMS_SWITCH_LOGFILE_AND_STDOUT\n'
if 'preamble_lines' in self.settings.runscript:
for line in self.settings.runscript.preamble_lines:
ret += f'{line}\n'
ret += 'AMS_JOBNAME="{}" AMS_RESULTSDIR=. $AMSBIN/ams'.format(self.name)
if 'nproc' in self.settings.runscript:
ret += ' -n {}'.format(self.settings.runscript.nproc)
ret += ' <"{}"'.format(self._filename('inp'))
if self.settings.runscript.stdout_redirect:
ret += ' >"{}"'.format(self._filename('out'))
ret += '\n\n'
return AMSJob._slurm_env(self.settings) + ret
[docs] def check(self):
"""Check if ``termination status`` variable from ``General`` section of main KF file equals ``NORMAL TERMINATION``."""
try:
status = self.results.readrkf('General', 'termination status')
except:
return False
if 'NORMAL TERMINATION' in status:
if 'errors' in status:
log('Job {} reported errors. Please check the the output'.format(self._full_name()), 1)
return False
if 'warnings' in status:
log('Job {} reported warnings. Please check the the output'.format(self._full_name()), 1)
return True
return False
[docs] def get_errormsg(self):
"""Tries to get an an error message for a failed job. This method returns ``None`` for successful jobs."""
if self.ok():
return None
else:
# Something went wrong. The first place to check is the termination status on the ams.rkf.
# If the AMS driver stopped with a known error (called StopIt in the Fortran code), the error will be in there.
try:
msg = self.results.readrkf('General','termination status')
if msg == 'NORMAL TERMINATION with errors':
# Apparently this wasn't a hard stop in the middle of the job.
# Let's look for the last error in the logfile ...
msg = self.results.grep_file('ams.log', 'ERROR: ')[-1].partition('ERROR: ')[2]
except:
msg = 'Could not determine error message. Please check the output manually.'
return msg
#=========================================================================
def _serialize_input(self, special):
"""Transform the contents of ``settings.input`` branch into string with blocks, keys and values.
First, the contents of ``settings.input`` are extended with entries returned by :meth:`_serialize_molecule`. Then the contents of ``settings.input.ams`` are used to generate AMS text input. Finally, every other (than ``ams``) entry in ``settings.input`` is used to generate engine specific input.
Special values can be indicated with *special* argument, which should be a dictionary having types of objects as keys and functions translating these types to strings as values.
"""
def unspec(value):
"""Check if *value* is one of a special types and convert it to string if it is."""
for spec_type in special:
if isinstance(value, spec_type):
return special[spec_type](value)
return value
def serialize(key, value, indent, end='End'):
"""Given a *key* and its corresponding *value* from the |Settings| instance produce a snippet of the input file representing this pair.
If the value is a nested |Settings| instance, use recursive calls to build the snippet for the entire block. Indent the result with *indent* spaces.
"""
ret = ''
if isinstance(value, Settings):
ret += ' '*indent + key
if '_h' in value:
ret += ' ' + unspec(value['_h'])
ret += '\n'
i = 1
while ('_'+str(i)) in value:
ret += serialize('', value['_'+str(i)], indent+2)
i += 1
for el in value:
if not el.startswith('_'):
if key.lower().startswith('engine') and el.lower() == 'input':
ret += serialize(el, value[el], indent+2, 'EndInput')
# REB: For the hybrid engine. How todeal with the space in el (Engine DFTB)? Replace by underscore?
elif key.lower().startswith('engine') and el.lower().startswith('engine') :
engine = ' '.join(el.split('_'))
ret += serialize(engine, value[el], indent+2, end='EndEngine') + '\n'
else:
ret += serialize(el, value[el], indent+2)
if key.lower() == 'input': end='endinput'
ret += ' '*indent + end+'\n'
elif isinstance(value, list):
for el in value:
ret += serialize(key, el, indent)
elif value == '' or value is True:
ret += ' '*indent + key + '\n'
elif value is False or value is None:
pass
else:
ret += ' '*indent + key + ' ' + str(unspec(value)) + '\n'
return ret
fullinput = self.settings.input.copy()
#prepare contents of 'system' block(s)
more_systems = self._serialize_molecule()
if more_systems:
if 'system' in fullinput.ams:
#nonempty system block was already present in input.ams
system = fullinput.ams.system
system_list = system if isinstance(system, list) else [system]
system_list_set = Settings({(s._h if '_h' in s else ''):s for s in system_list})
more_systems_set = Settings({(s._h if '_h' in s else ''):s for s in more_systems})
system_list_set += more_systems_set
system_list = list(system_list_set.values())
system = system_list[0] if len(system_list) == 1 else system_list
fullinput.ams.system = system
else:
fullinput.ams.system = more_systems[0] if len(more_systems) == 1 else more_systems
txtinp = ''
ams = fullinput.find_case('ams')
#contents of the 'ams' block (AMS input) go first
for item in fullinput[ams]:
txtinp += serialize(item, fullinput[ams][item], 0) + '\n'
#and then engines
for engine in fullinput:
if engine != ams:
txtinp += serialize('Engine '+engine, fullinput[engine], 0, end='EndEngine') + '\n'
return txtinp
def _serialize_molecule(self):
"""Return a list of |Settings| instances containing the information about one or more |Molecule| instances stored in the ``molecule`` attribute.
Molecular charge is taken from ``molecule.properties.charge``, if present. Additional, atom-specific information to be put in ``atoms`` block after XYZ coordinates can be supplied with ``atom.properties.suffix``.
If the ``molecule`` attribute is a dictionary, the returned list is of the same length as the size of the dictionary. Keys from the dictionary are used as headers of returned ``system`` blocks.
"""
if self.molecule is None:
return Settings()
moldict = {}
if isinstance(self.molecule, Molecule):
moldict = {'':self.molecule}
elif isinstance(self.molecule, dict):
moldict = self.molecule
else:
raise JobError("Incorrect 'molecule' attribute of job {}. 'molecule' should be a Molecule, a dictionary or None, and not {}".format(self._full_name(), type(self.molecule)))
ret = []
for name, molecule in moldict.items():
newsystem = Settings()
if name:
newsystem._h = name
if len(molecule.lattice) in [1,2] and molecule.align_lattice():
log("The lattice of {} Molecule supplied for job {} did not follow the convention required by AMS. I rotated the whole system for you. You're welcome".format(name if name else 'main', self._full_name()), 3)
newsystem.Atoms._1 = [atom.str(symbol=self._atom_symbol(atom), space=18, decimal=10,
suffix=(atom.properties.suffix if 'suffix' in atom.properties else '')) for atom in molecule]
if molecule.lattice:
newsystem.Lattice._1 = ['{:16.10f} {:16.10f} {:16.10f}'.format(*vec) for vec in molecule.lattice]
if len(molecule.bonds)>0:
newsystem.BondOrders._1 = ['{} {} {}'.format(molecule.index(b.atom1), molecule.index(b.atom2), b.order) for b in molecule.bonds]
if 'charge' in molecule.properties:
newsystem.Charge = molecule.properties.charge
ret.append(newsystem)
return ret
#=========================================================================
@staticmethod
def _atom_symbol(atom):
"""Return the atomic symbol of *atom*. Ensure proper formatting for AMSuite input taking into account ``ghost`` and ``name`` entries in ``properties`` of *atom*."""
smb = atom.symbol if atom.atnum > 0 else '' #Dummy atom should have '' instead of 'Xx'
if 'ghost' in atom.properties and atom.properties.ghost:
smb = ('Gh.'+smb).rstrip('.')
if 'name' in atom.properties:
smb = (smb+'.'+str(atom.properties.name)).lstrip('.')
return smb
@staticmethod
def _tuple2rkf(arg):
"""Transform a pair ``(x, name)`` where ``x`` is an instance of |AMSJob| or |AMSResults| and ``name`` is a name of ``.rkf`` file (``ams`` or engine) to an absolute path to that ``.rkf`` file."""
if len(arg) == 2 and isinstance(arg[1], str):
if isinstance(arg[0], AMSJob):
return arg[0].results.rkfpath(arg[1])
if isinstance(arg[0], AMSResults):
return arg[0].rkfpath(arg[1])
return str(arg)
[docs] @staticmethod
def settings_to_mol(s: Settings) -> dict:
"""Pop the `s.input.ams.system` block from a settings instance and convert it into a dictionary of molecules.
The provided settings should be in the same style as the ones produced by the SCM input parser.
Dictionary keys are taken from the header of each system block.
The existing `s.input.ams.system` block is removed in the process, assuming it was present in the first place.
"""
def read_mol(settings_block: Settings) -> Molecule:
"""Retrieve single molecule from a single `s.input.ams.system` block."""
mol = Molecule()
for atom in settings_block.atoms._1:
# Extract arguments for Atom()
symbol, x, y, z, *comment = atom.split(maxsplit=4)
kwargs = {} if not comment else {'suffix': comment[0]}
coords = float(x), float(y), float(z)
try:
at = Atom(symbol=symbol, coords=coords, **kwargs)
except PTError: # It's either a ghost atom and/or an atom with a custom name
if symbol.startswith('Gh.'): # Ghost atom
kwargs['ghost'], symbol = symbol.split('.', maxsplit=1)
if '.' in symbol: # Atom with a custom name
symbol, kwargs['name'] = symbol.split('.', maxsplit=1)
at = Atom(symbol=symbol, coords=coords, **kwargs)
mol.add_atom(at)
# Add bonds
for bond in settings_block.bondorders._1:
_at1, _at2, _order = bond.split()
at1, at2, order = mol[int(_at1)], mol[int(_at2)], float(_order)
mol.add_bond(at1, at2, order)
# Set the lattice vector if applicable
if settings_block.lattice._1:
mol.lattice = [tuple(float(j) for j in i.split()) for i in settings_block.lattice._1]
# Set the molecular charge
if settings_block.charge:
mol.properties.charge = float(settings_block.charge)
mol.properties.name = str(settings_block._h)
return mol
# Raises a KeyError if the `system` key is absent
with s.suppress_missing():
try:
settings_list = s.input.ams.pop('system')
except KeyError: # The block s.input.ams.system is absent
return None
# Create a new dictionary with system headers as keys and molecules as values
moldict = {}
for settings_block in settings_list:
key = str(settings_block._h) if ('_h' in settings_block) else '' # Empty string used as default system name.
if key in moldict:
raise KeyError(f"Duplicate system headers found in s.input.ams.system: {repr(key)}")
moldict[key] = read_mol(settings_block)
return moldict
@staticmethod
def _slurm_env(settings):
"""Produce a string with environment variables declaration needed for running AMS on a SLURM managed system.
If the key ``slurm`` is present in ``settings.run`` and it's ``True`` the returned string is of the form:
``export SCM_MPIOPTIONS="-n X -N Y\n"
``X`` is taken from ``settings.run.cores`` (if not present, falls back to ``settings.runscript.nproc``).
``Y`` is taken from ``settings.run.nodes``
"""
if 'slurm' in settings.run and settings.run.slurm is True:
slurmenv = ''
if 'cores' in settings.run:
slurmenv += f'-n {settings.run.cores} '
elif 'nproc' in settings.runscript:
slurmenv += f'-n {settings.runscript.nproc} '
if 'nodes' in settings.run:
slurmenv += f'-N {settings.run.nodes} '
if slurmenv:
return f'export SCM_MPIOPTIONS="{slurmenv}"\n'
return ''