import os
from os.path import join as opj
from typing import Dict, List, Literal, Set, Union
import numpy as np
from scm.plams.core.basejob import SingleJob
from scm.plams.core.errors import FileError, JobError, PlamsError, PTError, ResultsError
from scm.plams.core.functions import config, log, parse_heredoc
from scm.plams.core.private import sha256
from scm.plams.core.results import Results
from scm.plams.core.settings import Settings
from scm.plams.mol.atom import Atom
from scm.plams.mol.bond import Bond
from scm.plams.mol.molecule import Molecule
from scm.plams.tools.converters import gaussian_output_to_ams, qe_output_to_ams, vasp_output_to_ams
from scm.plams.tools.kftools import KFFile, KFReader
from scm.plams.tools.units import Units
try:
from scm.pisa.block import DriverBlock
_has_scm_pisa = True
except ImportError:
_has_scm_pisa = False
try:
from scm.libbase import UnifiedChemicalSystem as ChemicalSystem
_has_scm_unichemsys = True
except ImportError:
_has_scm_unichemsys = False
try:
from watchdog.events import FileModifiedEvent, PatternMatchingEventHandler
from watchdog.observers import Observer
_has_watchdog = True
class AMSJobLogTailHandler(PatternMatchingEventHandler):
def __init__(self, job, jobmanager):
super().__init__(
patterns=[os.path.join(jobmanager.workdir, f"{job.name}*", "ams.log")],
ignore_patterns=["*.rkf", "*.out"],
ignore_directories=True,
case_sensitive=True,
)
self._job = job
self._seekto = 0
def on_any_event(self, event):
if (
self._job.path is not None
and event.src_path == os.path.join(self._job.path, "ams.log")
and isinstance(event, FileModifiedEvent)
):
try:
with open(event.src_path, "r") as f:
f.seek(self._seekto)
while True:
line = f.readline()
if not line:
break
log(f"{self._job.name}: " + line[25:-1])  # drop the fixed-width prefix (first 25 characters, the ams.log timestamp) and the trailing newline
self._seekto = f.tell()
except FileNotFoundError:
self._seekto = 0
def trigger(self):
if self._job.path is None:
return
src_path = os.path.join(self._job.path, "ams.log")
self.on_any_event(FileModifiedEvent(src_path))
except ImportError:
_has_watchdog = False
__all__ = ["AMSJob", "AMSResults"]
class AMSResults(Results):
"""A specialized |Results| subclass for accessing the results of |AMSJob|."""
def __init__(self, *args, **kwargs):
Results.__init__(self, *args, **kwargs)
self.rkfs = {}
def collect(self):
"""Collect files present in the job folder. Use the parent method from |Results|, then create an instance of |KFFile| for each ``.rkf`` file present in the job folder. Collect these files in the ``rkfs`` dictionary, with keys being filenames without the ``.rkf`` extension.
The information about ``.rkf`` files generated by engines is taken from the main ``ams.rkf`` file.
This method is called automatically during the final part of the job execution, so there is no need to call it manually.
"""
Results.collect(self)
self.collect_rkfs()
def collect_rkfs(self):
rkfname = "ams.rkf"
if rkfname in self.files:
main = KFFile(opj(self.job.path, rkfname))
n = main[("EngineResults", "nEntries")]
for i in range(1, n + 1):
files = main[("EngineResults", "Files({})".format(i))].split("\x00")
if files[0].endswith(".rkf"):
key = files[0][:-4]
self.rkfs[key] = KFFile(opj(self.job.path, files[0]))
self.rkfs["ams"] = main
else:
log("WARNING: Main KF file {} not present in {}".format(rkfname, self.job.path), 1)
def _copy_to(self, newresults):
super()._copy_to(newresults)
newresults.rkfs = {}
newresults.collect_rkfs()
def refresh(self):
"""Refresh the contents of the ``files`` list.
Use the parent method from |Results|, then look at the |KFFile| instances present in the ``rkfs`` dictionary and check if they point to existing files. If not, try to reinstantiate them with the current job path (this can happen when loading a pickled job after the entire job folder was moved).
"""
Results.refresh(self)
to_remove = []
for key, val in self.rkfs.items():
if not os.path.isfile(val.path):
if os.path.dirname(val.path) != self.job.path:
guessnewpath = opj(self.job.path, os.path.basename(val.path))
if os.path.isfile(guessnewpath):
self.rkfs[key] = KFFile(guessnewpath)
else:
to_remove.append(key)
else:
to_remove.append(key)
for i in to_remove:
del self.rkfs[i]
def engine_names(self):
"""Return a list of all names of engine specific ``.rkf`` files. The identifier of the main result file (``'ams'``) is not present in the returned list, only engine specific names are listed."""
self.refresh()
ret = list(self.rkfs.keys())
ret.remove("ams")
return ret
def read_hybrid_term_rkf(self, section: str, variable: str, term: int, file="engine"):
"""Reads a Hybrid-termX-subengine.rkf file.
The engine.rkf file contains a section EngineResults with Files(1), Files(2) etc. that point
to the individual term .rkf files.
This method reads the corresponding individual term .rkf file.
Example: job.results.read_hybrid_term_rkf("AMSResults", "Energy", file="engine", term=1)
"""
kf_file = self.readrkf("EngineResults", f"Files({term})", file=file)
kf = KFReader(os.path.join(self.job.path, kf_file))
return kf.read(section, variable)
def rkfpath(self, file: str = "ams"):
"""Return the absolute path of a chosen ``.rkf`` file.
The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
"""
return self._access_rkf(lambda x: x.path, file)
def readrkf(self, section, variable, file: str = "ams"):
"""Read data from *section*/*variable* of a chosen ``.rkf`` file.
The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
The type of the returned value depends on the type of *variable* defined inside KF file. It can be: single int, list of ints, single float, list of floats, single boolean, list of booleans or string.
.. note::
If arguments *section* or *variable* are incorrect (not present in the chosen file), the returned value is ``None``. Please mind the fact that KF files are case sensitive.
"""
return self._access_rkf(lambda x: x.read(section, variable), file)
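# A minimal usage sketch (hedged): assumes a finished |AMSJob| bound to the variable ``job``.
# Both reads below use only sections/variables that appear elsewhere in this module:
#
#     elapsed = job.results.readrkf("General", "ElapsedTime")              # from the main ams.rkf
#     energy = job.results.readrkf("AMSResults", "Energy", file="engine")  # hartree, engine file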
def read_rkf_section(self, section: str, file: str = "ams"):
"""Return a dictionary with all variables from a given *section* of a chosen ``.rkf`` file.
The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
.. note::
If *section* is not present in the chosen file, the returned value is an empty dictionary. Please mind the fact that KF files are case sensitive.
"""
return self._access_rkf(lambda x: x.read_section(section), file)
def get_rkf_skeleton(self, file: str = "ams") -> Dict[str, Set[str]]:
"""Return a dictionary with the structure of a chosen ``.rkf`` file. Each key corresponds to a section name with the value being a set of variable names present in that section.
The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
"""
return self._access_rkf(lambda x: x.get_skeleton(), file)
def get_molecule(self, section: str, file: str = "ams"):
"""Return a |Molecule| instance stored in a given *section* of a chosen ``.rkf`` file.
The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
All data used by this method is taken from the chosen ``.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
"""
sectiondict = self.read_rkf_section(section, file)
if sectiondict:
return Molecule._mol_from_rkf_section(sectiondict)
def get_ase_atoms(self, section: str, file: str = "ams"):
"""Return an ase.Atoms instance built from the geometry stored in a given *section* of a chosen ``.rkf`` file."""
from ase import Atoms
sectiondict = self.read_rkf_section(section, file)
bohr2angstrom = 0.529177210903
nLatticeVectors = sectiondict.get("nLatticeVectors", 0)
pbc = [True] * nLatticeVectors + [False] * (3 - nLatticeVectors)
if nLatticeVectors > 0:
cell = np.zeros((3, 3))
lattice = np.array(sectiondict.get("LatticeVectors")).reshape(-1, 3)
cell[: lattice.shape[0], : lattice.shape[1]] = lattice * bohr2angstrom
else:
cell = None
atomsymbols = sectiondict["AtomSymbols"].split()
positions = np.array(sectiondict["Coords"]).reshape(-1, 3) * bohr2angstrom
return Atoms(symbols=atomsymbols, positions=positions, pbc=pbc, cell=cell)
def get_main_molecule(self):
"""Return a |Molecule| instance with the final coordinates.
All data used by this method is taken from ``ams.rkf`` file.
The ``molecule`` attribute of the corresponding job is ignored.
"""
return self.get_molecule("Molecule", "ams")
def get_main_ase_atoms(self, get_results: bool = False):
"""Return an ase.Atoms instance with the final coordinates.
An alternative is to call toASE(results.get_main_molecule()) to convert a Molecule to ASE Atoms.
get_results : bool
If True, the returned Atoms will have a SinglePointCalculator as their calculator, allowing you to call .get_potential_energy(), .get_forces(), and .get_stress() on the returned Atoms object.
"""
from ase.calculators.singlepoint import SinglePointCalculator
atoms = self.get_ase_atoms("Molecule", "ams")
if get_results:
atoms.calc = SinglePointCalculator(atoms)
try:
energy = self.get_energy(unit="eV")
atoms.calc.results["energy"] = energy
except KeyError:
pass
try:
forces = (-1) * np.array(self.get_gradients(dist_unit="angstrom", energy_unit="eV")).reshape(-1, 3)
atoms.calc.results["forces"] = forces
except KeyError:
pass
try:
stress = np.array(self.get_stresstensor()).ravel() * Units.convert(1.0, "hartree/bohr^3", "eV/ang^3")
if len(stress) == 9:
stress = stress.reshape(3, 3)
atoms.calc.results["stress"] = np.array(
[stress[0][0], stress[1][1], stress[2][2], stress[1][2], stress[0][2], stress[0][1]]
)
except KeyError:
pass
return atoms
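# Hedged example: round-tripping the final geometry through ASE. Assumes the engine wrote an
# energy and gradients; otherwise the corresponding calculator fields simply stay unset:
#
#     atoms = job.results.get_main_ase_atoms(get_results=True)
#     e_ev = atoms.get_potential_energy()  # eV, taken from AMSResults%Energy
#     forces = atoms.get_forces()          # eV/angstrom, the negative gradients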
def get_history_molecule(self, step: int):
"""Return a |Molecule| instance with coordinates taken from a particular *step* in the ``History`` section of ``ams.rkf`` file.
All data used by this method is taken from ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
"""
if "ams" in self.rkfs:
main = self.rkfs["ams"]
if not self.is_valid_stepnumber(main, step):
return
coords = main.read("History", f"Coords({step})")
coords = [coords[i : i + 3] for i in range(0, len(coords), 3)]
if ("History", f"SystemVersion({step})") in main:
system = self.get_system_version(main, step)
mol = self.get_molecule(f"ChemicalSystem({system})")
molsrc = f"ChemicalSystem({system})"
else:
mol = self.get_main_molecule()
molsrc = "Molecule"
assert mol is not None
if len(mol) != len(coords):
raise ResultsError(
f'Coordinates taken from "History%Coords({step})" have incompatible length with molecule from {molsrc} section'
)
for at, c in zip(mol, coords):
at.move_to(c, unit="bohr")
if ("History", "LatticeVectors(" + str(step) + ")") in main:
lattice = Units.convert(main.read("History", "LatticeVectors(" + str(step) + ")"), "bohr", "angstrom")
mol.lattice = [tuple(lattice[j : j + 3]) for j in range(0, len(lattice), 3)]
# Bonds from the reference molecule are probably outdated. Let us never use them ...
mol.delete_all_bonds()
# ... but instead use bonds from the history section if they are available:
if all(
("History", i) in main
for i in [f"Bonds.Index({step})", f"Bonds.Atoms({step})", f"Bonds.Orders({step})"]
):
index = main.read("History", f"Bonds.Index({step})")
if not isinstance(index, list):
index = [index]
atoms = main.read("History", f"Bonds.Atoms({step})")
if not isinstance(atoms, list):
atoms = [atoms]
orders = main.read("History", f"Bonds.Orders({step})")
if not isinstance(orders, list):
orders = [orders]
for i in range(len(index) - 1):
for j in range(index[i], index[i + 1]):
mol.add_bond(mol[i + 1], mol[atoms[j - 1]], orders[j - 1])
if ("History", f"Bonds.CellShifts({step})") in main:
assert mol.lattice
cellShifts = main.read("History", f"Bonds.CellShifts({step})")
ndim = len(mol.lattice)
for i, b in enumerate(mol.bonds):
b.properties.suffix = " ".join([f"{cellShifts[ndim*i+j]}" for j in range(ndim)])
return mol
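# Sketch: walking a geometry optimization trajectory (assumes ``job`` finished and its
# ams.rkf has a History section; step numbering is 1-based):
#
#     n = job.results.readrkf("History", "nEntries")
#     for step in range(1, n + 1):
#         mol = job.results.get_history_molecule(step)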
def is_valid_stepnumber(self, main, step: int):
"""
Check if the requested step number is in the results file
"""
if "History" not in main:
raise KeyError("'History' section not present in {}".format(main.path))
n = main.read("History", "nEntries")
if step > n:
raise KeyError("Step {} not present in 'History' section of {}".format(step, main.path))
return True
def get_system_version(self, main, step: int):
"""
Determine which Molecule version is requested
"""
if ("History", f"SystemVersion({step})") in main:
version = main.read("History", f"SystemVersion({step})")
if "SystemVersionHistory" in main:
if ("SystemVersionHistory", "blockSize") in main:
blockSize = main.read("SystemVersionHistory", "blockSize")
else:
blockSize = 1
block = (version - 1) // blockSize + 1
offset = (version - 1) % blockSize
system = main.read("SystemVersionHistory", f"SectionNum({block})", return_as_list=True)[offset]
else:
system = version
return system
def get_history_variables(self, history_section: str = "History"):
"""Return a set of keynames stored in the specified history section of the ``ams.rkf`` file.
The *history_section* argument should be a string with the name of the history section (``History`` or ``MDHistory``).
"""
if not "ams" in self.rkfs:
return
main = self.rkfs["ams"]
keylist = [var for sec, var in main if sec == history_section]
# Keep only the per-step keys like "Energy(1)" and strip the "(step)" suffix
return set([key.split("(")[0] for key in keylist if len(key.split("(")) > 1])
def get_history_length(self, history_section: str = "History") -> int:
"""Return the number of entries (``nEntries``) in the given history section of the ``ams.rkf`` file."""
return self.readrkf(history_section, "nEntries")
def get_history_property(self, varname: str, history_section: str = "History"):
"""Return the values of *varname* in the history section *history_section*."""
if not "ams" in self.rkfs:
return
main = self.rkfs["ams"]
if history_section not in main:
raise KeyError(f"The requested section '{history_section}' does not exist in {main.path}")
if (history_section, "nScanCoord") in main: # PESScan
nentries = main.read(history_section, "nScanCoord")
as_block = False
elif (history_section, "nEntries") in main:
nentries = main.read(history_section, "nEntries")
as_block = self._values_stored_as_blocks(main, varname, history_section)
if as_block:
nblocks = main.read(history_section, "nBlocks")
values = [
main.read(history_section, f"{varname}({iblock})", return_as_list=True)
for iblock in range(1, nblocks + 1)
]
values = [val for blockvals in values for val in blockvals if isinstance(blockvals, list)]
else:
values = [main.read(history_section, f"{varname}({step})") for step in range(1, nentries + 1)]
return values
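# Sketch: reading whole-trajectory properties (variable names depend on the task type;
# "Energy" exists for optimizations, "Time" only in the MDHistory section of MD jobs):
#
#     energies = job.results.get_history_property("Energy")  # one value per frame
#     times = job.results.get_history_property("Time", history_section="MDHistory")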
def get_property_at_step(self, step: int, varname: str, history_section: str = "History"):
"""Return the value of *varname* in the history section *history_section* at step *step*."""
if not "ams" in self.rkfs:
return
main = self.rkfs["ams"]
as_block = self._values_stored_as_blocks(main, varname, history_section)
if as_block:
blocksize = main.read(history_section, "blockSize")
iblock = int(np.ceil(step / blocksize))
value = main.read(
history_section, f"{varname}({iblock})"
) # this can return something that isn't a list, for example an int
try:
value = value[(step % blocksize) - 1]
except TypeError: # TypeError: 'int' object is not subscriptable
pass
else:
value = main.read(history_section, f"{varname}({step})")
return value
def _values_stored_as_blocks(self, main, varname: str, history_section: str):
"""Determines wether the values of varname in a trajectory rkf file are stored in blocks"""
nentries = main.read(history_section, "nEntries")
as_block = False
# This is extremely slow, because looping over main is very slow.
# This is because the (sec,var) tuples are first stored in a set, then sorted, and only then yielded
keylist = [var for sec, var in main if sec == history_section]
if "nBlocks" in keylist:
if not f"{varname}({nentries})" in keylist:
as_block = True
return as_block
def get_atomic_temperatures_at_step(self, step: int, history_section: str = "MDHistory"):
"""
Get all the atomic temperatures for step `step`
Note: Numbering of steps starts at 1
"""
if not "ams" in self.rkfs:
return
main = self.rkfs["ams"]
if not self.is_valid_stepnumber(main, step):
return
# Read the masses
molname = "Molecule"
if ("History", f"SystemVersion({step})") in main:
system = self.get_system_version(main, step)
molname = "ChemicalSystem(%i)" % (system)
masses = np.array(main.read(molname, "AtomMasses"))
nats = len(masses)
# Read the velocities
velocities = np.array(self.get_property_at_step(step, "Velocities", history_section)).reshape((nats, 3))
# Convert to SI units and compute temperatures
m = masses * 1.0e-3 / Units.constants["NA"]
vels = velocities * Units.conversion_ratio("Bohr", "Angstrom") * 1.0e5
temperatures = (m.reshape((nats, 1)) * vels**2).sum(axis=1)
temperatures /= 3 * Units.constants["k_B"]
return temperatures
def get_band_structure(self, bands=None, unit: str = "hartree", only_high_symmetry_points: bool = False):
"""
Extracts the electronic band structure from DFTB or BAND calculations. The returned data can be plotted with ``plot_band_structure``.
Note: for unrestricted calculations bands 0, 2, 4, ... are spin-up and bands 1, 3, 5, ... are spin-down.
Returns: ``x``, ``y_spin_up``, ``y_spin_down``, ``labels``, ``fermi_energy``
``x``: 1D array of float
``y_spin_up``: 2D array of shape (len(x), len(bands)). Every column is a separate band. In units of ``unit``
``y_spin_down``: 2D array of shape (len(x), len(bands)). Every column is a separate band. In units of ``unit``. If the calculation was restricted this is identical to y_spin_up.
``labels``: 1D list of str of length len(x). If a point is not a high-symmetry point then the label is an empty string.
``fermi_energy``: float. The Fermi energy (in units of ``unit``).
Arguments:
bands: list of int or None
If None, all bands are returned. Note: the band indices start with 0.
unit: str
Unit of the returned band energies
only_high_symmetry_points: bool
Return only the first point of each edge.
"""
read_labels = True
nBands = self.readrkf("band_curves", "nBands", file="engine")
nEdges = self.readrkf("band_curves", "nEdges", file="engine")
try:
nSpin = self.readrkf("band_curves", "nSpin", file="engine")
except KeyError:
nSpin = 1
if bands is None:
bands = np.arange(nBands)
spindown_bands = bands
if nSpin == 2:
spindown_bands = np.array(bands) + nBands
prevmaxx = 0
complete_spinup_data = []
complete_spindown_data = []
labels = []
x = []
for i in range(nEdges):
my_x = self.readrkf("band_curves", f"Edge_{i+1}_xFor1DPlotting", file="engine")
my_x = np.array(my_x) + prevmaxx
prevmaxx = np.max(my_x)
if read_labels:
my_labels = self.readrkf("band_curves", f"Edge_{i+1}_labels", file="engine").split()
if len(my_labels) == 2:
if only_high_symmetry_points:
labels += [my_labels[0]] # only the first point of the curve
else:
labels += [my_labels[0]] + [""] * (len(my_x) - 2) + [my_labels[1]]
if only_high_symmetry_points:
x.append(my_x[0:1])
else:
x.append(my_x)
A = self.readrkf("band_curves", f"Edge_{i+1}_bands", file="engine")
A = np.array(A).reshape(-1, nBands * nSpin)
spinup_data = A[:, bands]
spindown_data = A[:, spindown_bands]
if only_high_symmetry_points:
spinup_data = np.reshape(spinup_data[0, :], (-1, len(bands)))
spindown_data = np.reshape(spindown_data[0, :], (-1, len(spindown_bands)))
complete_spinup_data.append(spinup_data)
complete_spindown_data.append(spindown_data)
complete_spinup_data = np.concatenate(complete_spinup_data)
complete_spinup_data = Units.convert(complete_spinup_data, "hartree", unit)
complete_spindown_data = np.concatenate(complete_spindown_data)
complete_spindown_data = Units.convert(complete_spindown_data, "hartree", unit)
x = np.concatenate(x).ravel()
fermi_energy = self.readrkf("BandStructure", "FermiEnergy", file="engine")
fermi_energy = Units.convert(fermi_energy, "hartree", unit)
return x, complete_spinup_data, complete_spindown_data, labels, fermi_energy
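# Hedged sketch: fetching a band structure in eV for plotting (assumes a periodic DFTB or
# BAND job; ``plot_band_structure`` is the companion helper named in the docstring above):
#
#     x, y_up, y_down, labels, ef = job.results.get_band_structure(unit="eV")
#     # each column of y_up is one band; ef is the Fermi energy in eV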
def get_engine_results(self, engine: str = None):
"""Return a dictionary with contents of ``AMSResults`` section from an engine results ``.rkf`` file.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return self._process_engine_results(lambda x: x.read_section("AMSResults"), engine)
def get_engine_properties(self, engine: str = None):
"""Return a dictionary with all the entries from ``Properties`` section from an engine results ``.rkf`` file.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
def properties(kf):
if not "Properties" in kf:
return {}
# There are two *kinds* of "Properties" sections:
# - ADF simply has a set of variables in the properties section
# - DFTB and some other engines have a *scheme* for storing "Properties". See the fortran 'RKFileModule' (RKFile.f90)
# for more details on this.
ret = {}
if ("Properties", "nEntries") in kf:
# This is a 'RKFileModule' properties section:
n = kf.read("Properties", "nEntries")
for i in range(1, n + 1):
tp = kf.read("Properties", "Type({})".format(i)).strip()
stp = kf.read("Properties", "Subtype({})".format(i)).strip()
val = kf.read("Properties", "Value({})".format(i))
key = stp if stp.endswith(tp) else ("{} {}".format(stp, tp) if stp else tp)
ret[key] = val
else:
skeleton = kf.get_skeleton()
for variable in skeleton["Properties"]:
ret[variable] = kf.read("Properties", variable)
return ret
return self._process_engine_results(properties, engine)
def get_energy(self, unit: str = "hartree", engine: str = None):
"""Return final energy, expressed in *unit*. The final energy is found in AMSResults%Energy of the engine rkf file. You can find the meaning of final energy in the engine documentation.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return self._process_engine_results(lambda x: x.read("AMSResults", "Energy"), engine) * Units.conversion_ratio(
"au", unit
)
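# Unit-handling sketch: the stored value is in hartree and scaled on the way out, so the
# two calls below are equivalent (Units.convert is the converter used throughout this module):
#
#     e_ev = job.results.get_energy(unit="eV")
#     e_ev = Units.convert(job.results.get_energy(), "hartree", "eV")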
def get_energy_uncertainty(self, unit: str = "hartree", engine: str = None):
"""Return the final energy uncertainty, expressed in *unit*. The uncertainty is found in AMSResults%EnergyUncertainty of the engine rkf file. You can find the meaning of the final energy uncertainty in the engine documentation.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return self._process_engine_results(
lambda x: x.read("AMSResults", "EnergyUncertainty"), engine
) * Units.conversion_ratio("au", unit)
def get_gradients(self, energy_unit: str = "hartree", dist_unit: str = "bohr", engine: str = None) -> np.ndarray:
"""Return the gradients of the final energy, expressed in *energy_unit* / *dist_unit*.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return (
np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "Gradients"), engine)).reshape(-1, 3)
* Units.conversion_ratio("au", energy_unit)
/ Units.conversion_ratio("au", dist_unit)
)
def get_gradients_uncertainty(
self, energy_unit: str = "hartree", dist_unit: str = "bohr", engine: str = None
) -> np.ndarray:
"""Return the uncertainty of the gradients of the final energy, expressed in *energy_unit* / *dist_unit*.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return (
np.asarray(
self._process_engine_results(lambda x: x.read("AMSResults", "GradientsUncertainty"), engine)
).reshape(-1, 3)
* Units.conversion_ratio("au", energy_unit)
/ Units.conversion_ratio("au", dist_unit)
)
def get_gradients_magnitude_uncertainty(
self, energy_unit: str = "hartree", dist_unit: str = "bohr", engine: str = None
) -> np.ndarray:
"""Return the uncertainty of the magnitude of the gradients of the final energy, expressed in *energy_unit* / *dist_unit*.
This is computed using error propagation.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return (
np.asarray(
self._process_engine_results(lambda x: x.read("AMSResults", "GradientsMagnitudeUncertainty"), engine)
).reshape(-1)
* Units.conversion_ratio("au", energy_unit)
/ Units.conversion_ratio("au", dist_unit)
)
def get_stresstensor(self, engine: str = None) -> np.ndarray:
"""Return the final stress tensor, expressed in atomic units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "StressTensor"), engine)).reshape(
len(self.get_input_molecule().lattice), -1
)
def get_hessian(self, engine: str = None) -> np.ndarray:
"""Return the Hessian matrix, i.e. the second derivative of the total energy with respect to the nuclear coordinates, expressed in atomic units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "Hessian"), engine)).reshape(
3 * len(self.get_input_molecule()), -1
)
def get_elastictensor(self, engine: str = None):
"""Return the elastic tensor, expressed in atomic units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
et_flat = np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "ElasticTensor"), engine))
num_latvec = len(self.get_input_molecule().lattice)
if num_latvec == 1:
return et_flat.reshape(1, 1)
elif num_latvec == 2:
return et_flat.reshape(3, 3)
else:
return et_flat.reshape(6, 6)
def get_frequencies(self, unit: str = "cm^-1", engine: str = None):
"""Return a numpy array of vibrational frequencies, expressed in *unit*.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
freqs = self._process_engine_results(lambda x: x.read("Vibrations", "Frequencies[cm-1]"), engine)
freqs = np.array(freqs) if isinstance(freqs, list) else np.array([freqs])
return freqs * Units.conversion_ratio("cm^-1", unit)
def get_frequency_spectrum(
self,
engine: str = None,
broadening_type: Literal["gaussian", "lorentzian"] = "gaussian",
broadening_width=40,
min_x=0,
max_x=4000,
x_spacing=0.5,
post_process: Literal["max_to_1"] = None,
):
"""Return the "frequency spectrum" (i.e. the frequencies broaden with intensity equal to 1). Units: frequencies are in cm-1, the intensities by the default are in mode counts units but post_process is equal to max_to_1 are in arbitrary units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
from scm.plams.tools.postprocess_results import broaden_results
frequencies = self.get_frequencies(engine=engine)
intensities = np.ones_like(frequencies)  # unit intensity for every mode
x_data, y_data = broaden_results(
centers=frequencies,
areas=intensities,
broadening_width=broadening_width,
broadening_type=broadening_type,
x_data=(min_x, max_x, x_spacing),
post_process=post_process,
)
if post_process == "max_to_1":
y_data /= np.max(y_data)
return x_data, y_data
def get_force_constants(self, engine: str = None):
"""Return a numpy array of force constants, expressed in atomic units (Hartree/bohr^2).
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
forceConstants = self._process_engine_results(lambda x: x.read("Vibrations", "ForceConstants"), engine)
forceConstants = np.array(forceConstants) if isinstance(forceConstants, list) else np.array([forceConstants])
return forceConstants
def get_normal_modes(self, engine: str = None):
"""Return a numpy array of normal modes with shape: (num_normal_modes, num_atoms, 3), expressed in dimensionless units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
normal_modes_list = []
num_normal_modes = self._process_engine_results(lambda x: x.read("Vibrations", "nNormalModes"), engine)
for i in range(num_normal_modes):
n_mode = np.array(
self._process_engine_results(lambda x: x.read("Vibrations", f"NoWeightNormalMode({i+1})"), engine)
).reshape(-1, 3)
normal_modes_list.append(n_mode)
return np.array(normal_modes_list).reshape(num_normal_modes, -1, 3)
def get_charges(self, engine: str = None):
"""Return the atomic charges, expressed in atomic units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "Charges"), engine))
def get_dipolemoment(self, engine: str = None):
"""Return the electric dipole moment, expressed in atomic units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "DipoleMoment"), engine))
def get_dipolegradients(self, engine: str = None):
"""Return the nuclear gradients of the electric dipole moment, expressed in atomic units. This is a (3*numAtoms x 3) matrix.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return np.asarray(
self._process_engine_results(lambda x: x.read("AMSResults", "DipoleGradients"), engine)
).reshape(-1, 3)
def get_polarizability(self, engine: str = None):
"""Return the polarizability, expressed in atomic units [(e*bohr)^2/hartree]. This is a (3 x 3) matrix.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
p_components = np.asarray(
self._process_engine_results(lambda x: x.read("AMSResults", "Polarizability"), engine)
)
if p_components.shape == (6,):
polarizability_matrix = np.array(
[
[p_components[0], p_components[1], p_components[3]],
[p_components[1], p_components[2], p_components[4]],
[p_components[3], p_components[4], p_components[5]],
]
)
elif p_components.shape == (3, 3):
polarizability_matrix = p_components
else:
raise ValueError(
f"AMSResults%Polarizability has shape {p_components.shape}, which matches neither of the expected shapes: (6,) [xx,xy,yy,xz,yz,zz] or (3,3)"
)
return polarizability_matrix
def get_zero_point_energy(self, unit: str = "hartree", engine: str = None):
"""Return the zero point energy, expressed in *unit*. The zero point energy is found in Vibrations%ZeroPointEnergy of the engine rkf file.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return self._process_engine_results(
lambda x: x.read("Vibrations", "ZeroPointEnergy"), engine
) * Units.conversion_ratio("au", unit)
def get_ir_intensities(self, engine: str = None):
"""Return the IR intensities in km/mol unit (kilometers/mol).
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return np.asarray(
self._process_engine_results(lambda x: x.read("Vibrations", "Intensities[km/mol]"), engine)
).reshape(
-1,
)
def get_ir_spectrum(
self,
engine: str = None,
broadening_type: Literal["gaussian", "lorentzian"] = "gaussian",
broadening_width=40,
min_x=0,
max_x=4000,
x_spacing=0.5,
post_process: Literal["all_intensities_to_1", "max_to_1"] = None,
):
"""Return the IR spectrum. Units: frequencies are in cm-1, the intensities by the default are in km/mol units (kilometers/mol) but if post_process is all_intensities_to_1 the units are in modes counts otherwise if equal to max_to_1 are in arbitrary units.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
from scm.plams.tools.postprocess_results import broaden_results
frequencies = self.get_frequencies(engine=engine)
if post_process == "all_intensities_to_1":
intensities = np.ones_like(frequencies)
else:
intensities = self.get_ir_intensities(engine=engine)
x_data, y_data = broaden_results(
centers=frequencies,
areas=intensities,
broadening_width=broadening_width,
broadening_type=broadening_type,
x_data=(min_x, max_x, x_spacing),
post_process=post_process,
)
if post_process == "max_to_1":
y_data /= np.max(y_data)
return x_data, y_data
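# Sketch: a broadened IR spectrum on a custom grid (assumes a frequencies job whose engine
# file stores IR intensities; all keyword arguments mirror the signature above):
#
#     x, y = job.results.get_ir_spectrum(broadening_type="lorentzian", broadening_width=20,
#                                        min_x=400, max_x=3500, x_spacing=1.0)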
def get_n_spin(self, engine: str = None):
"""Return n_spin: 1 for spin-restricted or spin-orbit coupling calculations, 2 for spin-unrestricted calculations.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return self._process_engine_results(lambda x: x.read("AMSResults", "nSpin"))
def get_orbital_energies(self, unit: str = "Hartree", engine: str = None):
"""Return the orbital energies in a numpy array of shape [nSpin,nOrbitals] (nSpin is 1 in case of spin-restricted or spin-orbit coupling and 2 in case of spin unrestricted)
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return Units.convert(
np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "orbitalEnergies"), engine)).reshape(
self.get_n_spin(), -1
),
"Hartree",
unit,
)
def get_orbital_occupations(self, engine: str = None):
"""Return the orbital occupations in a numpy array of shape [nSpin,nOrbitals]. For spin restricted calculations, the occupations will be between 0 and 2. For spin unrestricted or spin-orbit coupling the values will be between 0 and 1.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return np.asarray(
self._process_engine_results(lambda x: x.read("AMSResults", "orbitalOccupations"), engine)
).reshape(self.get_n_spin(), -1)
def get_homo_energies(self, unit: str = "Hartree", engine: str = None):
"""
Return the homo energies per spin as a numpy array of size [nSpin]. nSpin is 1 in case of spin-restricted or spin-orbit coupling and 2 in case of spin unrestricted. See also :func:`~are_orbitals_fractionally_occupied`.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return Units.convert(
np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "HOMOEnergy"), engine)).reshape(-1),
"Hartree",
unit,
)
def get_lumo_energies(self, unit: str = "Hartree", engine: str = None):
"""
Return the lumo energies per spin as a numpy array of size [nSpin]. nSpin is 1 in case of spin-restricted or spin-orbit coupling and 2 in case of spin unrestricted. See also :func:`~are_orbitals_fractionally_occupied`.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return Units.convert(
np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "LUMOEnergy"), engine)).reshape(-1),
"Hartree",
unit,
)
def get_smallest_homo_lumo_gap(self, unit: str = "Hartree", engine: str = None):
"""
Returns a float containing the smallest HOMO-LUMO gap irrespective of spin (i.e. min(LUMO) - max(HOMO)). See also :func:`~are_orbitals_fractionally_occupied`.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return Units.convert(
self._process_engine_results(lambda x: x.read("AMSResults", "SmallestHOMOLUMOGap"), engine), "Hartree", unit
)
def are_orbitals_fractionally_occupied(self, engine: str = None):
"""
Returns a boolean indicating whether fractional occupations were detected. If that is the case, then the 'HOMO' and 'LUMO' labels are not well defined, since the demarcation between 'occupied' and 'empty' is somewhat arbitrary. See the AMSDriver documentation for more info.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
return self._process_engine_results(lambda x: x.read("AMSResults", "fractionalOccupation"), engine)
def get_timings(self):
"""Return a dictionary with timing statistics of the job execution. The returned dictionary contains the keys ``cpu``, ``system`` and ``elapsed``. The values are the corresponding timings, expressed in seconds."""
ret = {}
try:
# new AMS versions store timings on ams.rkf
ret["elapsed"] = self.readrkf("General", "ElapsedTime")
ret["system"] = self.readrkf("General", "SysTime")
ret["cpu"] = self.readrkf("General", "CPUTime")
except Exception:
# fall back to reading output, was needed for old AMS versions
cpu = self.grep_output("Total cpu time:")
system = self.grep_output("Total system time:")
elapsed = self.grep_output("Total elapsed time:")
ret["elapsed"] = float(elapsed[0].split()[-1])
ret["system"] = float(system[0].split()[-1])
ret["cpu"] = float(cpu[0].split()[-1])
return ret
def get_forcefield_params(self, engine: str = None):
"""
Read all force field data from a forcefield.rkf file into self.
* ``engine`` -- identifier of the engine results ``.rkf`` file that contains the ForceField data
"""
from scm.plams.interfaces.adfsuite.forcefieldparams import forcefield_params_from_kf
return self._process_engine_results(forcefield_params_from_kf, engine)
def get_exit_condition_message(self) -> str:
"""Try to get the error message if the driver was stopped by an exit condition.
Return an empty string if no message was provided or if the driver was not stopped by an exit condition."""
try:
msg = self.readrkf("History", "ExitConditionMsg")
assert isinstance(msg, str)
return msg
except Exception:
return ""
def get_poissonratio(self, engine: str = None):
return self._process_engine_results(lambda x: x.read("AMSResults", "PoissonRatio"), engine)
def get_youngmodulus(self, unit: str = "au", engine: str = None):
return self._process_engine_results(
lambda x: x.read("AMSResults", "YoungModulus"), engine
) * Units.conversion_ratio("au", unit)
def get_shearmodulus(self, unit: str = "au", engine: str = None):
return self._process_engine_results(
lambda x: x.read("AMSResults", "ShearModulus"), engine
) * Units.conversion_ratio("au", unit)
def get_bulkmodulus(self, unit: str = "au", engine: str = None):
return self._process_engine_results(
lambda x: x.read("AMSResults", "BulkModulus"), engine
) * Units.conversion_ratio("au", unit)
def get_pesscan_results(self, molecules: bool = True):
"""
For PESScan jobs, this functions extracts information about the scan coordinates and energies.
molecules : bool
Whether to return a Molecule at each PES point.
Returns a dictionary, with the following keys:
'RaveledScanCoords': a list of str with the names of the scan coordinates: ['sc_1_1','sc_1_2','sc_2_1','sc_2_2',...]
'nRaveledScanCoords': the length of the previous list
'RaveledUnits': a list of str with the units: ['bohr', 'bohr', 'radian', 'bohr', ...]
'RaveledPESCoords': a nested list with the values of the scan coordinates: [[val_1_1_1,val_1_1_2,...],[val_1_2_1,val_1_2_2,...],[val_2_1_1,val_2_1_2,...],[val_2_2_1,val_2_2_2]]
'ScanCoords': a nested list: [['sc_1_1','sc_1_2'],['sc_2_1','sc_2_2'],...]
'nScanCoords': length of the previous list
'Units': a nested list with the units: [['bohr', 'bohr'],['radian', 'bohr'], ...]
'OrigScanCoords': a list of str in the newline-separated format stored on the .rkf file: ['sc_1_1\\nsc_1_2','sc_2_1\\nsc_2_2',...]
'nPESPoints': int, number of PES points
'PES': list of float, the energies at each PES point
'Converged': list of bool, whether the geometry optimization at each PES point converged
'Molecules': list of Molecule, the structure at each PES point. Only present if the *molecules* argument is True
'HistoryIndices': list of int, the indices (1-based) in the History section which correspond to the Molecules and PES.
'ConstrainedAtoms': set of int, all atom indices (1-based) that were part of any PESScan scan coordinates (other constrained atoms are not included)
'Properties': list of dict. The dictionary keys are what can be found on the AMSResults section of the engine .rkf file. These will only be populated if "CalcPropertiesAtPESPoints" is set to Yes when running the PES scan.
"""
import re
from natsort import natsorted
def tolist(x):
if isinstance(x, list):
return x
else:
return [x]
nScanCoord = self.readrkf("PESScan", "nScanCoord")
pes = tolist(self.readrkf("PESScan", "PES"))
origscancoords = tolist(self.get_history_property("ScanCoord", history_section="PESScan"))
# one scan coordinate may have several variables
scancoords = [x.split("\n") for x in origscancoords]
units = []
pescoords = tolist(self.readrkf("PESScan", "PESCoords"))
pescoords = np.array(pescoords).reshape(-1, sum(len(x) for x in scancoords))
pescoords = np.transpose(pescoords)
units = []
for i in range(nScanCoord):
units.append([])
for j in range(len(scancoords[i])):
if (
scancoords[i][j] in ["a", "b", "c"]
or "Dist" in scancoords[i][j]
or "Coordinate" in scancoords[i][j]
):
units[-1].append("bohr")
elif "Volume" in scancoords[i][j]:
units[-1].append("bohr^3")
elif "Area" in scancoords[i][j]:
units[-1].append("bohr^2")
else:
units[-1].append("radian")
converged = tolist(self.readrkf("PESScan", "GOConverged"))
converged = [bool(x) for x in converged]
historyindices = tolist(self.readrkf("PESScan", "HistoryIndices"))
ret = {}
raveled_scancoords = [x[i] for x in scancoords for i in range(len(x))] # 1d list
raveled_units = [x[i] for x in units for i in range(len(x))] # 1d list
constrained_atoms = []
for sc in raveled_scancoords:
constrained_atoms.extend(re.findall(r"\((\d+)\)", sc))
try:
constrained_atoms = set(int(x) for x in constrained_atoms)
except ValueError:
constrained_atoms = set()
ret["RaveledScanCoords"] = raveled_scancoords
ret["nRaveledScanCoords"] = len(raveled_scancoords)
ret["ConstrainedAtoms"] = constrained_atoms
ret["ScanCoords"] = scancoords
ret["nScanCoords"] = len(scancoords)
ret["OrigScanCoords"] = origscancoords # newline delimiter for joint scan coordinates
ret["RaveledPESCoords"] = pescoords.tolist()
ret["Units"] = units
ret["RaveledUnits"] = raveled_units
ret["Converged"] = converged
ret["PES"] = pes
ret["nPESPoints"] = len(pes)
ret["HistoryIndices"] = historyindices
if molecules:
mols = []
for ind in historyindices:
mols.append(self.get_history_molecule(ind))
ret["Molecules"] = mols
ret["Properties"] = []
for key in natsorted(title for title in self.rkfs.keys() if title.startswith("PESPoint")):
amsresults = self.rkfs[key].read_section("AMSResults")
ret["Properties"].append(amsresults)
return ret
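# Sketch: listing only the converged points of a PES scan; ``RaveledPESCoords`` is stored
# per coordinate, so zip(*...) transposes it to per-point tuples:
#
#     res = job.results.get_pesscan_results(molecules=False)
#     for coords, e, ok in zip(zip(*res["RaveledPESCoords"]), res["PES"], res["Converged"]):
#         if ok:
#             print(coords, e)  # energies in hartree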
def get_neb_results(self, molecules: bool = True, unit: str = "au"):
"""
Returns a dictionary with results from a NEB calculation.
molecules : bool
Whether to include the 'Molecules' key in the return result
unit : str
Energy unit for the Energies, LeftBarrier, RightBarrier, and ReactionEnergy
Returns: dict
'nImages': number of images (excluding end points)
'nIterations': number of iterations
'Energies': list of energies (including end points)
'Climbing': bool, whether climbing image NEB was used
'LeftBarrier': float, left reaction barrier
'RightBarrier': float, right reaction barrier
'ReactionEnergy': float, reaction energy
'HistoryIndices': list of int, same length as 'Energies', contains indices in the History section
'Molecules': list of Molecule (including end points)
"""
def tolist(x):
if isinstance(x, list):
return x
else:
return [x]
ret = {}
conversion_ratio = Units.conversion_ratio("au", unit)
ret["nImages"] = self.readrkf("NEB", "nebImages")
ret["nIterations"] = self.readrkf("NEB", "nebIterations")
ret["Climbing"] = bool(self.readrkf("NEB", "climbing"))
ret["HighestIndex"] = self.readrkf("NEB", "highestIndex")
ret["LeftBarrier"] = self.readrkf("NEB", "LeftBarrier") * conversion_ratio
ret["RightBarrier"] = self.readrkf("NEB", "RightBarrier") * conversion_ratio
ret["ReactionEnergy"] = self.readrkf("NEB", "ReactionEnergy") * conversion_ratio
history_dim = tolist(self.readrkf("NEB", "historyIndex@dim"))  # stored as (nImages, buffer length)
history_dim.reverse()  # (buffer length, nImages)
history_indices_matrix = tolist(self.readrkf("NEB", "historyIndex")) # this matrix is padded with -1 values
history_indices_matrix = np.array(history_indices_matrix).reshape(history_dim)
history_indices = np.max(history_indices_matrix, axis=0, keepdims=False).tolist()
if any(x == -1 for x in history_indices):
raise ValueError("Found -1 in the 'converged' part of historyIndex. This should not happen!")
ret["HistoryIndices"] = history_indices
ret["Energies"] = [self.get_property_at_step(ind, "Energy") * conversion_ratio for ind in ret["HistoryIndices"]]
if molecules:
ret["Molecules"] = [self.get_history_molecule(ind) for ind in ret["HistoryIndices"]]
return ret
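# Sketch: reaction barriers from a finished NEB job, converted to eV (keys as documented above):
#
#     neb = job.results.get_neb_results(molecules=False, unit="eV")
#     print(neb["LeftBarrier"], neb["RightBarrier"], neb["ReactionEnergy"])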
def get_irc_results(self, molecules: bool = True, unit: str = "au"):
"""
Returns a dictionary with results from an IRC calculation.
molecules : bool
Whether to include the 'Molecules' key in the return result
unit : str
Energy unit for the Energies, LeftBarrier, RightBarrier
Returns: dict
'Energies': list of energies
'RelativeEnergies': list of energies relative to the first point
'LeftBarrier': float, left reaction barrier
'RightBarrier': float, right reaction barrier
'Molecules': list of Molecule (including end points)
'IRCDirection': IRC direction
'Energies': Energies (in ``unit``)
'PathLength': Path length in angstrom
'IRCIteration': list of int
'IRCGradMax': list of float
'IRCGradRms': list of float
'ArcLength': list of float
"""
from itertools import compress
def tolist(x):
if isinstance(x, list):
return x
else:
return [x]
sec = "History"
items = [
"IRCDirection",
"Energy",
"PathLength",
"IRCIteration",
"IRCGradMax",
"IRCGradRms",
"ArcLength",
"HistoryIndices",
"Molecules",
]
d = {}
forw = {}
back = {}
reformed = {}
forw_mask = None
back_mask = None
converged = self.get_history_property("Converged", history_section=sec)
converged = tolist(converged)
converged_mask = [x != 0 for x in converged]
history_indices = [i for i, x in enumerate(converged_mask, 1) if x] # raw, not rearranged
for k in items:
# first half is forward direction, second half is backward direction
# second half should be reversed, path length made negative.
if k == "Molecules":
if not molecules:
continue
d[k] = [self.get_history_molecule(ind) for ind in history_indices] # rearrangement happens later
elif k == "HistoryIndices":
d[k] = history_indices # rearrangement happens later
else:
d[k] = self.get_history_property(k, history_section=sec)
d[k] = tolist(d[k])
d[k] = list(compress(d[k], converged_mask))
if k == "IRCDirection":
forw_mask = [x == 1 for x in d[k]]
back_mask = [x != 1 for x in d[k]]
d[k] = ["Forward" if x == 1 else "Backward" if x == 2 else x for x in d[k]]
forw[k] = list(compress(d[k], forw_mask))
back[k] = list(compress(d[k], back_mask))
back[k].reverse()
if k == "PathLength":
# print backwards direction as negative numbers
back[k] = [-x for x in back[k]]
reformed[k] = back[k] + forw[k]
conversion_ratio = Units.convert(1.0, "au", unit)
reformed["Energies"] = [x * conversion_ratio for x in reformed["Energy"]]
del reformed["Energy"]
max_energy = max(reformed["Energies"])
if "Forward" in reformed["IRCDirection"]:
reformed["LeftBarrier"] = max_energy - reformed["Energies"][0]
if "Backward" in reformed["IRCDirection"]:
reformed["RightBarrier"] = max_energy - reformed["Energies"][-1]
reformed["RelativeEnergies"] = [x - reformed["Energies"][0] for x in reformed["Energies"]]
return reformed
def get_time_step(self, history_section: Literal["BinLog", "MDHistory"] = "MDHistory"):
"""Returns the time step between adjacent frames (NOT the TimeStep in the settings, but Timestep*SamplingFreq) in femtoseconds for MD simulation jobs"""
time1 = self.get_property_at_step(1, "Time", history_section=history_section)
time2 = self.get_property_at_step(2, "Time", history_section=history_section)
time_step = time2 - time1
return time_step
def _get_integer_start_end_every_max(self, start_fs, end_fs, every_fs, max_dt_fs, time_step=None):
"""converts from femtoseconds to integers"""
if time_step is None:
time_step = self.get_time_step()
start_step = round(start_fs / time_step)
end_step = round(end_fs / time_step) if end_fs is not None else None
every = round(every_fs / time_step) if every_fs is not None else 1
max_dt = round((max_dt_fs / time_step) / every) if max_dt_fs is not None else None
return start_step, end_step, every, max_dt
def get_velocity_acf(
self,
start_fs=0,
end_fs=None,
every_fs=None,
max_dt_fs=None,
atom_indices=None,
x=True,
y=True,
z=True,
normalize=False,
):
"""
Calculate the velocity autocorrelation function. Works only for trajectories with a constant number of atoms.
start_fs : float
Start time in femtoseconds. Defaults to 0 (first frame)
end_fs : float
End time in femtoseconds. Defaults to the end of the trajectory.
max_dt_fs : float
Maximum correlation time in femtoseconds.
atom_indices: list of int
Atom indices for which to calculate the velocity autocorrelation function. Defaults to all atoms. The atom indices start with 1.
x : bool
Whether to use the velocities along x
y : bool
Whether to use the velocities along y
z : bool
Whether to use the velocities along z
normalize : bool
Whether to normalize the velocity autocorrelation function so that it is 1 at time 0.
Returns: 2-tuple (times, C)
``times`` is a 1D np array with times in femtoseconds. ``C`` is a 1D numpy array with shape (max_dt,) containing the autocorrelation function. If not ``normalize`` then the unit is angstrom^2/fs^2.
"""
from scm.plams.trajectories.analysis import autocorrelation
nEntries = self.readrkf("MDHistory", "nEntries")
time_step = self.get_time_step()
start_step, end_step, every, max_dt = self._get_integer_start_end_every_max(
start_fs, end_fs, every_fs, max_dt_fs
)
data = self.get_history_property("Velocities", history_section="MDHistory")
data = np.array(data).reshape(nEntries, -1, 3)[start_step:end_step:every]
if atom_indices is not None:
zero_based_atom_indices = [x - 1 for x in atom_indices]
data = data[:, zero_based_atom_indices, :]
n_dimensions = 3
if not x or not y or not z:
components = []
if x:
components += [0]
if y:
components += [1]
if z:
components += [2]
data = data[:, :, components]
n_dimensions = len(components)
data *= Units.convert(1.0, "bohr", "angstrom") # convert from bohr/fs to ang/fs
vacf = (
autocorrelation(data, max_dt=max_dt, normalize=normalize) * n_dimensions
) # multiply by n_dimensions to undo the averaging per component and instead average per atom
times = np.arange(len(vacf)) * time_step * every
return times, vacf
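# Sketch: a normalized VACF for a subset of atoms, sampled every 2 fs with a 500 fs
# correlation window (``oxygens`` is a hypothetical list of 1-based atom indices):
#
#     times, vacf = job.results.get_velocity_acf(every_fs=2.0, max_dt_fs=500.0,
#                                                atom_indices=oxygens, normalize=True)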
def get_dipole_history(self, dipole_unit="e*bohr"):
"""Return the dipole moment history from the BinLog section as an (nSteps, 3) numpy array, in units of *dipole_unit*."""
dipole_x = self.get_history_property(history_section="BinLog", varname="DipoleMoment_x")
dipole_y = self.get_history_property(history_section="BinLog", varname="DipoleMoment_y")
dipole_z = self.get_history_property(history_section="BinLog", varname="DipoleMoment_z")
data = np.column_stack((dipole_x, dipole_y, dipole_z))
data *= Units.convert(1.0, "e*bohr", dipole_unit)
return data
def get_dipole_derivatives_acf(
self, start_fs=0, end_fs=None, every_fs=None, max_dt_fs=None, x=True, y=True, z=True, normalize=False
):
"""
Calculate the dipole derivative autocorrelation function from the dipole moments stored in the ``BinLog`` section.
start_fs : float
Start time in femtoseconds. Defaults to 0 (first frame)
end_fs : float
End time in femtoseconds. Defaults to the end of the trajectory.
every_fs : float
Use data every every_fs femtoseconds. Defaults to using every frame.
max_dt_fs : float
Maximum correlation time in femtoseconds. Defaults to half the trajectory length, capped at 5000 steps.
x : bool
Whether to use the dipole derivative along x
y : bool
Whether to use the dipole derivative along y
z : bool
Whether to use the dipole derivative along z
normalize : bool
Whether to normalize the autocorrelation function so that it is 1 at time 0.
Returns: 2-tuple (times, C)
``times`` is a 1D numpy array with times in femtoseconds. ``C`` is a 1D numpy array of shape (max_dt,) containing the autocorrelation function. If not ``normalize``, the unit is (e*bohr/fs)^2.
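A minimal usage sketch (assuming ``job`` is a finished AMS MD job that logged the dipole moment to the ``BinLog`` section):
.. code-block:: python
    times, acf = job.results.get_dipole_derivatives_acf(max_dt_fs=2000.0, normalize=True)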
"""
from scm.plams.trajectories.analysis import autocorrelation
# nEntries = self.readrkf('BinLog', 'nEntries')
time_step = self.get_time_step(history_section="BinLog")
data = self.get_dipole_history()
start_step, end_step, every, max_dt = self._get_integer_start_end_every_max(
start_fs, end_fs, every_fs, max_dt_fs, time_step
)
data = data[start_step:end_step:every, :]
n_dimensions = 3
if not x or not y or not z:
components = []
if x:
components += [0]
if y:
components += [1]
if z:
components += [2]
data = data[:, components]
n_dimensions = len(components)
data_deriv = np.diff(data, axis=0) / time_step
if max_dt is None: # default value is 5000 or num_timesteps // 2
num_timesteps = data_deriv.shape[0]
max_dt = num_timesteps // 2
if max_dt > 5000:
max_dt = 5000
dipole_deriv_acf = (
autocorrelation(data_deriv, max_dt=max_dt, normalize=normalize) * n_dimensions
) # multiply by n_dimensions to undo the averaging per component and instead average per atom
times = np.arange(len(dipole_deriv_acf)) * time_step * every
return times, dipole_deriv_acf
[docs] def get_diffusion_coefficient_from_velocity_acf(self, times=None, acf=None, n_dimensions=3):
"""
Calculate the diffusion coefficient by integrating the velocity autocorrelation function.
If ``times`` or ``acf`` is None, then a default velocity autocorrelation function will be calculated.
times: 1D numpy array
1D numpy array with the times
acf: 1D numpy array
1D numpy array with the velocity autocorrelation function in ang^2/fs^2
n_dimensions: int
Number of dimensions that were used to calculate the autocorrelation function.
Returns: 2-tuple (times, diffusion_coefficient)
``times`` are the times in femtoseconds. ``diffusion_coefficient`` is a 1D numpy array with the diffusion coefficients in m^2/s.
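A minimal usage sketch (assuming ``job`` is a finished AMS MD job; the long-time plateau of the integral estimates the diffusion coefficient):
.. code-block:: python
    times, D = job.results.get_diffusion_coefficient_from_velocity_acf()
    print(f"D = {D[-1]:.3e} m^2/s")  # value at the longest correlation time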
"""
from scipy.integrate import cumtrapz
if times is None or acf is None:
times, acf = self.get_velocity_acf(normalize=False)
diffusion_coefficient = (1.0 / n_dimensions) * cumtrapz(acf, times) # ang^2/fs
diffusion_coefficient *= 1e-20 / 1e-15  # ang^2/fs -> m^2/s
new_times = times[:-1]
return np.array(new_times), diffusion_coefficient
[docs] def get_power_spectrum(self, times=None, acf=None, max_dt_fs=None, max_freq=None, number_of_points=None):
"""
Calculates a power spectrum from the velocity autocorrelation function.
If ``times`` or ``acf`` is None, then a default velocity autocorrelation function will be calculated.
times: 1D numpy array of float
The ``times`` returned by ``AMSResults.get_velocity_acf()``.
acf: 1D numpy array of float
The ``vacf`` returned by ``AMSResults.get_velocity_acf()``.
max_dt_fs : float
Maximum correlation time in femtoseconds.
max_freq: float
Maximum frequency (in cm^-1) of the returned power spectrum. Defaults to 5000 cm^-1.
number_of_points: int
Number of points in the returned power spectrum. Defaults to a number so that the spacing between points is about 1 cm^-1.
Returns: 2-tuple (frequencies, intensities)
``frequencies`` : Frequencies in cm^-1 (1D numpy array). ``intensities``: Intensities (1D numpy array).
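A minimal usage sketch (when ``times`` and ``acf`` are omitted, the velocity autocorrelation function is computed internally):
.. code-block:: python
    freqs, intensities = job.results.get_power_spectrum(max_dt_fs=1000.0, max_freq=4000)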
"""
from scm.plams.trajectories.analysis import power_spectrum
if times is None or acf is None:
times, acf = self.get_velocity_acf(max_dt_fs=max_dt_fs, normalize=True)
return power_spectrum(times, acf, max_freq=max_freq, number_of_points=number_of_points)
[docs] def get_ir_spectrum_md(
self,
times: np.ndarray = None,
acf: np.ndarray = None,
max_dt_fs: float = None,
max_freq: float = 5000,
number_of_points: int = None,
):
"""
Calculates an IR spectrum from the dipole derivative autocorrelation function.
If ``times`` or ``acf`` is None, then a default dipole derivative autocorrelation function will be calculated.
times: 1D numpy array of float
The ``times`` returned by ``AMSResults.get_dipole_derivatives_acf()``.
acf: 1D numpy array of float
The ``acf`` returned by ``AMSResults.get_dipole_derivatives_acf()``.
max_dt_fs : float
Maximum correlation time in femtoseconds.
max_freq: float
Maximum frequency (in cm^-1) of the returned power spectrum. Defaults to 5000 cm^-1.
number_of_points: int
Number of points in the returned power spectrum. Defaults to a number so that the spacing between points is about 1 cm^-1.
Returns: 2-tuple (frequencies, intensities)
``frequencies`` : Frequencies in cm^-1 (1D numpy array). ``intensities``: Intensities (1D numpy array).
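A minimal usage sketch (requires the dipole moment in the ``BinLog`` section, see :meth:`get_dipole_derivatives_acf`):
.. code-block:: python
    freqs, intensities = job.results.get_ir_spectrum_md(max_freq=4000)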
"""
from scm.plams.trajectories.analysis import power_spectrum
if times is None or acf is None:
times, acf = self.get_dipole_derivatives_acf(max_dt_fs=max_dt_fs, normalize=True)
return power_spectrum(times, acf, max_freq=max_freq, number_of_points=number_of_points)
@staticmethod
def _get_green_kubo_viscosity(pressuretensor, time_step, max_dt, volume, temperature, xy=True, yz=True, xz=True):
from scipy.integrate import cumtrapz
from scm.plams.tools.units import Units
from scm.plams.trajectories.analysis import autocorrelation
data = np.array(pressuretensor)
components = []
if yz:
components += [3]
if xz:
components += [4]
if xy:
components += [5]
data = data[:, components]
acf = autocorrelation(data, max_dt=max_dt)
times = np.arange(len(acf)) * time_step
integrated_acf = cumtrapz(acf, times, initial=0)
integrated_times = np.arange(len(integrated_acf)) * time_step
k_B = Units.constants["k_B"]
au2Pa = Units.convert(1.0, "hartree/bohr^3", "Pa")
viscosity = (volume * 1e-30) / (k_B * temperature) * integrated_acf * au2Pa**2 * 1e-15 * 1e3  # ang^3 -> m^3, (a.u. pressure)^2 -> Pa^2, fs -> s, Pa*s -> mPa*s
return integrated_times, viscosity
[docs] def get_green_kubo_viscosity(
self, start_fs=0, end_fs=None, every_fs=None, max_dt_fs=None, xy=True, yz=True, xz=True, pressuretensor=None
):
"""
Calculates the viscosity using the Green-Kubo relation (integrating the off-diagonal pressure tensor autocorrelation function).
start_fs : float
Start time in femtoseconds. Defaults to 0 (first frame)
end_fs : float
End time in femtoseconds. Defaults to the end of the trajectory.
every_fs : float
Use data every every_fs femtoseconds. Defaults to using every frame.
max_dt_fs : float
Maximum correlation time in femtoseconds.
xy : bool
Whether to use the xy off-diagonal elements
yz : bool
Whether to use the yz off-diagonal elements
xz : bool
Whether to use the xz off-diagonal elements
pressuretensor : np.array shape (N,6)
Pressure tensor in atomic units. If not specified, it will be read from the ams.rkf file.
Returns: 2-tuple (times, C)
``times`` is a 1D numpy array with times in femtoseconds. ``C`` is a 1D numpy array of shape (max_dt,) containing the running viscosity integral (in mPa*s). It should converge to the viscosity value as the correlation time increases.
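A minimal usage sketch (assuming an MD job that wrote the pressure tensor to the ``MDHistory`` section):
.. code-block:: python
    times, eta = job.results.get_green_kubo_viscosity(max_dt_fs=20000.0)
    print(eta[-1])  # running viscosity integral (mPa*s) at the longest correlation time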
"""
from scipy.integrate import cumtrapz
from scm.plams.tools.units import Units
from scm.plams.trajectories.analysis import autocorrelation
time_step = self.get_time_step()
start_step, end_step, every, max_dt = self._get_integer_start_end_every_max(
start_fs, end_fs, every_fs, max_dt_fs
)
data = pressuretensor
if data is None:
data = self.get_history_property("PressureTensor", "MDHistory")
data = [
x for x in data if x is not None
] # None might appear in currently running trajectories if the job was loaded with load_external
data = np.array(data)[start_step:end_step:every]
components = []
if yz:
components += [3]
if xz:
components += [4]
if xy:
components += [5]
data = data[:, components]
acf = autocorrelation(data, max_dt=max_dt)
times = np.arange(len(acf)) * time_step * every
integrated_acf = cumtrapz(acf, times)
integrated_times = np.arange(len(integrated_acf)) * time_step * every
V = self.get_main_molecule().unit_cell_volume()
try:
T = self.get_history_property("Temperature", "MDHistory")
T = np.array(T)[start_step:end_step:every]
mean_T = np.mean(T)
except KeyError: # might be triggered for currently running trajectories, then just use the first temperature
mean_T = self.get_property_at_step(1, "Temperature", "MDHistory")
k_B = Units.constants["k_B"]
au2Pa = Units.convert(1.0, "hartree/bohr^3", "Pa")
viscosity = (V * 1e-30) / (k_B * mean_T) * integrated_acf * au2Pa**2 * 1e-15 * 1e3  # ang^3 -> m^3, (a.u. pressure)^2 -> Pa^2, fs -> s, Pa*s -> mPa*s
return integrated_times, viscosity
[docs] def get_density_along_axis(
self,
axis: str = "z",
density_type: str = "mass",
start_fs=0,
end_fs=None,
every_fs=None,
bin_width=0.1,
atom_indices=None,
):
"""
Calculates the density of atoms along a Cartesian coordinate axis.
This only works if the axis is perpendicular to the other two axes. The
system must be 3D-periodic and the number of atoms cannot change during
the trajectory.
axis : str
'x', 'y', or 'z'
density_type : str
'mass' gives the density in g/cm^3, 'number' gives the number density in ang^-3, and 'histogram' gives the number of atoms per frame per bin (so the value depends on the bin width)
start_fs : float
Start time in fs
end_fs : float
End time in fs
every_fs : float
Use data every every_fs timesteps
bin_width : float
Bin width of the returned coordinates (in angstrom).
atom_indices: list of int
Indices (starting with 1) for the atoms to use for calculating the density.
Returns: 2-tuple (coordinates, density)
``coordinates`` is a 1D array with the coordinates (in angstrom). ``density`` is a 1D array with the densities (in g/cm^3 or ang^-3 depending on ``density_type``).
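A minimal usage sketch (assuming a 3D-periodic trajectory, e.g. a liquid slab, with the z axis perpendicular to the other two):
.. code-block:: python
    z, rho = job.results.get_density_along_axis(axis="z", density_type="mass", bin_width=0.25)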
"""
assert axis in ["x", "y", "z"], f"Unknown axis: {axis}. Must be 'x', 'y', or 'z'"
assert density_type in [
"mass",
"number",
"histogram",
], f"Unknown density_type: {density_type}. Must be 'mass' or 'number'"
main_mol = self.get_main_molecule()
histogram = density_type == "histogram"
bohr2ang = Units.convert(1.0, "bohr", "angstrom")
start_step, end_step, every, _ = self._get_integer_start_end_every_max(start_fs, end_fs, every_fs, None)
nEntries = self.readrkf("History", "nEntries")
coords = np.array(self.get_history_property("Coords")).reshape(nEntries, -1, 3)
coords = coords[start_step:end_step:every]
nEntries = len(coords)
axis2index = {"x": 0, "y": 1, "z": 2}
index = axis2index[axis]
if not histogram:
other_indices = [0, 1, 2]
other_indices.remove(index)
assert (
len(main_mol.lattice) == 3
), f"get_density_along_axis with density_type='mass' or 'number' can only be called for 3d-periodic systems. Current periodicity: {len(main_mol.lattice)}. Use density_type='histogram' instead."
assert (
np.abs(np.dot(main_mol.lattice[other_indices[0]], main_mol.lattice[index])) < 1e-6
), f"Axis {axis} must be perpendicular to the other two axes"
assert (
np.abs(np.dot(main_mol.lattice[other_indices[1]], main_mol.lattice[index])) < 1e-6
), f"Axis {axis} must be perpendicular to the other two axes"
assert (
np.abs(main_mol.lattice[index][other_indices[0]]) < 1e-6
), f"Density along {axis} requires that lattice vector {index+1} has only 1 non-zero component (along {axis}). The vector is {main_mol.lattice[index]}"
assert (
np.abs(main_mol.lattice[index][other_indices[1]]) < 1e-6
), f"Density along {axis} requires that lattice vector {index+1} has only 1 non-zero component (along {axis}). The vector is {main_mol.lattice[index]}"
lattice_vectors = np.array(self.get_history_property("LatticeVectors")).reshape(-1, 3, 3)
lattice_vectors = lattice_vectors[start_step:end_step:every] * bohr2ang
vec1s = lattice_vectors[:, other_indices[0], :]
vec2s = lattice_vectors[:, other_indices[1], :]
cross_products = np.cross(vec1s, vec2s)
areas = np.linalg.norm(cross_products, axis=1)
mean_area = np.mean(areas)
coords = coords[:, :, index]
if atom_indices is not None:
zero_based_atom_indices = [x - 1 for x in atom_indices]
coords = coords[:, zero_based_atom_indices]
coords *= bohr2ang
avogadro = Units.constants["NA"]
min_c = np.min(coords)
max_c = np.max(coords)
num_bins = round((max_c - min_c) / bin_width + 1)
bins = np.linspace(min_c, max_c, num_bins, endpoint=True)
if density_type == "mass":
masses = np.array(self.readrkf("Molecule", "AtomMasses"))
if atom_indices is not None:
masses = masses[zero_based_atom_indices]
masses_broadcasted = np.broadcast_to(masses, coords.shape)
hist, bin_edges = np.histogram(coords, weights=masses_broadcasted, bins=bins)
volume_slice_ang3 = mean_area * (bin_edges[1] - bin_edges[0])
volume_slice_cm3 = volume_slice_ang3 * 1e-24
density = (1.0 / nEntries) * (hist / avogadro) / volume_slice_cm3
elif density_type == "number":
hist, bin_edges = np.histogram(coords, bins=bins)
volume_slice_ang3 = mean_area * (bin_edges[1] - bin_edges[0])
density = (1.0 / nEntries) * hist / volume_slice_ang3
elif density_type == "histogram":
hist, bin_edges = np.histogram(coords, bins=bins)
density = (1.0 / nEntries) * hist
z = (bin_edges[:-1] + bin_edges[1:]) / 2.0
return z, density
[docs] def recreate_molecule(self) -> Union[None, Molecule, Dict[str, Molecule]]:
"""Recreate the input molecule(s) for the corresponding job based on files present in the job folder.
This method is used by |load_external|. If ``ams.rkf`` is present in the job folder,
extract data from the ``InputMolecule`` and ``InputMolecule(*)`` sections.
"""
if "ams" in self.rkfs:
mols = self.get_input_molecules()
if len(mols) == 0:
return None
elif len(mols) == 1 and "" in mols:
return mols[""]
else:
return mols
return None
[docs] def recreate_settings(self):
"""Recreate the input |Settings| instance for the corresponding job based on files present in the job folder. This method is used by |load_external|.
If ``ams.rkf`` is present in the job folder, extract user input and parse it back to a |Settings| instance using ``scm.libbase`` module. Remove the ``system`` branch from that instance.
"""
if "ams" in self.rkfs:
user_input = self.readrkf("General", "user input")
try:
from scm.plams.interfaces.adfsuite.inputparser import InputParserFacade
inp = InputParserFacade().to_settings("ams", user_input)
except:
log("Failed to recreate input settings from {}".format(self.rkfs["ams"].path))
return None
s = Settings()
s.input = inp
if "system" in s.input.ams:
del s.input.ams.system
s.soft_update(config.job)
return s
return None
[docs] def ok(self):
"""Check if the execution of the associated :attr:`job` was successful or not.
See :meth:`Job.ok<scm.plams.core.basejob.Job.ok>` for more information."""
return self.job.ok()
[docs] def get_errormsg(self):
"""Tries to get an an error message for a associated :attr:`job`. This method returns ``None`` if the associated job was successful.
See :meth:`Job.get_errormsg<scm.plams.core.basejob.Job.errormsg>` for more information."""
return self.job.get_errormsg()
@property
def name(self):
"""Retrun the :attr:`job.name` of the job associated with this results instance."""
return self.job.name
class EnergyLandscape:
class State:
def __init__(
self,
landscape,
engfile,
energy,
mol,
count,
isTS,
reactantsID=None,
productsID=None,
prefactorsFromReactant=None,
prefactorsFromProduct=None,
):
self._landscape = landscape
self.engfile = engfile
self.energy = energy
self.molecule = mol
self.count = count
self.isTS = isTS
self.reactantsID = reactantsID
self.productsID = productsID
self.prefactorsFromReactant = prefactorsFromReactant
self.prefactorsFromProduct = prefactorsFromProduct
@property
def id(self):
return self._landscape._states.index(self) + 1
@property
def reactants(self):
return self._landscape._states[self.reactantsID - 1]
@property
def products(self):
return self._landscape._states[self.productsID - 1]
def __str__(self):
if self.isTS:
lines = [
f"State {self.id}: {self.molecule.get_formula(False)} transition state @ {self.energy:.8f} Hartree (found {self.count} times"
+ (f", results on {self.engfile})" if self.engfile is not None else ")")
]
if self.reactantsID is not None:
lines += [f" +- Reactants: {self.reactants}"]
if self.productsID is not None:
lines += [f" Products: {self.products}"]
if self.reactantsID is not None and self.productsID is not None:
eV = Units.convert(1.0, "eV", "hartree")
lines += [
f" Prefactors: {self.prefactorsFromReactant:.3E}:{self.prefactorsFromProduct:.3E} s^-1"
]
dEforward = (self.energy - self._landscape._states[self.reactantsID - 1].energy) / eV
dEbackward = (self.energy - self._landscape._states[self.productsID - 1].energy) / eV
lines += [f" Barriers: {dEforward:.3f}:{dEbackward:.3f} eV"]
elif self.productsID is not None:
lines += [f" Prefactors: {self.prefactorsFromReactant:.3E}:?"]
else:
lines = [
f"State {self.id}: {self.molecule.get_formula(False)} local minimum @ {self.energy:.8f} Hartree (found {self.count} times"
+ (f", results on {self.engfile})" if self.engfile is not None else ")")
]
return "\n".join(lines)
class Fragment:
def __init__(self, landscape, engfile, energy, mol):
self._landscape = landscape
self.engfile = engfile
self.energy = energy
self.molecule = mol
@property
def id(self):
return self._landscape._fragments.index(self) + 1
def __str__(self):
lines = [
f"Fragment {self.id}: {self.molecule.get_formula(False)} local minimum @ {self.energy:.8f} Hartree (results on {self.engfile})"
]
return "\n".join(lines)
class FragmentedState:
def __init__(
self,
landscape,
energy,
composition,
connections=None,
adsorptionPrefactors=None,
desorptionPrefactors=None,
):
self._landscape = landscape
self.energy = energy
self.composition = composition
self.connections = connections
self.adsorptionPrefactors = adsorptionPrefactors
self.desorptionPrefactors = desorptionPrefactors
@property
def id(self):
return self._landscape._fstates.index(self) + 1
@property
def fragments(self):
return [self._landscape._fragments[i] for i in self.composition]
def __str__(self):
formula = ""
for i, id in enumerate(self.composition):
formula += self._landscape._fragments[id].molecule.get_formula(False)
if i != len(self.composition) - 1:
formula += "+"
lines = [
f"FragmentedState {self.id}: {formula} local minimum @ {self.energy:.8f} Hartree (fragments {[i+1 for i in self.composition]})"
]
for i, iState in enumerate(self.connections):
lines += [f" +- {self._landscape._states[iState]}"]
if i == len(self.connections) - 1:
lines += [
f" Prefactors: {self.adsorptionPrefactors[i]:.3E}:{self.desorptionPrefactors[i]:.3E}"
]
else:
lines += [
f" | Prefactors: {self.adsorptionPrefactors[i]:.3E}:{self.desorptionPrefactors[i]:.3E}"
]
return "\n".join(lines)
def __init__(self, results):
self._states = []
self._fragments = []
self._fstates = []
if results is None:
return
if "EnergyLandscape" not in results.get_rkf_skeleton():
return
sec = results.read_rkf_section("EnergyLandscape")
# If there is only 1 state in the 'EnergyLandscape' section, some variables that are normally lists are instead built-in types (e.g. a 'float' instead of a 'list' of floats).
# For convenience we make sure here that the following variables are always lists:
for var in ["energies", "counts", "isTS", "reactants", "products"]:
if not isinstance(sec[var], list):
sec[var] = [sec[var]]
nStates = sec["nStates"]
for iState in range(nStates):
energy = sec["energies"][iState]
resfile = os.path.splitext(sec["fileNames"].split("\0")[iState])[0]
mol = results.get_molecule("Molecule", file=resfile)
count = sec["counts"][iState]
if not sec["isTS"][iState]:
self._states.append(AMSResults.EnergyLandscape.State(self, resfile, energy, mol, count, False))
else:
reactantsID = sec["reactants"][iState] if sec["reactants"][iState] > 0 else None
productsID = sec["products"][iState] if sec["products"][iState] > 0 else None
prefactorsFromReactant = (
sec["prefactorsFromReactant"][iState] if sec["products"][iState] > 0 else None
)
prefactorsFromProduct = (
sec["prefactorsFromProduct"][iState] if sec["products"][iState] > 0 else None
)
self._states.append(
AMSResults.EnergyLandscape.State(
self,
resfile,
energy,
mol,
count,
True,
reactantsID,
productsID,
prefactorsFromReactant,
prefactorsFromProduct,
)
)
if "nFragments" not in sec:
return
nFragments = sec["nFragments"]
for iFragment in range(nFragments):
energy = sec["fragmentsEnergies"][iFragment]
resfile = os.path.splitext(sec["fragmentsFileNames"].split("\0")[iFragment])[0]
mol = results.get_molecule("Molecule", file=resfile)
self._fragments.append(AMSResults.EnergyLandscape.Fragment(self, resfile, energy, mol))
if "nFStates" not in sec:
return
nFragmentedStates = sec["nFStates"]
for iFState in range(nFragmentedStates):
iEnergy = sec["fStatesEnergy(" + str(iFState + 1) + ")"]
value = sec["fStatesComposition(" + str(iFState + 1) + ")"]
iComposition = [i - 1 for i in value] if isinstance(value, list) else [value - 1]
value = sec["fStatesConnections(" + str(iFState + 1) + ")"]
iConnections = [i - 1 for i in value] if isinstance(value, list) else [value - 1]
value = sec["fStatesAdsorptionPrefactors(" + str(iFState + 1) + ")"]
iAdsorptionPrefactors = value if isinstance(value, list) else [value]
value = sec["fStatesDesorptionPrefactors(" + str(iFState + 1) + ")"]
iDesorptionPrefactors = value if isinstance(value, list) else [value]
self._fstates.append(
AMSResults.EnergyLandscape.FragmentedState(
self, iEnergy, iComposition, iConnections, iAdsorptionPrefactors, iDesorptionPrefactors
)
)
@property
def minima(self):
return [s for s in self._states if not s.isTS]
@property
def transition_states(self):
return [s for s in self._states if s.isTS]
@property
def fragments(self):
return [f for f in self._fragments]
@property
def fragmented_states(self):
return [fs for fs in self._fstates]
def __str__(self):
lines = ["All stationary points:"]
lines += ["======================"]
for s in self._states:
lines += [str(s)]
for f in self._fragments:
lines += [str(f)]
for fs in self._fstates:
lines += [str(fs)]
return "\n".join(lines)
def __getitem__(self, i):
return self._states[i - 1]
def __iter__(self):
return iter(self._states)
def __len__(self):
return len(self._states)
[docs] def get_energy_landscape(self):
"""Returns the energy landscape obtained from a PESExploration job run by the AMS driver. The energy landscape is a set of stationary PES points (local minima and transition states).
The returned object is of the type ``AMSResults.EnergyLandscape`` and offers convenient access to the states' energies, geometries as well as the information on which transition states connect which minima.
.. code-block:: python
el = results.get_energy_landscape()
print(el)
for state in el:
print(f"Energy = {state.energy}")
print(f"Is transition state = {state.isTS}")
print("Geometry:", state.molecule)
if state.isTS:
print(f"Forward barrier: {state.energy - state.reactants.energy}")
print(f"Backward barrier: {state.energy - state.products.energy}")
"""
return AMSResults.EnergyLandscape(self)
# =========================================================================
def _access_rkf(self, func, file="ams"):
"""A skeleton method for accessing any of the ``.rkf`` files produced by AMS.
The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
The *func* argument has to be a function to call on a chosen ``.rkf`` file. It should take one argument, an instance of |KFFile|.
"""
# Try unique engine:
if file == "engine":
names = self.engine_names()
if len(names) == 1:
return func(self.rkfs[names[0]])
else:
raise ValueError(
"You cannot use 'engine' as 'file' argument if the engine results file is not unique. Please use the real name of the file you wish to read"
)
# Try:
if file in self.rkfs:
return func(self.rkfs[file])
# Try harder:
filename = file + ".rkf"
self.refresh()
if filename in self.files:
self.rkfs[file] = KFFile(opj(self.job.path, filename))
return func(self.rkfs[file])
# Surrender:
raise FileError("File {} not present in {}".format(filename, self.job.path))
def _process_engine_results(self, func, engine: str = None):
"""A generic method skeleton for processing any engine results ``.rkf`` file. *func* should be a function that takes one argument (an instance of |KFFile|) and returns arbitrary data.
The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
"""
names = self.engine_names()
if engine is not None:
if engine in names:
return func(self.rkfs[engine])
else:
raise FileError(f"File {engine}.rkf not present in {self.job.path}\n engine names found are: {names}")
else:
if len(names) == 1:
return func(self.rkfs[names[0]])
elif len(names) == 0:
raise FileError("There is no engine .rkf present in {}".format(self.job.path))
else:
raise ValueError(
"You need to specify the 'engine' argument when there are multiple engine result files present in the job folder"
)
# ===========================================================================
# ===========================================================================
# ===========================================================================
[docs]class AMSJob(SingleJob):
"""A class representing a single computation with AMS driver. The corresponding results type is |AMSResults|."""
results: AMSResults
molecule: Union[Molecule, Dict[str, Molecule], None]
_result_type = AMSResults
_command = "ams"
[docs] def __init__(self, molecule: Union[Molecule, Dict[str, Molecule], None] = None, *args, **kwargs):
super().__init__(molecule, *args, **kwargs)
[docs] def run(self, jobrunner=None, jobmanager=None, watch=False, **kwargs) -> AMSResults:
"""Run the job using *jobmanager* and *jobrunner* (or defaults, if ``None``).
If *watch* is set to ``True``, the contents of the AMS driver logfile will be forwarded line by line to the PLAMS logfile (and stdout), allowing for an easier monitoring of the running job.
Note that the forwarding of the AMS driver logfile will make the call to this method block until the job's execution has finished, even when using a parallel |JobRunner|.
Other keyword arguments (*\*\*kwargs*) are stored in ``run`` branch of job's settings.
Returned value is the |AMSResults| instance associated with this job.
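A minimal usage sketch (forwarding the logfile requires the optional ``watchdog`` package)::
results = job.run(watch=True)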
"""
if _has_watchdog and watch:
if "default_jobmanager" in config:
jobmanager = config.default_jobmanager
else:
raise PlamsError("No default jobmanager found. This probably means that PLAMS init() was not called.")
observer = Observer()
event_handler = AMSJobLogTailHandler(self, jobmanager)
observer.schedule(event_handler, jobmanager.workdir, recursive=True)
observer.start()
try:
results = super().run(jobrunner=jobrunner, jobmanager=jobmanager, **kwargs)
results.wait()
event_handler.trigger()
finally:
observer.stop()
observer.join()
else:
results = super().run(jobrunner=jobrunner, jobmanager=jobmanager, **kwargs)
return results
[docs] def get_runscript(self):
"""Generate the runscript. Returned string is of the form::
unset AMS_SWITCH_LOGFILE_AND_STDOUT
AMS_JOBNAME=jobname AMS_RESULTSDIR=. $AMSBIN/ams [-n nproc] --input=jobname.in [>jobname.out]
``-n`` flag is added if ``settings.runscript.nproc`` exists. ``[>jobname.out]`` is used based on ``settings.runscript.stdout_redirect``. If ``settings.runscript.preamble_lines`` exists, those lines will be added to the runscript verbatim before the execution of AMS. If ``settings.runscript.postamble_lines`` exists, those lines will be added to the runscript verbatim after the execution of AMS.
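For example, the following settings produce a runscript that runs AMS on 8 tasks and redirects stdout (a sketch; the preamble line is an arbitrary placeholder)::
job.settings.runscript.nproc = 8
job.settings.runscript.stdout_redirect = True
job.settings.runscript.preamble_lines = ["echo 'Starting AMS'"]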
"""
ret = "unset AMS_SWITCH_LOGFILE_AND_STDOUT\n"
ret += "unset SCM_LOGFILE\n"
if "nnode" in self.settings.runscript and config.slurm:
# Running as a SLURM job step and user specified the number of nodes explicitly.
ret += f'export SCM_SRUN_OPTIONS="$SCM_SRUN_OPTIONS -N {self.settings.runscript.nnode}"\n'
elif "nproc" in self.settings.runscript and config.slurm:
# Running as a SLURM job step and user asked for a specific number of tasks.
# Make sure to use as few nodes as possible to avoid distributing jobs needlessly across nodes.
# See: https://stackoverflow.com/questions/71382578
for nnode in range(1, len(config.slurm.tasks_per_node) + 1):
if sum(config.slurm.tasks_per_node[0:nnode]) >= self.settings.runscript.nproc:
break
if nnode > 1:
nnode = f"1-{nnode}"
ret += f'export SCM_SRUN_OPTIONS="$SCM_SRUN_OPTIONS -N {nnode}"\n'
if _has_scm_pisa and isinstance(self.settings.input, DriverBlock):
if self.settings.input.Engine.name == "QuantumESPRESSO":
ret += "export SCM_DISABLE_MPI=1\n"
else:
if "QuantumEspresso" in self.settings.input:
ret += "export SCM_DISABLE_MPI=1\n"
if "preamble_lines" in self.settings.runscript:
for line in self.settings.runscript.preamble_lines:
ret += f"{line}\n"
ret += 'AMS_JOBNAME="{}" AMS_RESULTSDIR=. $AMSBIN/ams'.format(self.name)
if "nproc" in self.settings.runscript:
ret += " -n {}".format(self.settings.runscript.nproc)
ret += ' --input="{}" < /dev/null'.format(self._filename("inp"))
if self.settings.runscript.stdout_redirect:
ret += ' >"{}"'.format(self._filename("out"))
ret += "\n\n"
if "postamble_lines" in self.settings.runscript:
for line in self.settings.runscript.postamble_lines:
ret += f"{line}\n"
ret += "\n"
return ret
[docs] def check(self):
"""Check if ``termination status`` variable from ``General`` section of main KF file equals ``NORMAL TERMINATION``."""
try:
status = self.results.readrkf("General", "termination status")
except:
return False
if "NORMAL TERMINATION" in status:
if "errors" in status:
log("Job {} reported errors. Please check the output".format(self._full_name()), 1)
return False
if "warnings" in status:
log("Job {} reported warnings. Please check the output".format(self._full_name()), 1)
return True
return False
[docs] def get_errormsg(self):
"""Tries to get an error message for a failed job. This method returns ``None`` for successful jobs."""
if self.check():
return None
else:
# Something went wrong. The first place to check is the termination status on the ams.rkf.
# If the AMS driver stopped with a known error (called StopIt in the Fortran code), the error will be in there.
try:
msg = self.results.readrkf("General", "termination status")
if msg == "NORMAL TERMINATION with errors" or msg is None:
# Apparently this wasn't a hard stop in the middle of the job.
# Let's look for the last error in the logfile ...
msg = self.results.grep_file("ams.log", "ERROR: ")[-1].partition("ERROR: ")[2]
elif msg == "IN PROGRESS" and "$JN.err" in self.results:
# If the status is still "IN PROGRESS", that probably means AMS was shut down hard from the outside.
# E.g. it got SIGKILL from the scheduler for exceeding some resource limit.
# In this case useful information may be found on stderr.
with open(self.results["$JN.err"], "r") as err:
errlines = err.read().splitlines()
for el in reversed(errlines):
if el != "" and not el.isspace():
msg = "Killed while IN PROGRESS: " + el
break
except:
msg = "Could not determine error message. Please check the output manually."
return msg
# =========================================================================
def _serialize_input(self, special) -> str:
"""Transform the contents of ``settings.input`` branch into string with blocks, keys and values.
First, the contents of ``settings.input`` are extended with entries returned by :meth:`_serialize_molecule`. Then the contents of ``settings.input.ams`` are used to generate AMS text input. Finally, every other (than ``ams``) entry in ``settings.input`` is used to generate engine specific input.
Special values can be indicated with *special* argument, which should be a dictionary having types of objects as keys and functions translating these types to strings as values.
"""
def unspec(value):
"""Check if *value* is one of a special types and convert it to string if it is."""
for spec_type in special:
if isinstance(value, spec_type):
return special[spec_type](value)
return value
def serialize(key, value, indent, end="End"):
"""Given a *key* and its corresponding *value* from the |Settings| instance produce a snippet of the input file representing this pair.
If the value is a nested |Settings| instance, use recursive calls to build the snippet for the entire block. Indent the result with *indent* spaces.
"""
ret = ""
if isinstance(value, Settings):
# Open block ...
ret += " " * indent + key
# ... with potential header
if "_h" in value:
ret += " " + unspec(value["_h"])
ret += "\n"
# Free block, or explicitly placed entries with _1, _2, ...
i = 1
while f"_{i}" in value:
if isinstance(value[f"_{i}"], Settings):
for ckey in value[f"_{i}"]:
ret += serialize(ckey, value[f"_{i}"][ckey], indent + 2)
else:
ret += serialize("", value["_" + str(i)], indent + 2)
i += 1
# Figure out the order in which we should serialize the entries in the block
if "_o" in value:
# Ordered block: we need to serialize the contents in the order given by "_o"
children = []
for v in value["_o"]:
ckey, _, idx = v.partition("[")
if idx:
# Child is a recurring entry, e.g. SubBlock[5]
idx = int(idx.rstrip("]"))
children.append((ckey, value[ckey][idx - 1]))
else:
# Child is a unique entry
children.append((ckey, value[ckey]))
else:
# Unordered block: normal Python order when iterating over a dict/Settings
children = [(ckey, value[ckey]) for ckey in value if not ckey.startswith("_")]
# Serialize all children
for ckey, cvalue in children:
split_key = key.lower().split()
split_ckey = ckey.lower().split()
if len(split_key) > 0 and split_key[0] == "engine" and ckey.lower() == "input":
ret += serialize(ckey, cvalue, indent + 2, "EndInput")
# REB: For the hybrid engine. How to deal with the space in ckey (Engine DFTB)? Replace by underscore?
elif len(split_ckey) > 0 and split_ckey[0] == "engine":
engine = " ".join(ckey.split("_"))
ret += serialize(engine, cvalue, indent + 2, end="EndEngine") + "\n"
else:
ret += serialize(ckey, cvalue, indent + 2)
# Close block
if key.lower() == "input":
end = "endinput"
ret += " " * indent + end + "\n"
elif isinstance(value, list):
for el in value:
ret += serialize(key, el, indent, end)
elif value == "" or value is True:
ret += " " * indent + key + "\n"
elif value is False or value is None:
pass
else:
ret += " " * indent + key + " " + str(unspec(value)) + "\n"
return ret
if _has_scm_pisa and isinstance(self.settings.input, DriverBlock):
# AMS specific way of writing input files:
# self.settings.input is an input class that knows how to serialize itself to text input.
txtinp = self.settings.input.get_input_string()
systems = self._serialize_molecule()
for system in systems:
system_input = serialize("System", system, 0)
# merge existing system block with one serialized from the molecule
if hasattr(self.settings.input, "System") and self.settings.input.System.value_changed:
if system["_h"]:
split_line = f"System {system['_h']}\n"
else:
split_line = "System\n"
start, end = txtinp.split(split_line)
# strip duplicate 'End' from molecule system block
system_input = "".join(system_input.splitlines(keepends=True)[:-1])
# insert system input in proper place
txtinp = start + system_input + end
else:
txtinp += "\n" + system_input
else:
# Open-source PLAMS way of writing input files:
# self.settings.input is a Settings object that we need to serialize to text input.
fullinput = self.settings.input.copy()
# prepare contents of 'system' block(s)
more_systems = self._serialize_molecule()
if more_systems:
if "system" in fullinput.ams:
# nonempty system block was already present in input.ams
system = fullinput.ams.system
system_list = system if isinstance(system, list) else [system]
system_list_set = Settings({(s._h if "_h" in s else ""): s for s in system_list})
more_systems_set = Settings({(s._h if "_h" in s else ""): s for s in more_systems})
system_list_set += more_systems_set
system_list = list(system_list_set.values())
system = system_list[0] if len(system_list) == 1 else system_list
fullinput.ams.system = system
else:
fullinput.ams.system = more_systems[0] if len(more_systems) == 1 else more_systems
txtinp = ""
ams = fullinput.find_case("ams")
# contents of the 'ams' block (AMS input) go first
for item in fullinput[ams]:
txtinp += serialize(item, fullinput[ams][item], 0) + "\n"
# and then engines
for engine in fullinput:
if engine != ams:
txtinp += serialize("Engine " + engine, fullinput[engine], 0, end="EndEngine") + "\n"
return txtinp
def _serialize_molecule(self):
"""Return a list of |Settings| instances containing the information about one or more |Molecule| instances stored in the ``molecule`` attribute.
Molecular charge is taken from ``molecule.properties.charge``, if present. Additional, atom-specific information to be put in ``atoms`` block after XYZ coordinates can be supplied with ``atom.properties.suffix``.
If the ``molecule`` attribute is a dictionary, the returned list is of the same length as the size of the dictionary. Keys from the dictionary are used as headers of returned ``system`` blocks.
"""
def serialize_to_settings(name, mol):
if isinstance(mol, Molecule):
sett = serialize_molecule_to_settings(mol, name)
elif _has_scm_unichemsys and isinstance(mol, ChemicalSystem):
sett = serialize_unichemsys_to_settings(mol)
if name:
sett._h = name
return sett
def serialize_unichemsys_to_settings(mol):
from scm.plams.interfaces.adfsuite.inputparser import InputParserFacade
sett = InputParserFacade().to_settings(AMSJob._command, str(mol))
return sett.ams.system[0]
def serialize_molecule_to_settings(mol, name: str = None):
sett = Settings()
if len(mol.lattice) in [1, 2] and mol.align_lattice():
log(
"The lattice of {} Molecule supplied for job {} did not follow the convention required by AMS. I rotated the whole system for you. You're welcome".format(
name if name else "main", self._full_name()
),
3,
)
sett.Atoms._1 = [
atom.str(symbol=self._atom_symbol(atom), space=18, decimal=10, suffix=self._atom_suffix(atom))
for atom in mol
]
if mol.lattice:
sett.Lattice._1 = ["{:16.10f} {:16.10f} {:16.10f}".format(*vec) for vec in mol.lattice]
if len(mol.bonds) > 0:
lines = ["{} {} {}".format(mol.index(b.atom1), mol.index(b.atom2), b.order) for b in mol.bonds]
# Add bond properties if they are defined
sett.BondOrders._1 = [
(
"{} {}".format(text, mol.bonds[i].properties.suffix)
if "suffix" in mol.bonds[i].properties
else text
)
for i, text in enumerate(lines)
]
if "charge" in mol.properties:
sett.Charge = mol.properties.charge
if "regions" in mol.properties:
# sett.Region = [Settings({'_h':name, '_1':['%s=%s'%(k,str(v)) for k,v in data.items()]}) for name, data in molecule.properties.regions.items()]
sett.Region = [
Settings({"_h": name, "Properties": Settings({"_1": [line for line in data]})})
for name, data in mol.properties.regions.items()
if isinstance(data, list)
]
return sett
if self.molecule is None:
return Settings()
moldict = {}
if isinstance(self.molecule, Molecule) or (_has_scm_unichemsys and isinstance(self.molecule, ChemicalSystem)):
moldict = {"": self.molecule}
elif isinstance(self.molecule, dict):
moldict = self.molecule
else:
raise JobError(
"Incorrect 'molecule' attribute of job {}. 'molecule' should be a Molecule, a UnifiedChemicalSystem, a dictionary or None, and not {}".format(
self._full_name(), type(self.molecule)
)
)
ret = [serialize_to_settings(name, molecule) for name, molecule in moldict.items()]
return ret
# =========================================================================
[docs] def get_task(self):
"""Returns the AMS Task from the job's settings. If it does not exist, returns None."""
if (
isinstance(self.settings, Settings)
and "input" in self.settings
and "ams" in self.settings.input
and "task" in self.settings.input.ams
):
return self.settings.input.ams.task
return None
@staticmethod
def _atom_suffix(atom):
"""Return the suffix of an atom. Combines both ``properties.suffix`` as well as the other properties in a format that is suitable for inclusion in the AMSuite input."""
# Build key-value dictionary from properties
keyval_dict = {}
def serialize(sett, prefix=""):
for key, val in sett.items():
if prefix == "" and key.lower() in ["suffix", "ghost", "name"]:
# Special atomic properties that are handled by _atom_symbol() already.
# Suffix handled explicitly below ...
continue
if isinstance(val, Settings):
# Recursively serialize nested Settings object
serialize(val, prefix + key + ".")
continue
elif isinstance(val, list):
# Lists are converted to a comma separated string of elements
val = ",".join(str(v) for v in val)
elif isinstance(val, set):
# Sets are converted to a *sorted* comma separated string of elements
val = ",".join(str(v) for v in sorted(val))
key = prefix + key
if not isinstance(val, str):
val = str(val)
if "\n" in val:
raise ValueError(f"String representation of atomic property {key} may not include line breaks")
if "{" in val or "}" in val:
raise ValueError(
f"String representation of atomic property {key} may not include curly brackets: {val}"
)
if " " in val:
# Ensure that space containing values are quoted
if '"' not in val:
val = '"' + val + '"'
elif "'" not in val:
val = "'" + val + "'"
else:
raise ValueError(f"Atomic property {key} can not include both single and double quotes: {val}")
keyval_dict[key] = str(val)
serialize(atom.properties)
# Assemble and return complete suffix string
ret = " ".join(f"{key}={val}" for key, val in keyval_dict.items())
if "suffix" in atom.properties:
ret += " " + str(atom.properties.suffix)
return ret.strip()
@staticmethod
def _atom_symbol(atom):
"""Return the atomic symbol of *atom*. Ensure proper formatting for AMSuite input taking into account ``ghost`` and ``name`` entries in ``properties`` of *atom*."""
smb = atom.symbol
if "ghost" in atom.properties and atom.properties.ghost:
smb = ("Gh." + smb).rstrip(".")
if "name" in atom.properties:
smb = (smb + "." + str(atom.properties.name)).lstrip(".")
return smb
@staticmethod
def _tuple2rkf(arg):
"""Transform a pair ``(x, name)`` where ``x`` is an instance of |AMSJob| or |AMSResults| and ``name`` is a name of ``.rkf`` file (``ams`` or engine) to an absolute path to that ``.rkf`` file."""
if len(arg) == 2 and isinstance(arg[1], str):
if isinstance(arg[0], AMSJob):
return arg[0].results.rkfpath(arg[1])
if isinstance(arg[0], AMSResults):
return arg[0].rkfpath(arg[1])
return str(arg)
[docs] @classmethod
def load_external(cls, path, settings=None, molecule=None, finalize=False, fmt="ams") -> "AMSJob":
"""Load an external job from *path*.
In this context an "external job" is an execution of some external binary that was not managed by PLAMS, and hence does not have a ``.dill`` file. It can also be used in situations where the execution was started with PLAMS, but the Python process was terminated before the execution finished, resulting in steps 9-12 of :ref:`job-life-cycle` not happening.
All the files produced by your computation should be placed in one folder and *path* should be the path to this folder or a file in this folder. The name of the folder is used as a job name. Input, output, error and runscript files, if present, should have names defined in ``_filenames`` class attribute (usually ``[jobname].in``, ``[jobname].out``, ``[jobname].err`` and ``[jobname].run``). It is not required to supply all these files, but in most cases one would like to use at least the output file, in order to use methods like :meth:`~scm.plams.core.results.Results.grep_output` or :meth:`~scm.plams.core.results.Results.get_output_chunk`. If *path* is an instance of an AMSJob, that instance is returned.
This method is a class method, so it is called via the class object and returns an instance of that class::
>>> a = AMSJob.load_external(path='some/path/jobname')
>>> type(a)
scm.plams.interfaces.adfsuite.ams.AMSJob
You can supply |Settings| and |Molecule| instances as *settings* and *molecule* parameters, they will end up attached to the returned job instance. If you don't do this, PLAMS will try to recreate them automatically using methods :meth:`~scm.plams.core.results.Results.recreate_settings` and :meth:`~scm.plams.core.results.Results.recreate_molecule` of the corresponding |Results| subclass. If no |Settings| instance is obtained in either way, the defaults from ``config.job`` are copied.
You can set the *finalize* parameter to ``True`` if you wish to run the whole :meth:`~Job._finalize` on the newly created job. In that case PLAMS will perform the usual :meth:`~Job.check` to determine the job status (*successful* or *failed*), followed by cleaning of the job folder (|cleaning|), |postrun| and pickling (|pickling|). If *finalize* is ``False``, the status of the returned job is *copied*.
fmt : str
One of 'ams', 'qe', 'vasp', or 'any'.
'ams': load a finished AMS job
'qe': convert a Quantum ESPRESSO .out file to ams.rkf and qe.rkf and load from those files
'vasp': convert a VASP OUTCAR file to ams.rkf and vasp.rkf and load from those files (see more below)
'any': auto-detect the format.
This method can also be used to convert a finished VASP job to an AMSJob. If you supply the path to a folder containing OUTCAR, then a subdirectory will be created in this folder called AMSJob. In the AMSJob subdirectory, two files will be created: ams.rkf and vasp.rkf, that contain some of the results from the VASP calculation. If the AMSJob subdirectory already exists, the existing ams.rkf and vasp.rkf files will be reused. NOTE: the purpose of loading VASP data this way is to let you call for example job.results.get_energy() etc., not to run new VASP calculations!
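For example, to load a finished VASP calculation (a sketch; the path is hypothetical)::
>>> job = AMSJob.load_external('some/path/vaspjob', fmt='vasp')
>>> job.results.get_energy()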
"""
if isinstance(path, cls):
return path
preferred_name = None
fmt = fmt.lower()
# first check if path is a VASP output
# in which case call cls._vasp_to_ams, which will return a NEW path
# containing ams.rkf and vasp.rkf (the .rkf files will be created if they do not exist)
if os.path.isdir(path):
preferred_name = os.path.basename(os.path.abspath(path))
if (
not os.path.exists(os.path.join(path, "ams.rkf"))
and os.path.exists(os.path.join(path, "OUTCAR"))
and (fmt == "vasp" or fmt == "any")
):
path = vasp_output_to_ams(path, overwrite=False)
elif os.path.exists(path) and os.path.basename(path) == "OUTCAR" and (fmt == "vasp" or fmt == "any"):
preferred_name = os.path.basename(os.path.dirname(os.path.abspath(path)))
path = vasp_output_to_ams(os.path.dirname(path), overwrite=False)
elif os.path.exists(path):
from ase.io.formats import filetype
try:
ft = filetype(path)
# check if path is a Quantum ESPRESSO .out file
# qe_output_to_ams returns a NEW path containing the ams.rkf and qe.rkf files (will be created if
# they do not exist)
if fmt == "qe":
assert ft == "espresso-out", f"The file {path} does not seem to be a Quantum ESPRESSO output file"
if fmt == "gaussian":
assert ft == "gaussian-out", f"The file {path} does not seem to be a Gaussian output file"
if ft == "espresso-out" and (fmt == "qe" or (fmt == "any" and not path.endswith("rkf"))):
path = qe_output_to_ams(path, overwrite=False)
elif ft == "gaussian-out" and (fmt == "gaussian" or (fmt == "any" and not path.endswith("rkf"))):
path = gaussian_output_to_ams(path, overwrite=False)
except Exception:
# several types of exceptions can be raised, e.g. StopIteration and UnicodeDecodeError
# assume that any exception means that the file was not a QE or Gaussian output file
pass
if not os.path.isdir(path):
if os.path.exists(path):
path = os.path.dirname(os.path.abspath(path))
elif os.path.isdir(path + ".results"):
path = path + ".results"
elif os.path.isdir(path + "results"):
path = path + "results"
else:
raise FileError("Path {} does not exist, cannot load from it.".format(path))
job = super(AMSJob, cls).load_external(path, settings, molecule, finalize)
if preferred_name is not None:
job.name = preferred_name
if job.name.endswith(".results") and len(job.name) > 8:
job.name = job.name[:-8]
return job
[docs] @staticmethod
def settings_to_mol(s: Settings) -> Dict[str, Molecule]:
"""Pop the `s.input.ams.system` block from a settings instance and convert it into a dictionary of molecules.
The provided settings should be in the same style as the ones produced by the SCM input parser.
Dictionary keys are taken from the header of each system block.
The existing `s.input.ams.system` block is removed in the process, assuming it was present in the first place.
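A minimal usage sketch (assuming ``s`` is a |Settings| instance in the style produced by the SCM input parser)::
moldict = AMSJob.settings_to_mol(s)
mol = moldict['']  # the unnamed (main) system, if present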
"""
from scm.plams.tools.units import Units
def get_list(s):
if "_1" in s and not "_2" in s and isinstance(s._1, list):
return s._1
else:
i = 1
l = []
while "_" + str(i) in s:
l.append(s["_" + str(i)])
i += 1
return l
def read_mol(settings_block: Settings) -> Molecule:
"""Retrieve single molecule from a single `s.input.ams.system` block."""
if "geometryfile" in settings_block:
mol = Molecule(settings_block.geometryfile)
else:
mol = Molecule()
if "_h" in settings_block.atoms:
conv = Units.conversion_ratio(settings_block.atoms._h.strip("[]"), "Angstrom")
else:
conv = 1.0
for atom in get_list(settings_block.atoms):
# Extract arguments for Atom()
symbol, x, y, z, *suffix = atom.split(maxsplit=4)
coords = conv * float(x), conv * float(y), conv * float(z)
try:
at = Atom(symbol=symbol, coords=coords)
except PTError: # It's either a ghost atom and/or an atom with a custom name
kwargs = {}
if symbol.startswith("Gh."): # Ghost atom
kwargs["ghost"] = True
_, symbol = symbol.split(".", maxsplit=1)
if "." in symbol: # Atom with a custom name
symbol, kwargs["name"] = symbol.split(".", maxsplit=1)
at = Atom(symbol=symbol, coords=coords, **kwargs)
if suffix:
at.properties.soft_update(AMSJob._atom_suffix_to_settings(suffix[0]))
mol.add_atom(at)
# Set the lattice vector if applicable
if get_list(settings_block.lattice):
if "_h" in settings_block.lattice:
conv = Units.conversion_ratio(settings_block.lattice._h.strip("[]"), "Angstrom")
else:
conv = 1.0
mol.lattice = [tuple(conv * float(j) for j in i.split()) for i in get_list(settings_block.lattice)]
# Add bonds
for bond in get_list(settings_block.bondorders):
_at1, _at2, _order, *suffix = bond.split(maxsplit=3)
at1, at2, order = mol[int(_at1)], mol[int(_at2)], float(_order)
plams_bond = Bond(at1, at2, order=order)
if suffix:
plams_bond.properties.suffix = suffix[0]
mol.add_bond(plams_bond)
# Set the molecular charge
if settings_block.charge:
mol.properties.charge = settings_block.charge
# Set the region info (used in ACErxn)
if settings_block.region:
for s_reg in settings_block.region:
if not "_h" in s_reg.keys():
raise JobError("Region block requires a header!")
if s_reg.properties:
mol.properties.regions[s_reg._h] = s_reg.properties._1
# Apply the supercell keyword
if "supercell" in settings_block:
if isinstance(settings_block.supercell, str):
mol = mol.supercell(*[int(d) for d in settings_block.supercell.split()])
else:
mol = mol.supercell(*settings_block.supercell)
for at in mol:
del at.properties.supercell
mol.properties.name = str(settings_block.get("_h", ""))
return mol
# Raises a KeyError if the `system` key is absent
with s.suppress_missing():
try:
settings_list = s.input.ams.system
if not isinstance(settings_list, list):
settings_list = [settings_list]
except KeyError: # The block s.input.ams.system is absent
return None
# Create a new dictionary with system headers as keys and molecules as values
moldict: Dict[str, Molecule] = {}
for settings_block in settings_list:
key = (
str(settings_block._h) if ("_h" in settings_block) else ""
) # Empty string used as default system name.
if key in moldict:
raise KeyError(f"Duplicate system headers found in s.input.ams.system: {repr(key)}")
moldict[key] = read_mol(settings_block)
used_properties = ["atoms", "bondorders", "lattice", "charge", "region", "supercell"]
for used_property in used_properties:
if used_property in settings_block:
settings_block.pop(used_property)
s.input.ams.System = [system for system in settings_list if system]
if not len(s.input.ams.System):
del s.input.ams.System
return moldict
@staticmethod
def _atom_suffix_to_settings(suffix) -> Settings:
def is_int(s):
if "." in s:
return False
try:
return int(s) == float(s)
except ValueError:
return False
def is_float(s):
try:
float(s)
return True
except ValueError:
return False
import shlex
tokens = shlex.split(suffix)
# Remove possible spaces around = signs
i = 0
while i < len(tokens):
if tokens[i].endswith("="):
tokens[i] += tokens.pop(i + 1)
elif tokens[i].startswith("="):
tokens[i - 1] += tokens.pop(i)
else:
i += 1
properties = Settings()
for i, t in enumerate(tokens):
key, _, val = t.partition("=")
if not val:
# Token that is not a key=value pair? Let's accumulate those in the plain text suffix.
if "suffix" in properties:
properties.suffix += " " + key
else:
properties.suffix = key
continue
else:
# We have a value. Let's make an educated guess for its type.
if val.lower() == "true":
val = True
elif val.lower() == "false":
val = False
elif is_int(val):
val = int(val)
elif is_float(val):
val = float(val)
elif "," in val:
elem = val.split(",")
if key.lower() == "region":
# For regions we will output a set instead of a list
val = set(elem)
elif all(is_int(i) for i in elem):
val = [int(i) for i in elem]
elif all(is_float(f) for f in elem):
val = [float(f) for f in elem]
else:
# Anything else will come out as a list of strings
val = elem
properties.set_nested(key.split("."), val)
return properties
@staticmethod
def _add_region(atom: Atom, name: str):
"""
Converts atom.properties.region to a set if it isn't already a set.
Adds the name to the set.
If name is None, then only the conversion of atom.properties.region is performed.
"""
if "region" not in atom.properties:
atom.properties.region = set()
if isinstance(atom.properties.region, str):
atom.properties.region = set([atom.properties.region])
if not isinstance(atom.properties.region, set):
atom.properties.region = set(atom.properties.region)
if name is None:
return
atom.properties.region.add(name)
def extract_engine_settings(settings: Settings) -> Settings:
"""
Extracts the engine settings from a Settings object.
Example:
s.input.ams.Task = "singlepoint"
s.runscript.nproc = 1
s.input.ForceField.Type = "UFF"
extract_engine_settings(s) will return Settings with s.input.ForceField.Type = "UFF"
"""
ret = Settings()
if "input" in settings:
for e in settings.input:
if e != "ams":
ret.input[e] = settings.input[e].copy()
break
return ret
[docs]def hybrid_committee_engine_settings(settings_list: List[Settings]) -> Settings:
"""
Creates the Settings for an AMS Hybrid engine that takes the average of all subengines as the prediction (for energies and forces).
settings_list: list of Settings
The Settings (at the top level) for each subengine.
Returns: Settings (at the top level) for a Hybrid engine.
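A minimal usage sketch (the two force field types are arbitrary placeholders and ``mol`` is assumed to be an existing |Molecule|):
.. code-block:: python
    s1 = Settings()
    s1.input.ForceField.Type = "UFF"
    s2 = Settings()
    s2.input.ForceField.Type = "GAFF"
    job = AMSJob(name="committee", molecule=mol, settings=hybrid_committee_engine_settings([s1, s2]))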
"""
def get_partial_settings(x: int) -> Settings:
original_settings = settings_list[x]
pure_settings = None
pure_engine_name = ""
for e in original_settings.input:
if e != "ams":
pure_engine_name = e
pure_settings = original_settings.input[e]
break
assert pure_settings is not None
pure_settings._h = f"{pure_engine_name} Engine{x+1}"
return pure_settings
N = len(settings_list)
s = Settings()
s.input.Hybrid.Engine = [get_partial_settings(x) for x in range(N)]
s.input.Hybrid.Energy.Term = [f"Factor={1/N} Region=* UseCappingAtoms=No EngineID=Engine{x+1}" for x in range(N)]
s.input.Hybrid.Committee.Enabled = "Yes"
s.runscript.preamble_lines = ["export OMP_NUM_THREADS=1"]
return s