import os
import functools
import threading
import time
from os.path import join as opj
from subprocess import DEVNULL, PIPE
from .basejob import SingleJob
from .errors import PlamsError
from .functions import config, log
from .private import saferun
from .settings import Settings
__all__ = ['JobRunner', 'GridRunner']
def _in_thread(func):
"""Decorator for an instance method. If ``parallel`` attribute of given instance is ``True``, run decorated method in a separate :class:`~threading.Thread`. This thread is usually a daemon thread, the decision is based on ``config.daemon_threads`` entry."""
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
if self.parallel:
t = threading.Thread(name='plamsthread', target=func, args=(self,)+args, kwargs=kwargs)
t.daemon = config.daemon_threads
t.start()
else:
func(self, *args, **kwargs)
return wrapper
def _limit(func):
"""Decorator for an instance method. If ``semaphore`` attribute of given instance is not ``None``, use this attribute to wrap decorated method via :ref:`with<with-locks>` statement."""
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
if self.semaphore:
with self.semaphore:
return func(self, *args, **kwargs)
else:
return func(self, *args, **kwargs)
return wrapper
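# NOTE: The _MetaRunner metaclass used by JobRunner below is not part of this
# listing. The sketch here is a presumed reconstruction: it wraps the 'call'
# method with the _limit decorator at class creation time, so that the maxjobs
# semaphore is respected. The exact original implementation may differ.
class _MetaRunner(type):
    def __new__(meta, name, bases, dct):
        # enforce the simultaneous-call limit on each (sub)class's 'call' method
        dct['call'] = _limit(dct['call'])
        return type.__new__(meta, name, bases, dct)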
#===========================================================================
#===========================================================================
#===========================================================================
class JobRunner(metaclass=_MetaRunner):
"""Class defining the basic job runner interface. Instances of this class represent local job runners -- job runners that execute computational jobs on the current machine.
The goal of the job runner is to take care of two important things -- parallelization and runscript execution:
* When the method |run| of any |Job| instance is executed, control, after some initial preparations, is passed to a |JobRunner| instance. This |JobRunner| instance decides whether a separate thread should be spawned for the job or whether the execution should proceed in the current thread. The decision is based on the ``parallel`` attribute, which can be set on |JobRunner| creation. There are no separate classes for serial and parallel job runners; both cases are covered by |JobRunner|, depending on the ``parallel`` parameter.
* If the executed job is an instance of |SingleJob|, it creates a runscript which contains most of the actual computational work (usually just the execution of some external binary). The runscript is then submitted to a |JobRunner| instance using its :meth:`call` method. This method executes the runscript as a separate subprocess and takes care of handling the output and error streams, setting a proper working directory, etc.
For a job runner with parallel execution enabled, the number of simultaneously running jobs can be limited using the *maxjobs* parameter. If *maxjobs* is 0, no limit is enforced. If *parallel* is ``False``, *maxjobs* is ignored. If *parallel* is ``True`` and *maxjobs* is a positive integer, a :class:`BoundedSemaphore<threading.BoundedSemaphore>` of that size is used to limit the number of simultaneously running :meth:`call` methods.
A |JobRunner| instance can be passed to |run| with a keyword argument ``jobrunner``. If this argument is omitted, the instance stored in ``config.default_jobrunner`` is used.
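For example, a job runner executing jobs in parallel, with at most 4 of them running at the same time, can be created and used as follows (``myjob`` is a placeholder for any |Job| instance; the limit of 4 is an arbitrary choice for illustration)::
jr = JobRunner(parallel=True, maxjobs=4)
myjob.run(jobrunner=jr)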
"""
def __init__(self, parallel=False, maxjobs=0):
self.parallel = parallel
self.semaphore = threading.BoundedSemaphore(maxjobs) if maxjobs else None
def call(self, runscript, workdir, out, err, runflags):
"""call(runscript, workdir, out, err, runflags)
Execute the *runscript* in the folder *workdir*. Redirect output and error streams to *out* and *err*, respectively.
Arguments *runscript*, *workdir*, *out* and *err* should be strings with paths to corresponding files or folders.
*runflags* is a |Settings| instance containing the ``run`` branch of the running job's settings. The basic job runner defined here ignores it, but it can be useful in |JobRunner| subclasses (see :meth:`GridRunner.call`).
Returns an integer with the exit code returned by the *runscript*.
This method can be safely overridden in |JobRunner| subclasses. For example, in |GridRunner| it submits the runscript to a queueing system instead of executing it locally.
.. note::
This method is used automatically during |run| and should never be explicitly called in your script.
"""
log('Executing {}'.format(runscript), 5)
command = ['./'+runscript] if os.name == 'posix' else ['sh', runscript]
if out is not None:
with open(opj(workdir, err), 'w') as e, open(opj(workdir, out), 'w') as o:
process = saferun(command, cwd=workdir, stderr=e, stdout=o)
else:
with open(opj(workdir, err), 'w') as e:
process = saferun(command, cwd=workdir, stderr=e)
log('Execution of {} finished with returncode {}'.format(runscript, process.returncode), 5)
return process.returncode
@_in_thread
def _run_job(self, job, jobmanager):
"""_run_job(job, jobmanager)
This method aggregates the parts of :ref:`job-life-cycle` that are supposed to be run in a separate thread in case of parallel job execution. It is wrapped with :func:`_in_thread` decorator.
This method should not be overridden.
"""
if job._prepare(jobmanager):
job._execute(self)
job._finalize()
#===========================================================================
#===========================================================================
#===========================================================================
class GridRunner(JobRunner):
"""Subclass of |JobRunner| that submits the runscript to a queueing system instead of executing it locally. Besides two new keyword arguments (*grid* and *sleepstep*) it behaves and is meant to be used just like a regular |JobRunner|.
.. note::
The default value of the *parallel* argument is ``True``, unlike in the regular |JobRunner|.
Many different queueing systems are in popular use nowadays (for example TORQUE, SLURM, OGE), and they usually use different commands for submitting jobs or checking the queue status. The |GridRunner| class tries to build a common interface to these systems. The commands used to communicate with the queueing system are not hard-coded, but taken from a |Settings| instance. Thanks to that, the user has almost full control over the behavior of a |GridRunner| instance and that behavior can be adjusted dynamically.
The behavior of a |GridRunner| instance is determined by the contents of a |Settings| instance stored in its ``settings`` attribute. That |Settings| instance can be manually supplied by the user or taken from a collection of predefined instances stored as branches of ``config.gridrunner``. The choice is made with the *grid* parameter, which should be a string or a |Settings| instance. If it is a string, it has to be either a key occurring in ``config.gridrunner`` or ``'auto'`` for autodetection. For example, if ``grid='slurm'`` is passed, ``config.gridrunner.slurm`` is used as settings. If ``grid='auto'``, the entries present in ``config.gridrunner`` are tested and the first one that works (its submit command is present on your system) is chosen. When a |Settings| instance is passed as *grid*, it is used directly as ``settings``.
Currently two predefined schemes are available (see :ref:`plams-defaults`): ``slurm`` for SLURM and ``pbs`` for queueing systems following PBS syntax (PBS, TORQUE, Oracle Grid Engine etc.).
The settings of |GridRunner| should have the following structure:
* ``output`` -- flag for specifying the output file path.
* ``error`` -- flag for specifying the error file path.
* ``workdir`` -- flag for specifying path to the working directory.
* ``commands.submit`` -- submit command.
* ``commands.check`` -- queue status check command.
* ``commands.getid`` -- function extracting the submitted job's ID from the output of the submit command.
* ``commands.running`` -- function extracting a list of all running jobs from the output of the queue check command.
* ``special`` -- branch storing definitions of special |run| keyword arguments.
See :meth:`call` for more details and examples.
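For illustration, a |GridRunner| for a hypothetical queueing system whose submit command is ``myqsub`` and whose queue check command is ``myqstat`` could be configured with a manually supplied |Settings| instance along the following lines (all command names, flags and output formats in this sketch are made-up assumptions, not settings shipped with PLAMS)::
s = Settings()
s.workdir = '-d'
s.output = '-o'
s.error = '-e'
s.commands.submit = 'myqsub'
s.commands.check = 'myqstat'
# assume 'myqsub' prints the job ID as the last word of its output
s.commands.getid = lambda output: output.split()[-1]
# assume 'myqstat' prints one running job ID per line
s.commands.running = lambda output: output.splitlines()
gr = GridRunner(grid=s, maxjobs=8)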
The *sleepstep* parameter defines how often the queue check is performed. It should be a numerical value specifying, in seconds, the interval between two consecutive checks. If ``None`` is used, the global default from ``config.sleepstep`` is taken.
.. note::
Usually queueing systems are configured in such a way that the output of your calculation is captured somewhere else and copied to the location indicated by the output flag only when the job is finished. Because of that, it is not possible to peek at your output while your job is running (for example, to see if your calculation is going well). This limitation can be circumvented with the ``myjob.settings.runscript.stdout_redirect`` flag. If set to ``True``, the output redirection will not be handled by the queueing system, but rather placed in the runscript using the shell redirection ``>``. That forces the output file to be created directly in *workdir* and updated live as the job proceeds.
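For example::
myjob.settings.runscript.stdout_redirect = True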
"""
def __init__(self, grid='auto', sleepstep=None, parallel=True, maxjobs=0):
JobRunner.__init__(self, parallel=parallel, maxjobs=maxjobs)
self.sleepstep = sleepstep or config.sleepstep
self._active_jobs = {}
self._active_lock = threading.Lock()
self._mainlock = threading.Lock()
if grid == 'auto':
self.settings = self._autodetect()
elif grid in config.gridrunner:
self.settings = config.gridrunner[grid]
try:
saferun([self.settings.commands.submit, '--version'], stdout=DEVNULL, stderr=DEVNULL)
except OSError:
raise PlamsError('GridRunner: {} command not found'.format(self.settings.commands.submit))
elif isinstance(grid, Settings):
self.settings = grid
else:
raise PlamsError("GridRunner: invalid 'grid' argument. 'grid' should be either a Settings instance (see documentations for details) or a string occurring in config.gridrunner or 'auto' for autodetection")
def call(self, runscript, workdir, out, err, runflags):
"""call(runscript, workdir, out, err, runflags)
Submit *runscript* to the queueing system with *workdir* as the working directory. Redirect output and error streams to *out* and *err*, respectively. *runflags* stores various submit command options.
The submit command has the following structure::
<commands.submit>_<workdir>_{workdir}_<error>_{err}[_<output>_{out}][FLAGS]_{runscript}
Underscores denote spaces, parts in pointy brackets correspond to ``settings`` entries, parts in curly brackets to :meth:`call` arguments, and square brackets contain optional parts. The output part is added if *out* is not ``None``; this is handled automatically, based on the ``runscript.stdout_redirect`` value in the job's ``settings``.
The ``FLAGS`` part is built based on the *runflags* argument, which is a |Settings| instance storing |run| keyword arguments. For every *(key,value)* pair in *runflags* the string ``_-key_value`` is appended to ``FLAGS``, **unless** the *key* is a special key occurring in ``special``. In that case ``_<special.key>value`` is used (mind the lack of space in between). For example, a |Settings| instance defining interaction with SLURM has the following entries::
workdir = '-D'
output = '-o'
error = '-e'
special.nodes = '-N '
special.walltime = '-t '
special.memory = '--mem='
special.queue = '-p '
commands.submit = 'sbatch'
commands.check = 'squeue'
The submit command produced by::
gr = GridRunner(parallel=True, maxjobs=4, grid='slurm')
j.run(jobrunner=gr, queue='short', nodes=2, J='something', O='')
will be:
.. code-block:: none
sbatch -D {workdir} -e {err} -o {out} -p short -N 2 -J something -O {runscript}
In certain queueing systems some flags don't have a short form with semantics ``-key value``. For example, in SLURM the flag ``--nodefile=value`` has a short form ``-F value``, but the flag ``--export=value`` does not. One can still use such a flag using the special keys logic::
gr = GridRunner(parallel=True, maxjobs=4, grid='slurm')
gr.settings.special.export = '--export='
j.run(jobrunner=gr, queue='short', export='value')
That results in the command::
sbatch -D {workdir} -e {err} -o {out} -p short --export=value {runscript}
The submit command is then executed and the output it returns is used to determine the submitted job's ID. The value stored in ``commands.getid`` is used for that purpose. It should be a function taking a single string (the whole output of the submit command) and returning a string with the job's ID.
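For example, if the submit command prints a single line like ``Submitted batch job 123456`` (the format produced by SLURM's ``sbatch``), a suitable function would be the following (shown purely as an illustration)::
gr.settings.commands.getid = lambda output: output.split()[-1]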
The submitted job's ID is then added to the ``_active_jobs`` dictionary, with the key being the job's ID and the value being an instance of :class:`threading.Event`. This event is used to signal that the job is finished, so that the thread handling it can continue. Then the :meth:`_check_queue` method starts the thread querying the queue and releasing finished jobs.
Since it is difficult to automatically obtain the job's exit code, the returned value is 0 (or 1, if the submit command failed). From the |run| perspective this means that a job executed with |GridRunner| is considered *crashed* only if it never entered the queue (usually due to an improper submit command).
.. note::
This method is used automatically during |run| and should never be explicitly called in your script.
"""
s = self.settings
cmd = ' '.join([s.commands.submit, s.workdir, workdir, s.error, err])
if out is not None:
cmd += ' '+s.output+' '+out
for k,v in runflags.items():
if k in s.special:
cmd += ' '+s.special[k]+str(v)
else:
cmd += ' -'+k+' '+str(v)
cmd += ' ' + opj(workdir,runscript)
log('Submitting {} with command {}'.format(runscript, cmd), 5)
process = saferun(cmd.split(' '), stdout=PIPE, stderr=PIPE)
subout = process.stdout.decode()
log('Output of {} submit command: {}'.format(runscript, subout), 5)
jobid = s.commands.getid(subout)
if jobid is None:
log('Submission of {} failed. Stderr of submit command:\n{}'.format(runscript, process.stderr.decode()), 1)
return 1
log('{} submitted successfully as job {}'.format(runscript, jobid), 3)
event = threading.Event()
with self._active_lock:
self._active_jobs[jobid] = event
self._check_queue()
event.wait()
log('Execution of {} finished'.format(runscript), 5)
return 0
@_in_thread
def _check_queue(self):
"""Query the queueing system to obtain a list of currently running jobs. Check for active jobs that are not any more in the queue and release their locks. Repeat this procedure every ``sleepstep`` seconds until there are no more active jobs. The ``_mainlock`` lock ensures that there is at most one thread executing the main loop of this method at the same time."""
if self._mainlock.acquire(blocking=False):
try:
while True:
with self._active_lock:
active_jobs = set(self._active_jobs.keys())
process = saferun([self.settings.commands.check], stdout=PIPE)
output = process.stdout.decode()
running_jobs = set(self.settings.commands.running(output))
with self._active_lock:
for jobid in active_jobs - running_jobs:
self._active_jobs[jobid].set()
del self._active_jobs[jobid]
if len(self._active_jobs) == 0:
return
time.sleep(self.sleepstep)
finally:
self._mainlock.release()
def _autodetect(self):
"""Try to autodetect the type of queueing system.
The autodetection mechanism is very simple. For each entry in ``config.gridrunner`` the submit command followed by ``--version`` is executed (for example ``qsub --version``). If the execution is successful (indicated by exit code 0), that queueing system is considered present and is chosen. Thus, if multiple queueing systems are installed, only one of them is picked -- the one whose "name" (the corresponding key in ``config.gridrunner``) comes first in lexicographical order.
The returned value is one of the ``config.gridrunner`` branches. If autodetection is not successful, an exception is raised.
"""
for grid in config.gridrunner:
try:
process = saferun([config.gridrunner[grid].commands.submit, '--version'], stdout=DEVNULL, stderr=DEVNULL)
except OSError: continue
if process.returncode == 0:
log("Grid type autodetected as '{}'".format(grid), 5)
return config.gridrunner[grid]
raise PlamsError('GridRunner: Failed to autodetect grid type')