# Source code for scm.plams.core.jobmanager

import os
import threading
import re
import shutil
try:
    import dill as pickle
except ImportError:
    import pickle

from os.path import join as opj

from .basejob import MultiJob
from .errors import PlamsError, FileError
from .functions import config, log

__all__ = ['JobManager']



class JobManager:
    """Class responsible for jobs and files management.

    Every instance has the following attributes:

    *   ``foldername`` -- the working folder name.
    *   ``workdir`` -- the absolute path to the working folder.
    *   ``logfile`` -- the absolute path to the logfile.
    *   ``input`` -- the absolute path to the copy of the input file in the working folder.
    *   ``settings`` -- a |Settings| instance for this job manager (see below).
    *   ``jobs`` -- a list of all jobs managed with this instance (in order of |run| calls).
    *   ``names`` -- a dictionary with names of jobs. For each name an integer value is stored indicating how many jobs with that basename have already been run.
    *   ``hashes`` -- a dictionary working as a hash-table for jobs.

    The *path* argument should be a path to a directory inside which the main working folder will be created. If ``None``, the directory named by the ``AMS_RESULTSDIR`` environment variable is used when it points to an existing directory; otherwise the directory from where the whole script was executed is used.

    The ``foldername`` attribute is initially set to the *folder* argument. If such a folder already exists, the suffix ``.002`` is appended to *folder* and the number is increased (``.003``, ``.004``...) until a non-existing name is found. If *folder* is ``None``, the name ``plams_workdir`` is used, followed by the same procedure to find a unique ``foldername``.

    The ``logfile`` attribute points to the file named by the ``SCM_LOGFILE`` environment variable if that file exists, and to ``logfile`` inside the working folder otherwise.

    The ``settings`` attribute is directly set to the value of *settings* argument (unlike in other classes where they are copied) and it should be a |Settings| instance with the following keys:

    *   ``hashing`` -- chosen hashing method (see |RPM|).
    *   ``counter_len`` -- length of number appended to the job name in case of a name conflict.
    *   ``remove_empty_directories`` -- if ``True``, all empty subdirectories of the working folder are removed on |finish|.
    """

    def __init__(self, settings, path=None, folder=None):
        """Initialise the bookkeeping containers and create the working folder.

        Raises |PlamsError| if *path* is given but is not an existing directory.
        """
        self.settings = settings
        self.jobs = []
        self.names = {}
        self.hashes = {}

        # Re-entrant lock: remove_job() calls itself recursively while holding it.
        self._register_lock = threading.RLock()

        if path is None:
            ams_resultsdir = os.getenv("AMS_RESULTSDIR")
            if ams_resultsdir is not None and os.path.isdir(ams_resultsdir):
                # Normalise so that ``workdir`` is absolute as documented, even
                # if the environment variable holds a relative path.
                self.path = os.path.abspath(ams_resultsdir)
            else:
                self.path = os.getcwd()
        elif os.path.isdir(path):
            self.path = os.path.abspath(path)
        else:
            raise PlamsError('Invalid path: {}'.format(path))

        # Find the first unused folder name: <basename>, <basename>.002, <basename>.003, ...
        basename = os.path.normpath(folder) if folder else 'plams_workdir'
        self.foldername = basename
        n = 2
        while os.path.exists(opj(self.path, self.foldername)):
            self.foldername = basename + '.' + str(n).zfill(3)
            n += 1

        self.workdir = opj(self.path, self.foldername)

        scm_logfile = os.getenv("SCM_LOGFILE")
        if scm_logfile is not None and os.path.isfile(scm_logfile):
            # Same normalisation as for AMS_RESULTSDIR above.
            self.logfile = os.path.abspath(scm_logfile)
        else:
            self.logfile = opj(self.workdir, 'logfile')
        self.input = opj(self.workdir, 'input')

        os.mkdir(self.workdir)

    def load_job(self, filename):
        """Load previously saved job from *filename*.

        *Filename* should be a path to a ``.dill`` file in some job folder. A |Job| instance stored there is loaded and returned. All attributes of this instance removed before pickling are restored. That includes ``jobmanager``, ``path`` (the absolute path to the folder containing *filename* is used) and ``default_settings`` (a list containing only ``config.job``).

        If *filename* does not exist, |FileError| is raised. If unpickling fails for any reason, the exception is logged and ``None`` is returned.

        .. warning::

            Unpickling executes arbitrary code stored in the file. Only load files coming from a trusted source.

        See |pickling| for details.
        """
        def setstate(job, path, parent=None):
            # Restore everything that was stripped from *job* before pickling.
            job.parent = parent
            job.jobmanager = self
            job.default_settings = [config.job]
            job.path = path
            if isinstance(job, MultiJob):
                job._lock = threading.Lock()
                # Children live in subfolders named after them.
                for child in job:
                    setstate(child, opj(path, child.name), job)
                for otherjob in job.other_jobs():
                    setstate(otherjob, opj(path, otherjob.name), job)
            job.results.refresh()
            # Make the loaded job discoverable by _check_hash().
            h = job.hash()
            if h is not None:
                self.hashes[h] = job
            for key in job._dont_pickle:
                job.__dict__[key] = None

        if os.path.isfile(filename):
            filename = os.path.abspath(filename)
        else:
            raise FileError('File {} not present'.format(filename))
        path = os.path.dirname(filename)

        with open(filename, 'rb') as f:
            try:
                # SECURITY: pickle/dill can run arbitrary code while loading;
                # *filename* must come from a trusted source.
                job = pickle.load(f)
            except Exception as e:
                # Deliberately broad: any failure to unpickle is reported and
                # signalled to the caller with None instead of propagating.
                log("Unpickling of {} failed. Caught the following Exception:\n{}".format(filename, e), 1)
                return None

        setstate(job, path)
        return job

    def remove_job(self, job):
        """Remove *job* from the job manager. Forget its hash.

        For a |MultiJob| all children and other jobs are removed recursively first. Finally the job folder is deleted from disk, if the job has one and it still exists.
        """
        with self._register_lock:
            if job in self.jobs:
                self.jobs.remove(job)
                job.jobmanager = None

            h = job.hash()
            if h in self.hashes and self.hashes[h] == job:
                del self.hashes[h]

            if isinstance(job, MultiJob):
                for child in job:
                    self.remove_job(child)
                for otherjob in job.other_jobs():
                    self.remove_job(otherjob)

            # A job that has no path yet, or whose folder is already gone, has
            # nothing left on disk to delete; do not fail on it.
            if job.path is not None and os.path.isdir(job.path):
                shutil.rmtree(job.path)

    def _register(self, job):
        """Register the *job*. Register job's name (rename if needed) and create the job folder.

        If a job with the same name was already registered, *job* is renamed by appending consecutive integers. The number of digits in the appended number is defined by the ``counter_len`` value in ``settings``.
        Note that jobs whose name already contains a counting suffix, e.g. ``myjob.002`` will have the suffix stripped as the very first step.
        """
        with self._register_lock:
            log('Registering job {}'.format(job.name), 7)
            job.jobmanager = self

            # If the name ends with the counting suffix, e.g. ".002", remove it.
            # The suffix is just not part of a legitimate job name and users will have to live with it potentially changing.
            orgfname = job._full_name()
            job.name = re.sub(r"(\.\d{%i})+$"%(self.settings.counter_len), "", job.name)
            fname = job._full_name()

            if fname in self.names:
                # Name clash: bump the counter and append it as a suffix.
                self.names[fname] += 1
                job.name += '.'+str(self.names[fname]).zfill(self.settings.counter_len)
                fname = job._full_name()
            else:
                self.names[fname] = 1

            if fname != orgfname:
                log('Renaming job {} to {}'.format(orgfname, fname), 3)

            # Jobs without a preset path go into their parent's folder, or
            # directly into the working folder if they have no parent.
            if job.path is None:
                if job.parent:
                    job.path = opj(job.parent.path, job.name)
                else:
                    job.path = opj(self.workdir, job.name)
            os.mkdir(job.path)

            self.jobs.append(job)
            job.status = 'registered'
            log('Job {} registered'.format(job.name), 7)

    def _check_hash(self, job):
        """Calculate the hash of *job* and, if it is not ``None``, search previously run jobs for the same hash. If such a job is found, return it. Otherwise, store *job* under its hash and return ``None``."""
        h = job.hash()
        if h is not None:
            # Lookup and insertion happen under one lock acquisition so two
            # jobs with the same hash cannot both be recorded as "first".
            with self._register_lock:
                if h in self.hashes:
                    prev = self.hashes[h]
                    log('Job {} previously run as {}, using old results'.format(job.name, prev.name), 1)
                    return prev
                self.hashes[h] = job
        return None

    def _clean(self):
        """Clean all registered jobs according to the ``save`` parameter in their ``settings``. If ``remove_empty_directories`` is ``True``, traverse the working directory and delete all empty subdirectories."""
        log('Cleaning job manager', 7)

        for job in self.jobs:
            job.results._clean(job.settings.save)

        if self.settings.remove_empty_directories:
            # Bottom-up walk: a folder is visited only after its subfolders, so
            # folders that become empty once their children are removed are
            # caught too.
            for root, dirs, _ in os.walk(self.workdir, topdown=False):
                for dirname in dirs:
                    fullname = opj(root, dirname)
                    if not os.listdir(fullname):
                        os.rmdir(fullname)

        log('Job manager cleaned', 7)