import os
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from natsort import natsorted
from watchdog.events import FileModifiedEvent, PatternMatchingEventHandler
from watchdog.observers import Observer
from scm.akfreader import AKFReader
from scm.input_classes import drivers
from scm.libbase import KFError, KFFile
from scm.params.plams.paramsjob import ParAMSJob
from scm.plams import AMSJob, Molecule, Results, Settings, SingleJob, config, log
from scm.plams.core.errors import FileError
class SimpleActiveLearningJobLogTailHandler(PatternMatchingEventHandler):
    """
    Watchdog event handler that echoes newly written lines of a job's
    ``simple_active_learning.log`` to standard output.

    The handler remembers the file offset it has already printed up to, so every
    modification event only prints the text appended since the previous event.
    """

    def __init__(self, job, jobmanager):
        """
        job
            The job whose log file should be followed. Its ``name`` is used to build the
            watch pattern and its ``path`` is used to identify the exact log file.
        jobmanager
            The job manager; its ``workdir`` is the root of the watch pattern.
        """
        log_pattern = os.path.join(jobmanager.workdir, f"{job.name}*", "simple_active_learning.log")
        super().__init__(
            patterns=[log_pattern],
            ignore_patterns=["*.rkf", "*.out"],
            ignore_directories=True,
            case_sensitive=True,
        )
        self._job = job
        # Offset up to which the log has already been printed.
        self._seekto = 0

    def on_any_event(self, event):
        """Print the lines appended to the job's log file since the last modification event."""
        # Only react once the job has a directory, and only to modifications of its own log.
        if self._job.path is None:
            return
        if event.src_path != os.path.join(self._job.path, "simple_active_learning.log"):
            return
        if not isinstance(event, FileModifiedEvent):
            return

        try:
            with open(event.src_path, "r") as logfile:
                logfile.seek(self._seekto)
                # readline() (rather than iterating the file object) keeps tell() usable.
                for text in iter(logfile.readline, ""):
                    print(text.rstrip())
                self._seekto = logfile.tell()
        except FileNotFoundError:
            # The log vanished; start from the beginning if it reappears.
            self._seekto = 0
class SimpleActiveLearningResults(Results):
    """
    Results class for SimpleActiveLearningJob.

    Gives access to the ``simple_active_learning.rkf`` file and to the simulation,
    training and reference-data directories found inside the job directory.
    """

    def get_errormsg(self) -> str:
        """Return the error message of this calculation, as reported by the job.

        :return: String containing the error message.
        :rtype: str
        """
        return self.job.get_errormsg()

    def rkfpath(self, file="simple_active_learning") -> str:
        """Return the resolved path to ``<file>.rkf`` inside the job directory.

        :param file: Either ``"simple_active_learning"`` or ``"ams"``.
        :raises ValueError: If ``file`` is not one of the two accepted names.
        :return: Path to the requested rkf file.
        :rtype: str
        """
        known_files = ("simple_active_learning", "ams")
        if file in known_files:
            return str((Path(self.job.path) / f"{file}.rkf").resolve())
        raise ValueError(
            f"Unknown file argument to rkfpath(), expected one of simple_active_learning or ams: {file}"
        )

    def recreate_molecule(self) -> Dict[str, Molecule]:
        """Rebuild the input molecule(s) from the rkf file associated with this result.

        A single molecule is wrapped in a dictionary under the empty-string key.

        :return: The input molecule(s).
        :rtype: Dict[str, Molecule]
        """
        recovered = SimpleActiveLearningJob.from_rkf(self.rkfpath()).molecule
        return {"": recovered} if isinstance(recovered, Molecule) else recovered

    def recreate_settings(self) -> Settings:
        """Rebuild the original input settings from the rkf file associated with this result.

        :return: The original input Settings.
        :rtype: Settings
        """
        rebuilt_job = SimpleActiveLearningJob.from_rkf(self.rkfpath())
        return rebuilt_job.settings

    def readrkf(self, section: str, variable: str):
        """Read ``section%variable`` from simple_active_learning.rkf."""
        reader = AKFReader(self.rkfpath())
        return reader.read(f"{section}%{variable}")

    def _get_directory(
        self,
        suffix: str,
        allow_extra: bool,
        extra: Optional[str] = None,
        step: Optional[int] = None,
        attempt: Optional[int] = None,
    ) -> str:
        """Locate a ``*_<suffix>`` directory inside the job directory.

        Lookup order:

        1. ``step<step>_attempt<attempt>_<suffix>`` when step and attempt are given
           (this directory must exist).
        2. ``extra`` when ``allow_extra`` is True and that directory exists.
        3. The naturally-sorted last ``step*_attempt*_<suffix>`` directory.
        4. ``initial_<suffix>``, then ``loaded_<suffix>``.

        :raises ValueError: If only one of step/attempt is given, or if
            ``allow_extra`` is True without ``extra``.
        :raises FileNotFoundError: If no matching directory exists.
        :return: Path to the directory.
        :rtype: str
        """
        # step and attempt only make sense as a pair.
        if (step is None) != (attempt is None):
            raise ValueError(f"Must specify both step and attempt. Got {step=} {attempt=}")
        if allow_extra and not extra:
            raise ValueError("Must specify extra if allow_extra=True")

        job_dir = Path(self.job.path)

        if step is not None:
            explicit = job_dir / f"step{step}_attempt{attempt}_{suffix}"
            if explicit.exists():
                return str(explicit)
            raise FileNotFoundError(f"Couldn't find {explicit}")

        if allow_extra:
            extra_dir = job_dir / extra
            if extra_dir.exists():
                return str(extra_dir)

        # try to autodetect: the naturally-sorted last step/attempt directory wins
        numbered = natsorted(job_dir.glob(f"step*_attempt*_{suffix}"))
        if numbered:
            return str(numbered[-1])

        for prefix in ("initial", "loaded"):
            fallback = job_dir / f"{prefix}_{suffix}"
            if fallback.exists():
                return str(fallback)

        raise FileNotFoundError(f"Couldn't find any {suffix} directory")

    def get_simulation_directory(
        self, step: Optional[int] = None, attempt: Optional[int] = None, allow_final: bool = True
    ) -> str:
        """
        Returns the path to a simulation directory.

        step: optional, int
            The step number. If not specified will be autodetected to the last step.
        attempt: optional, int
            The attempt number. If not specified will be autodetected to the last attempt.
        allow_final: bool
            If True and step=None and attempt=None, then it will return final_production_simulation if it exists.
        """
        return self._get_directory(
            "simulation",
            allow_extra=allow_final,
            extra="final_production_simulation",
            step=step,
            attempt=attempt,
        )

    def get_main_molecule(self, allow_final: bool = True) -> Union[Molecule, None]:
        """Returns ``get_main_molecule()`` of the AMS job loaded from the latest simulation directory.

        :param allow_final: Whether final_production_simulation may be used if it exists, defaults to True
        :type allow_final: bool, optional
        :return: Whatever the loaded job's ``results.get_main_molecule()`` returns.
        :rtype: Union[Molecule, Dict[str, Molecule], None]
        """
        simulation_dir = self.get_simulation_directory(allow_final=allow_final)
        simulation_job = AMSJob.load_external(simulation_dir)
        return simulation_job.results.get_main_molecule()

    def get_params_results_directory(
        self, step: Optional[int] = None, attempt: Optional[int] = None, allow_final: bool = True
    ) -> str:
        """
        Returns the path to a ParAMS results directory that can be loaded
        with ParAMSJob.load_external or used as LoadModel in ParAMS or SimpleActiveLearning input.

        step: optional, int
            The step number. If not specified will be autodetected to the last step.
        attempt: optional, int
            The attempt number. If not specified will be autodetected to the last attempt.
        allow_final: bool
            If True and step=None and attempt=None, then it will return final_training/results if it exists.

        Raises FileNotFoundError if the training directory has no ``results`` folder.
        """
        training_dir = self._get_directory(
            "training",
            allow_extra=allow_final,
            extra="final_training",
            step=step,
            attempt=attempt,
        )
        results_dir = Path(training_dir) / "results"
        if results_dir.exists():
            return str(results_dir)
        raise FileNotFoundError(f"The ParAMS directory {training_dir} does not contain a results folder.")

    def get_params_job(
        self, step: Optional[int] = None, attempt: Optional[int] = None, allow_final: bool = True
    ) -> ParAMSJob:
        """Returns the ParAMSJob loaded from the selected ParAMS results directory.

        This can be used to analyze results from the parametrization.

        :param step: The step number; autodetected if None, defaults to None
        :type step: Optional[int], optional
        :param attempt: The attempt number; autodetected if None, defaults to None
        :type attempt: Optional[int], optional
        :param allow_final: Whether final_training may be used if it exists, defaults to True
        :type allow_final: bool, optional
        :return: The loaded job.
        :rtype: ParAMSJob
        """
        results_dir = self.get_params_results_directory(step=step, attempt=attempt, allow_final=allow_final)
        return ParAMSJob.load_external(results_dir)

    def get_production_engine_settings(
        self, step: Optional[int] = None, attempt: Optional[int] = None, allow_final: bool = True
    ) -> Settings:
        """Returns the production engine settings from the ParAMSJob"""
        params_job = self.get_params_job(step=step, attempt=attempt, allow_final=allow_final)
        return params_job.results.get_production_engine_settings()

    def get_reference_data_directory(
        self,
        step: Optional[int] = None,
        attempt: Optional[int] = None,
    ) -> str:
        """
        Returns the path to a reference data directory that can be opened in the ParAMS GUI or which lets you
        initialize a ParAMSJob with ParAMSJob.from_yaml()

        step: optional, int
            The step number. If not specified will be autodetected to the last step.
        attempt: optional, int
            The attempt number. If not specified will be autodetected to the last attempt.
        """
        return self._get_directory("reference_data", allow_extra=False, step=step, attempt=attempt)
class SimpleActiveLearningJob(SingleJob):
    """
    PLAMS Job class for running Simple Active Learning.

    This class inherits from the PLAMS SingleJob class. For usage, see the SingleJob documentation.

    If you supply a Settings object to the constructor, it will be converted to a
    PISA (Python Input System for AMS) object.

    Attributes:

    * ``input``: an alias for self.settings.input
    """

    _result_type = SimpleActiveLearningResults
    results: SimpleActiveLearningResults
    _command = "simple_active_learning"
    _json_definitions = "simple_active_learning"
    _subblock_end = "End"

    def __init__(
        self,
        name: str = "simple_active_learning_job",
        driver: Optional[drivers.SimpleActiveLearning] = None,
        settings: Optional[Settings] = None,
        molecule: Optional[Union[Molecule, Dict[str, Molecule]]] = None,
        **kwargs,
    ):
        """
        Initialize the SimpleActiveLearningJob.

        name : str
            The name of the job

        driver : scm.input_classes.drivers.SimpleActiveLearning
            PISA object describing the input to the SimpleActiveLearning program

        settings: scm.plams.Settings
            All settings for the job. Input settings in the PLAMS settings format under ``settings.input`` are
            automatically converted to the PISA format. You can specify ``settings.runscript.nproc`` to
            set the total number of cores to run on.

        molecule: scm.plams.Molecule or Dict[str, scm.plams.Molecule]
            The initial system in PLAMS Molecule format, or if the simulation
            requires multiple input system, given as a dictionary where the
            main system has an empty string ``""`` as the key.
        """
        super().__init__(name=name, settings=settings, molecule=molecule, **kwargs)
        if driver is not None:
            # An explicit PISA driver takes precedence over anything in settings.input.
            self.settings.input = driver
        elif self.settings.input:
            # Convert PLAMS-style input settings to PISA via their text representation.
            text_input = AMSJob(settings=self.settings).get_input()
            self.settings.input = drivers.SimpleActiveLearning.from_text(text_input)
        else:
            self.settings.input = drivers.SimpleActiveLearning()

    @classmethod
    def load_external(cls, path: Union[str, Path], finalize: bool = False) -> "SimpleActiveLearningJob":
        """Load a previous SimpleActiveLearning job from disk.

        ``path`` may be the results folder itself, a file inside it (its parent folder is
        used), or a job path without the ``.results`` suffix when ``<path>.results`` exists.

        :param path: A simple active learning results folder.
        :type path: Union[str, Path]
        :param finalize: See SingleJob, defaults to False
        :type finalize: bool, optional
        :raises FileError: When the path does not exist.
        :return: An initialized SimpleActiveLearningJob
        :rtype: SimpleActiveLearningJob
        """
        path = Path(path)
        if not os.path.isdir(path):
            if os.path.exists(path):
                # A file was given: load from the directory containing it.
                path = os.path.dirname(os.path.abspath(path))
            else:
                # The suffix must be appended to the path text. Joining it as a child
                # (path / ".results") can never match, because a child of a
                # non-existent path cannot exist.
                with_suffix = Path(f"{path}.results")
                if not with_suffix.is_dir():
                    raise FileError("Path {} does not exist, cannot load from it.".format(path))
                path = with_suffix

        job = super(SimpleActiveLearningJob, cls).load_external(path, finalize=finalize)

        # Strip the ".results" suffix (8 characters) so the job name matches the original job.
        if job.name.endswith(".results") and len(job.name) > 8:
            job.name = job.name[:-8]

        return job

    @classmethod
    def from_rkf(cls, path: str) -> "SimpleActiveLearningJob":
        """Initialize a job from a simple_active_learning.rkf file.

        The job is built from the text stored under ``General%user input``.

        :param path: Path to a simple_active_learning.rkf file
        :type path: str
        :return: A new SimpleActiveLearningJob instance based on the information found in path.
        :rtype: SimpleActiveLearningJob
        """
        with KFFile(path) as kf:
            text_input = kf.read_string("General", "user input")

        return cls.from_input(text_input)

    @classmethod
    def restart_from(
        cls,
        job: Union["SimpleActiveLearningJob", str, Path],
        name: str = "simple_active_learning_job",
        molecule: Optional[Molecule] = None,
        keep_initial_reference_data_settings: bool = False,
        job_prefix: Optional[str] = None,
    ) -> "SimpleActiveLearningJob":
        """Returns a SimpleActiveLearningJob with LoadModel set appropriately.

        :param job: A previously finished SimpleActiveLearningJob (or path to its results folder)
        :type job: Union[SimpleActiveLearningJob, str, Path]
        :param name: Name of the new job.
        :type name: str
        :param molecule: Input molecule for the new job. If None, use the final molecule from ``job``.
        :type molecule: Optional[Molecule], default None
        :param keep_initial_reference_data_settings: Whether to keep the ActiveLearning%InitialReferenceData block from the original job.
        :type keep_initial_reference_data_settings: bool, default False
        :param job_prefix: If given, stored as ActiveLearning%JobPrefix in the new job's input.
        :type job_prefix: Optional[str], default None
        :raises ValueError: If ``job`` is not (and cannot be loaded as) an instance of this class.
        :return: Returns a new SimpleActiveLearningJob.
        :rtype: SimpleActiveLearningJob
        """
        if isinstance(job, (str, Path)):
            job = cls.load_external(job)
        if not isinstance(job, cls):
            raise ValueError(f"Argument {job=} not of expected type {cls}")

        molecule = molecule or job.results.get_main_molecule()

        ret = cls(driver=job.settings.input, molecule=molecule, name=name)
        ret.settings.input.MachineLearning.LoadModel = job.results.get_params_results_directory()
        if job_prefix is not None:
            ret.settings.input.ActiveLearning.JobPrefix = job_prefix

        if not keep_initial_reference_data_settings:
            try:
                del ret.settings.input.ActiveLearning.InitialReferenceData
            except AttributeError:
                # Nothing to remove: the block is not present on the input object.
                pass

        return ret

    @staticmethod
    def _extract_mol_from_pisa(pisa: drivers.SimpleActiveLearning) -> Union[Molecule, Dict[str, Molecule]]:
        """Remove a molecule from a System block in the SimpleActiveLearning PISA object and return it as molecule(s)"""
        settings = Settings()
        settings.input.ams.system = pisa.to_settings().system
        molecule = AMSJob.settings_to_mol(settings)
        # Reset the System block on the PISA object, bypassing its own __setattr__.
        object.__setattr__(pisa, "System", pisa._System("System"))
        return molecule

    def get_errormsg(self) -> str:
        """Returns the contents of the jobname.err file if it exists. If the file does not exist an
        empty string is returned.

        :return: The error message
        :rtype: str
        """
        try:
            with open(self.results["$JN.err"], "r") as err:
                return err.read()
        except (FileNotFoundError, FileError):
            # The results lookup signals a missing file with FileError; open() with FileNotFoundError.
            return ""

    def get_runscript(self) -> str:
        """
        Generates the runscript. Use ``self.settings.runscript.preamble_lines = ['line1', 'line2']``
        or similarly for ``self.settings.runscript.postamble_lines`` to set custom settings.

        ``self.settings.runscript.nproc`` controls the total number of cores to run on.
        """
        filename = self._filename("inp")
        runscript = self.settings.runscript

        parts = [f"{line}\n" for line in runscript.get("preamble_lines", "")]

        # need to use `pwd` here and not "." since "." doesn't expand
        parts.append(f'AMS_JOBNAME="{self.name}" AMS_RESULTSDIR="`pwd`" "$AMSBIN/{self._command}" ')
        nproc = runscript.get("nproc", None)
        if nproc:
            parts.append(f"-n {nproc} ")
        parts.append(f'< "{filename}"\n')

        parts.extend(f"{line}\n" for line in runscript.get("postamble_lines", ""))

        return "".join(parts)

    def check(self) -> bool:
        """Returns True if "NORMAL TERMINATION" is given in the General section of simple_active_learning.rkf.

        Returns False when the rkf file cannot be opened or read, so that a job which
        crashed before writing it is reported as failed rather than raising.
        """
        try:
            with KFFile(self.results.rkfpath()) as kf:
                termination = kf.read_string("General", "termination status")
        except (KFError, FileError, OSError):
            # NOTE(review): assumes KFFile reports a missing/unreadable file or variable
            # through one of these exception types - confirm against scm.libbase.
            return False
        return "NORMAL TERMINATION" in termination

    def ok(self) -> bool:
        """Synonym for check()"""
        return self.check()

    def run(self, jobrunner=None, jobmanager=None, watch: bool = False, **kwargs) -> SimpleActiveLearningResults:
        """
        Runs the job.

        jobrunner, jobmanager
            Passed on to ``SingleJob.run``.
        watch : bool
            If True, lines written to the job's ``simple_active_learning.log`` are printed
            while the job runs, and this call blocks until the results are available.
            When no ``jobmanager`` is given, the default job manager from the PLAMS
            ``config`` is used; a RuntimeError is raised if there is none.
        """
        if not watch:
            return super().run(jobrunner=jobrunner, jobmanager=jobmanager, **kwargs)

        # Only fall back to the default job manager when the caller did not supply one;
        # the observer below must watch the workdir the job actually runs in.
        if jobmanager is None:
            if "default_jobmanager" in config:
                jobmanager = config.default_jobmanager
            else:
                raise RuntimeError("No default jobmanager found. This probably means that PLAMS init() was not called.")

        observer = Observer()
        event_handler = SimpleActiveLearningJobLogTailHandler(self, jobmanager)
        observer.schedule(event_handler, jobmanager.workdir, recursive=True)
        observer.start()
        try:
            results = super().run(jobrunner=jobrunner, jobmanager=jobmanager, **kwargs)
            results.wait()
        finally:
            # Always shut the watcher thread down, also when the run raises.
            observer.stop()
            observer.join()
        return results

    @property
    def input(self) -> drivers.SimpleActiveLearning:
        """PISA format input"""
        return self.settings.input

    @input.setter
    def input(self, input: drivers.SimpleActiveLearning):
        self.settings.input = input

    # The helpers below delegate to the AMSJob implementations.

    def _serialize_input(self, s):
        return AMSJob._serialize_input(self, s)

    def _serialize_molecule(self):
        return AMSJob._serialize_molecule(self)

    @staticmethod
    def _atom_symbol(s):
        return AMSJob._atom_symbol(s)

    @staticmethod
    def _atom_suffix(s):
        return AMSJob._atom_suffix(s)