Job runners

Job runners have already been mentioned in previous chapters about jobs and results. Here we sum up all that information and introduce the basic JobRunner object together with its subclass GridRunner, which is meant for interacting with queueing systems that manage resources on computer clusters.

Job runners in PLAMS are very simple objects, both from the user’s perspective and in terms of internal architecture. Apart from constructors, they have no methods that are meant to be called in your scripts. Job runners are supposed to be created (with some parameters adjusting their behavior) and passed to the run() method as arguments (or stored as config.default_jobrunner).

Local job runner

class JobRunner(parallel=False, maxjobs=0, maxthreads=256)

Class defining the basic job runner interface. Instances of this class represent local job runners – job runners that execute computational jobs on the current machine.

The goal of the job runner is to take care of two important things – parallelization and runscript execution:

  • When the run() method of any Job instance is executed, control is passed, after some initial preparations, to a JobRunner instance. This JobRunner instance decides whether a separate thread should be spawned for the job or whether execution should proceed in the current thread. This decision is based on the parallel attribute, which can be set on JobRunner creation. There are no separate classes for serial and parallel job runners; both cases are covered by JobRunner, depending on the parallel parameter.
  • If the executed job is an instance of SingleJob, it creates a runscript which contains most of the actual computational work (usually it’s just an execution of some external binary). The runscript is then submitted to a JobRunner instance using its call() method. This method executes the runscript as a separate subprocess and takes care of output and error streams handling, setting a proper working directory etc.

For a job runner with parallel execution enabled the number of simultaneously running jobs can be limited using the maxjobs parameter. If maxjobs is 0, no limit is enforced. If parallel is False, maxjobs is ignored. If parallel is True and maxjobs is a positive integer, a BoundedSemaphore of that size is used to limit the number of simultaneously running call() methods.
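The effect of maxjobs can be pictured with a small standard-library sketch. The names run_limited and fake_call are hypothetical and are not part of PLAMS; the sketch only illustrates how a BoundedSemaphore caps the number of threads that are inside a guarded section at the same time.

```python
import threading
import time


def run_limited(n_jobs, maxjobs):
    """Start n_jobs threads, letting at most maxjobs of them "execute" at once.

    Returns the highest number of simultaneously executing jobs observed.
    """
    semaphore = threading.BoundedSemaphore(maxjobs)
    counter_lock = threading.Lock()
    state = {"active": 0, "peak": 0}

    def fake_call():
        # Hypothetical stand-in for call(): only maxjobs threads may be
        # inside this block at any moment.
        with semaphore:
            with counter_lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            time.sleep(0.02)  # pretend to run a runscript
            with counter_lock:
                state["active"] -= 1

    threads = [threading.Thread(target=fake_call) for _ in range(n_jobs)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return state["peak"]
```

Calling run_limited(n_jobs=8, maxjobs=3) starts eight threads, yet the returned peak never exceeds 3.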

For a parallel JobRunner the maximum number of threads started is limited by the maxthreads argument. As each job runs in a separate thread, this number necessarily acts as an upper limit on the number of jobs that can run in parallel. If the limit is exhausted, running further jobs with this JobRunner instance will block until an already running job thread terminates. The maxthreads limit should be set as large as possible, but not so large as to exceed any limits imposed by the operating system.

A JobRunner instance can be passed to run() with a keyword argument jobrunner. If this argument is omitted, the instance stored in config.default_jobrunner is used.

__init__(parallel=False, maxjobs=0, maxthreads=256)

Initialize self. See help(type(self)) for accurate signature.

call(runscript, workdir, out, err, runflags)

Execute the runscript in the folder workdir. Redirect output and error streams to out and err, respectively.

Arguments runscript, workdir, out and err should be strings with paths to corresponding files or folders.

runflags is a Settings instance containing the run branch of running job’s settings. The basic job runner defined here ignores them, but they can be useful in JobRunner subclasses (see GridRunner.call()).

Returns an integer with the exit code returned by the runscript.

This method can be safely overridden in JobRunner subclasses. For example, in GridRunner it submits the runscript to a queueing system instead of executing it locally.

Note

This method is used automatically during run() and should never be explicitly called in your script.
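The behavior described for call() (run the runscript as a subprocess in workdir, redirect both streams to files, return the exit code) can be sketched with the standard library alone. This is not the PLAMS implementation: run_runscript and demo are hypothetical names, and the sketch assumes a POSIX sh is available to interpret the script.

```python
import os
import subprocess
import tempfile


def run_runscript(runscript, workdir, out, err):
    """Run a shell runscript in workdir, sending stdout and stderr to files.

    Returns the integer exit code of the script.
    """
    with open(out, "w") as out_file, open(err, "w") as err_file:
        completed = subprocess.run(
            ["sh", runscript],  # assumption: the runscript is a POSIX shell script
            cwd=workdir,
            stdout=out_file,
            stderr=err_file,
        )
    return completed.returncode


def demo():
    """Write a tiny runscript to a temporary folder and execute it."""
    with tempfile.TemporaryDirectory() as workdir:
        runscript = os.path.join(workdir, "job.run")
        out = os.path.join(workdir, "job.out")
        err = os.path.join(workdir, "job.err")
        with open(runscript, "w") as script:
            script.write("echo hello\necho oops >&2\nexit 3\n")
        code = run_runscript(runscript, workdir, out, err)
        with open(out) as f:
            out_text = f.read()
        with open(err) as f:
            err_text = f.read()
    return code, out_text, err_text
```

The demo helper returns the exit code together with the captured contents of both files, which makes it easy to see that the streams end up where out and err point.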

call_function(function, args)

Execute the function that needs to be scheduled by the jobrunner.

This is wrapped by the _limit() function, so that no more than maxjobs processes can run at the same time.

_run_job(job, jobmanager)

This method aggregates the parts of Running a job that are supposed to be run in a separate thread in case of parallel job execution. It is wrapped with the _in_limited_thread() decorator.

This method should not be overridden.

_run_job_with_args(job, jobmanager, args, args_fin)

This method aggregates the parts of Running a job that are supposed to be run in a separate thread in case of parallel job execution. It is wrapped with the _in_limited_thread() decorator.

This method should not be overridden.

Technical

Similarly to the Results class, the proper behavior of JobRunner and its subclasses (also the ones defined by the user) is ensured using a metaclass. For the sake of completeness we present here a brief specification of all involved elements:

class _MetaRunner

Metaclass for JobRunner. When a class is created, it wraps the call() method with the _limit() decorator, which enforces a limit on the number of simultaneous call() calls.

static __new__(meta, name, bases, dct)

Create and return a new object. See help(type) for accurate signature.

_limit(func)

Decorator for an instance method. If the semaphore attribute of the given instance is not None, it is used to guard the decorated method via a with statement.

_in_thread(func)

Decorator for an instance method. If the parallel attribute of the given instance is True, the decorated method is run in a separate Thread. This thread is usually a daemon thread; the decision is based on the config.daemon_threads entry.
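To make the interplay of the metaclass and the two decorators more concrete, here is a simplified sketch of the pattern. It is an illustration under assumptions, not the PLAMS source: the class names MetaRunnerSketch, RunnerSketch and SubclassSketch and the run_job method are hypothetical, and the thread is always made a daemon instead of consulting config.daemon_threads.

```python
import functools
import threading


def _limit(func):
    """If the instance has a semaphore, hold it while func runs."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.semaphore is None:
            return func(self, *args, **kwargs)
        with self.semaphore:
            return func(self, *args, **kwargs)
    return wrapper


def _in_thread(func):
    """If the instance is parallel, run func in a separate thread."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if not self.parallel:
            return func(self, *args, **kwargs)
        thread = threading.Thread(target=func, args=(self, *args), kwargs=kwargs)
        thread.daemon = True  # assumption: fixed here, configurable in PLAMS
        thread.start()
        return thread
    return wrapper


class MetaRunnerSketch(type):
    """Wrap any call() defined in a class body with _limit."""
    def __new__(meta, name, bases, dct):
        if "call" in dct:
            dct["call"] = _limit(dct["call"])
        return super().__new__(meta, name, bases, dct)


class RunnerSketch(metaclass=MetaRunnerSketch):
    def __init__(self, parallel=False, maxjobs=0):
        self.parallel = parallel
        limited = parallel and maxjobs > 0
        self.semaphore = threading.BoundedSemaphore(maxjobs) if limited else None
        self.log = []

    def call(self, name):
        self.log.append(name)
        return 0

    @_in_thread
    def run_job(self, name):
        return self.call(name)


class SubclassSketch(RunnerSketch):
    def call(self, name):  # overridden, yet still wrapped by the metaclass
        self.log.append("sub:" + name)
        return 0
```

The point of using a metaclass in this sketch is that a user-defined subclass which overrides call() gets the limiting behavior without having to apply the decorator itself.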

Remote job runner

class GridRunner(grid='auto', sleepstep=5, parallel=True, maxjobs=0)

Subclass of JobRunner that submits the runscript to a queueing system instead of executing it locally. Besides two new keyword arguments (grid and sleepstep) it behaves and is meant to be used just like a regular JobRunner.

Note

The default value of the parallel argument is True, unlike in the regular JobRunner.

There are many different queueing systems in popular use (for example TORQUE, SLURM, OGE), and they usually use different commands for submitting jobs or checking the queue status. The GridRunner class tries to build a common interface to these systems. The commands used to communicate with the queueing system are not hard-coded, but rather taken from a Settings instance. As a result, the user has almost full control over the behavior of a GridRunner instance, and that behavior can be adjusted dynamically.

The behavior of a GridRunner instance is determined by the contents of a Settings instance stored in its settings attribute. That Settings instance can be manually supplied by the user or taken from a collection of predefined instances stored as branches of GridRunner.config. The adjustment is done with the grid parameter which should be a string or a Settings instance. If it’s a string, it has to be a key occurring in GridRunner.config or 'auto' for autodetection. For example, if grid='slurm' is passed, GridRunner.config.slurm is used as settings. If grid='auto' then entries present in GridRunner.config are tested and the first one that works (its submit command is present on your system) is chosen. When a Settings instance is passed as grid, it is directly used as settings.
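The three cases for the grid parameter can be summarized in a short sketch. Plain dictionaries stand in for Settings instances, resolve_grid is a hypothetical helper name, and raising ValueError for an unknown key is an assumption made for the sketch, not documented PLAMS behavior.

```python
def resolve_grid(grid, config, autodetect):
    """Pick the settings to use for a given value of the grid parameter.

    grid       -- a string key, the string 'auto', or a settings object
    config     -- mapping of predefined settings (stand-in for GridRunner.config)
    autodetect -- callable returning one of the config branches
    """
    if isinstance(grid, str):
        if grid == "auto":
            return autodetect()
        if grid in config:
            return config[grid]
        # Assumption: the real class may signal this differently.
        raise ValueError("unknown grid type: " + grid)
    # Anything that is not a string is used directly as settings.
    return grid
```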

Currently two predefined schemes are available (see Defaults file): slurm for SLURM and pbs for queueing systems following PBS syntax (PBS, TORQUE, Oracle Grid Engine etc.).

The settings of GridRunner should have the following structure:

  • output – flag for specifying the output file path.
  • error – flag for specifying the error file path.
  • workdir – flag for specifying path to the working directory.
  • commands.submit – submit command.
  • commands.check – queue status check command.
  • commands.getid – function extracting submitted job’s ID from the output of the submit command.
  • commands.running – function extracting a list of all running jobs from the output of the queue check command.
  • special – branch storing definitions of special run() keyword arguments.

See call() for more details and examples.

The sleepstep parameter defines how often the queue check is performed. It should be a numerical value giving the interval, in seconds, between two consecutive checks.

Note

Usually queueing systems are configured in such a way that the output of your calculation is captured somewhere else and copied to the location indicated by the output flag only when the job is finished. Because of that it is not possible to peek at your output while your job is running (for example, to see if your calculation is going well). This limitation can be circumvented with the myjob.settings.runscript.stdout_redirect flag. If it is set to True, the output redirection is not handled by the queueing system, but rather placed in the runscript using the shell redirection >. That forces the output file to be created directly in workdir and updated live as the job proceeds.

__init__(grid='auto', sleepstep=5, parallel=True, maxjobs=0)

Initialize self. See help(type(self)) for accurate signature.

call(runscript, workdir, out, err, runflags)

Submit runscript to the queueing system with workdir as the working directory. Redirect output and error streams to out and err, respectively. runflags stores various submit command options.

The submit command has the following structure:

<commands.submit>_<workdir>_{workdir}_<error>_{err}[_<output>_{out}][FLAGS]_{runscript}

Underscores denote spaces, parts in pointy brackets correspond to settings entries, parts in curly brackets to call() arguments, and square brackets contain optional parts. The output part is added if out is not None. This is handled automatically based on the runscript.stdout_redirect value in the job’s settings.

The FLAGS part is built based on the runflags argument, which is a Settings instance storing run() keyword arguments. For every (key, value) pair in runflags the string _-key_value is appended to FLAGS unless the key is a special key occurring in the special branch of settings. In that case _<special.key>value is used (mind the lack of space in between). For example, a Settings instance defining interaction with SLURM has the following entries:

workdir = '-D'
output  = '-o'
error   = '-e'
special.nodes    = '-N '
special.walltime = '-t '
special.memory = '--mem='
special.queue    = '-p '
commands.submit  = 'sbatch'
commands.check  = 'squeue'

The submit command produced by:

gr = GridRunner(parallel=True, maxjobs=4, grid='slurm')
j.run(jobrunner=gr, queue='short', nodes=2, J='something', O='')

will be:

sbatch -D {workdir} -e {err} -o {out} -p short -N 2 -J something -O  {runscript}

In certain queueing systems some flags don’t have a short form with semantics -key value. For example, in SLURM the flag --nodefile=value has a short form -F value, but the flag --export=value does not. One can still use such a flag using the special keys logic:

gr = GridRunner(parallel=True, maxjobs=4, grid='slurm')
gr.settings.special.export = '--export='
j.run(jobrunner=gr, queue='short', export='value')

That results in the command:

sbatch -D {workdir} -e {err} -o {out} -p short --export=value {runscript}
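The rules for assembling the submit command can be written down as a small function. This is a sketch under assumptions, not the PLAMS code: a nested dictionary (SLURM_SKETCH) stands in for the Settings instance, and build_submit_command is a hypothetical name. The entries mirror the SLURM example listed above.

```python
# Dictionary stand-in for the SLURM settings shown in the text.
SLURM_SKETCH = {
    "workdir": "-D",
    "output": "-o",
    "error": "-e",
    "special": {
        "nodes": "-N ",
        "walltime": "-t ",
        "memory": "--mem=",
        "queue": "-p ",
    },
    "commands": {"submit": "sbatch", "check": "squeue"},
}


def build_submit_command(settings, runscript, workdir, out, err, runflags):
    """Assemble the submit command string from settings and run() keywords."""
    cmd = " ".join([
        settings["commands"]["submit"],
        settings["workdir"], workdir,
        settings["error"], err,
    ])
    if out is not None:
        # The output part is optional.
        cmd += " " + settings["output"] + " " + out
    for key, value in runflags.items():
        if key in settings["special"]:
            # Special keys: the prefix is glued to the value without a space.
            cmd += " " + settings["special"][key] + str(value)
        else:
            # Ordinary keys become "-key value".
            cmd += " -" + key + " " + str(value)
    return cmd + " " + runscript
```

With the run flags queue='short', nodes=2, J='something', O='' this sketch reproduces the first command shown above, including the double space that follows -O because its value is an empty string.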

The submit command is then executed and the output returned by it is used to determine the submitted job’s ID. The value stored in commands.getid is used for that purpose. It should be a function taking a single string (the whole output of the submit command) and returning a string with job’s ID.
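As an illustration, functions suitable for commands.getid and commands.running might look as follows for SLURM. These are hedged examples rather than the definitions shipped with PLAMS. They assume that sbatch reports "Submitted batch job <ID>" and that squeue prints one header line followed by one line per job, with the job ID in the first column.

```python
def getid(submit_output):
    """Extract the job ID from the output of the submit command.

    Assumes the ID is the last whitespace-separated token,
    as in "Submitted batch job 123456".
    """
    return submit_output.split()[-1]


def running(check_output):
    """Extract the IDs of all listed jobs from the queue check output.

    Assumes a single header line and the job ID in the first column.
    """
    lines = check_output.splitlines()[1:]  # skip the header
    return [line.split()[0] for line in lines if line.strip()]
```

Both functions take the whole command output as a single string, which matches the interface described above.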

The submitted job’s ID is then added to the _active_jobs dictionary, with the key being the job’s ID and the value being an instance of threading.Lock. This lock is used to signal that the job is finished and the thread handling it can continue. Then the _check_queue() method starts the thread querying the queue and unlocking finished jobs.

Since it is difficult to automatically obtain the job’s exit code, the returned value is 0 (or 1, if the submit command failed). From the perspective of run() this means that a job executed with GridRunner is considered crashed only if it never entered the queue (usually due to an improper submit command).

Note

This method is used automatically during run() and should never be explicitly called in your script.

_check_queue()

Query the queueing system to obtain a list of currently running jobs. Check for active jobs that are no longer in the queue and release their locks. Repeat this procedure every sleepstep seconds until there are no more active jobs. The _mainlock lock ensures that there is at most one thread executing the main loop of this method at the same time.
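The lock-based waiting scheme can be sketched as follows. This is a simplified illustration rather than the PLAMS implementation: QueueWatcherSketch, wait_for, list_running, the _guard lock and demo_wait are hypothetical names, the queue is faked by a callable, and the main lock is taken before the polling thread is started so that the sketch has no window in which a newly registered job is left unwatched.

```python
import threading
import time


class QueueWatcherSketch:
    def __init__(self, list_running, sleepstep=0.01):
        self.list_running = list_running  # callable returning IDs still queued
        self.sleepstep = sleepstep
        self._active_jobs = {}            # job ID -> threading.Lock
        self._guard = threading.Lock()    # protects _active_jobs
        self._mainlock = threading.Lock() # held while a polling loop is alive

    def wait_for(self, jobid):
        """Block the calling thread until jobid has left the queue."""
        lock = threading.Lock()
        lock.acquire()  # stays locked until the poller releases it
        with self._guard:
            self._active_jobs[jobid] = lock
            start_poller = self._mainlock.acquire(blocking=False)
        if start_poller:
            threading.Thread(target=self._check_queue, daemon=True).start()
        lock.acquire()  # blocks here until the job is reported finished

    def _check_queue(self):
        """Poll the queue until no active jobs remain."""
        while True:
            with self._guard:
                watched = set(self._active_jobs)
            still_running = set(self.list_running())
            with self._guard:
                for jobid in watched - still_running:
                    self._active_jobs.pop(jobid).release()
                if not self._active_jobs:
                    self._mainlock.release()
                    return
            time.sleep(self.sleepstep)


def demo_wait():
    """Fake queue that lists job "42" for the first two polls only."""
    polls = {"count": 0}

    def list_running():
        polls["count"] += 1
        return ["42"] if polls["count"] < 3 else []

    watcher = QueueWatcherSketch(list_running)
    watcher.wait_for("42")
    return polls["count"]
```

In demo_wait the waiting thread is released on the third poll, the first one in which the fake queue no longer lists the job.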

_autodetect()

Try to autodetect the type of queueing system.

The autodetection mechanism is very simple. For each entry in GridRunner.config the submit command followed by --version is executed (for example qsub --version). If the execution is successful (indicated by exit code 0), that queueing system is assumed to be present and is chosen. Thus, if multiple queueing systems are installed, only one of them is picked – the one whose name (the key in GridRunner.config) comes first in lexicographical order.

The returned value is one of the GridRunner.config branches. If autodetection is not successful, an exception is raised.
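The autodetection procedure described above can be sketched with the standard library. Again, this is only an illustration: autodetect and demo_autodetect are hypothetical names, a plain dictionary stands in for GridRunner.config, and RuntimeError is an assumed exception type. The demo uses the running Python interpreter as a stand-in for a submit command that accepts --version.

```python
import subprocess
import sys


def autodetect(config):
    """Return the first config branch whose submit command answers --version."""
    for name in sorted(config):  # lexicographical order of the keys
        submit = config[name]["commands"]["submit"]
        try:
            result = subprocess.run(
                [submit, "--version"],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        except OSError:
            continue  # command not found or not executable
        if result.returncode == 0:
            return config[name]
    raise RuntimeError("no supported queueing system detected")


def demo_autodetect():
    """Only the second entry points at a command that actually exists."""
    config = {
        "aaa": {"commands": {"submit": "no-such-submit-command-xyz"}},
        "bbb": {"commands": {"submit": sys.executable}},
    }
    return autodetect(config) is config["bbb"]
```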