Job runners¶
Job runners have already been mentioned in previous chapters about jobs and results.
Here we sum up all that information and introduce a basic JobRunner
object together with its subclass GridRunner
which is meant for interacting with queueing systems that manage resources on computer clusters.
Job runners in PLAMS are very simple objects, both from user’s perspective and in terms of internal architecture.
They have no methods that are meant to be called in your scripts, apart from constructors.
Job runners are supposed to be created (with some parameters adjusting their behavior) and passed to the run()
method as parameters (or placed as config.default_jobrunner
).
Local job runner¶
-
class
JobRunner
(parallel=False, maxjobs=0, maxthreads=256)[source]¶ Class defining the basic job runner interface. Instances of this class represent local job runners – job runners that execute computational jobs on the current machine.
The goal of the job runner is to take care of two important things – parallelization and runscript execution:
- When the method
run()
of anyJob
instance is executed, the control, after some initial preparations, is passed to aJobRunner
instance. ThisJobRunner
instance decides if a separate thread should be spawned for the job or if the execution should proceed in the current thread. This decision is based on theparallel
attribute which can be set onJobRunner
creation. There are no separate classes for serial and parallel job runner, both cases are covered byJobRunner
depending on theparallel
parameter. - If the executed job is an instance of
SingleJob
, it creates a runscript which contains most of the actual computational work (usually it’s just an execution of some external binary). The runscript is then submitted to aJobRunner
instance using itscall()
method. This method executes the runscript as a separate subprocess and takes care of output and error streams handling, setting a proper working directory etc.
For a job runner with parallel execution enabled the number of simultaneously running jobs can be limited using the maxjobs parameter. If maxjobs is 0, no limit is enforced. If parallel is
False
, maxjobs is ignored. If parallel isTrue
and maxjobs is a positive integer, aBoundedSemaphore
of that size is used to limit the number of simultaneously runningcall()
methods.For a parallel
JobRunner
the maximum amount of threads started is limited by the maxthreads argument. As each job is running in a separate thread, this number necessarily acts as an upper limit for the number of jobs that can run in parallel. If the limit is exhausted, running further jobs with thisJobRunner
instance will block execution until another already running job thread terminates. Themaxthreads
limit should be set as large as possible, but not so large as to exceed any limits imposed by the operating system.A
JobRunner
instance can be passed torun()
with a keyword argumentjobrunner
. If this argument is omitted, the instance stored inconfig.default_jobrunner
is used.-
__init__
(parallel=False, maxjobs=0, maxthreads=256)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
call
(runscript, workdir, out, err, runflags)[source]¶ Execute the runscript in the folder workdir. Redirect output and error streams to out and err, respectively.
Arguments runscript, workdir, out and err should be strings with paths to corresponding files or folders.
runflags is a
Settings
instance containing the run branch of running job’s settings. The basic job runner defined here ignores them, but they can be useful inJobRunner
subclasses (seeGridRunner.call()
).Returns an integer with the exit code returned by the runscript.
This method can be safely overridden in
JobRunner
subclasses. For example, inGridRunner
it submits the runscript to a queueing system instead of executing it locally.Note
This method is used automatically during
run()
and should never be explicitly called in your script.
-
call_function
(function, args)[source]¶ Execute the function that needs to be scheduled by the jobrunner.
This is wrapped by the _limit() function, so that no more than maxjobs processes can run at the same time
-
_run_job
(job, jobmanager)[source]¶ This method aggregates the parts of Running a job that are supposed to be run in a separate thread in case of parallel job execution. It is wrapped with
_in_limited_thread()
decorator.This method should not be overridden.
-
_run_job_with_args
(job, jobmanager, args, args_fin)[source]¶ _run_job(job, jobmanager) This method aggregates the parts of Running a job that are supposed to be run in a separate thread in case of parallel job execution. It is wrapped with
_in_limited_thread()
decorator.This method should not be overridden.
- When the method
Technical
Similarly to the Results
class, the proper behavior of JobRunner
and its subclasses (also the ones defined by the user) is ensured using a metaclass.
For the sake of completeness we present here a brief specification of all involved elements:
-
class
_MetaRunner
[source]¶ Metaclass for
JobRunner
. During an instance creation wrap thecall()
method with_limit()
decorator which enforces a limit on the number of simultaneouscall()
calls.
Remote job runner¶
-
class
GridRunner
(grid='auto', sleepstep=5, parallel=True, maxjobs=0)[source]¶ Subclass of
JobRunner
that submits the runscript to a queueing system instead of executing it locally. Besides two new keyword arguments (grid and sleepstep) it behaves and is meant to be used just like a regularJobRunner
.Note
The default value of the parallel argument is
True
, unlike in the regularJobRunner
.There are many different queueing systems that are popular nowadays (for example: TORQUE, SLURM, OGE). Usually they use different commands for submitting jobs or checking the queue status.
GridRunner
class tries to build a common interface to these systems. The commands used to communicate with the queueing system are not hard-coded, but rather taken from aSettings
instance. Thanks to that the user has almost full control over the behavior of aGridRunner
instance and the behavior can be ajdusted dynamically.The behavior of a
GridRunner
instance is determined by the contents of aSettings
instance stored in itssettings
attribute. ThatSettings
instance can be manually supplied by the user or taken from a collection of predefined instances stored as branches ofGridRunner.config
. The adjustment is done with the grid parameter which should be a string or aSettings
instance. If it’s a string, it has to be a key occurring inGridRunner.config
or'auto'
for autodetection. For example, ifgrid='slurm'
is passed,GridRunner.config.slurm
is used as settings. Ifgrid='auto'
then entries present inGridRunner.config
are tested and the first one that works (its submit command is present on your system) is chosen. When aSettings
instance is passed as grid, it is directly used assettings
.Currently two predefined schemes are available (see Defaults file):
slurm
for SLURM andpbs
for queueing systems following PBS syntax (PBS, TORQUE, Oracle Grid Engine etc.).The settings of
GridRunner
should have the following structure:output
– flag for specifying the output file path.error
– flag for specifying the error file path.workdir
– flag for specifying path to the working directory.commands.submit
– submit command.commands.check
– queue status check command.commands.getid
– function extracting submitted job’s ID from the output of the submit command.commands.running
– function extracting a list of all running jobs from the output of queue check commandcommands.special
– branch storing definitions of specialrun()
keyword arguments.
See
call()
for more details and examples.The sleepstep parameter defines how often the queue check is performed. It should be a numerical value telling how many seconds should the interval between two consecutive checks last.
Note
Usually queueing systems are configured in such a way that output of your calculation is captured somewhere else and copied to the location indicated by the output flag only when the job is finished. Because of that it is not possible to have a peek at your output while your job is running (for example, to see if your calculation is going well). This limitation can be circumvented with
myjob.settings.runscript.stdout_redirect
flag. If set toTrue
, the output redirection will not be handled by the queueing system, but rather placed in the runscript using the shell redirection>
. That forces the output file to be created directly in workdir and updated live as the job proceeds.-
__init__
(grid='auto', sleepstep=5, parallel=True, maxjobs=0)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
call
(runscript, workdir, out, err, runflags)[source]¶ Submit runscript to the queueing system with workdir as the working directory. Redirect output and error streams to out and err, respectively. runflags stores varoius submit command options.
The submit command has the following structure:
<commands.submit>_<workdir>_{workdir}_<error>_{err}[_<output>_{out}][FLAGS]_{runscript}
Underscores denote spaces, parts in pointy brackets correspond to
settings
entries, parts in curly brackets tocall()
arguments, square brackets contain optional parts. Output part is added if out is notNone
. This is handled automatically based onrunscript.stdout_redirect
value in job’ssettings
.FLAGS
part is built based on runflags argument, which is aSettings
instance storingrun()
keyword arguments. For every (key,value) pair in runflags the string_-key_value
is appended toFLAGS
unless the key is a special key occurring incommands.special
. In that case_<commands.special.key>value
is used (mind the lack of space in between). For example, aSettings
instance defining interaction with SLURM has the following entries:workdir = '-D' output = '-o' error = '-e' special.nodes = '-N ' special.walltime = '-t ' special.memory = '--mem=' special.queue = '-p ' commands.submit = 'sbatch' commands.check = 'squeue'
The submit command produced by:
gr = GridRunner(parallel=True, maxjobs=4, grid='slurm') j.run(jobrunner=gr, queue='short', nodes=2, J='something', O='')
will be:
sbatch -D {workdir} -e {err} -o {out} -p short -N 2 -J something -O {runscript}
In certain queueing systems some flags don’t have a short form with semantics
-key value
. For example, in SLURM the flag--nodefile=value
has a short form-F value
, but the flag--export=value
does not. One can still use such a flag using the special keys logic:gr = GridRunner(parallel=True, maxjobs=4, grid='slurm') gr.settings.special.export = '--export=' j.run(jobrunner=gr, queue='short', export='value')
That results in the command:
sbatch -D {workdir} -e {err} -o {out} -p short --export=value {runscript}
The submit command is then executed and the output returned by it is used to determine the submitted job’s ID. The value stored in
commands.getid
is used for that purpose. It should be a function taking a single string (the whole output of the submit command) and returning a string with job’s ID.The submitted job’s ID is then added to
_active_jobs
dictionary, with the key being job’s ID and the value being an instance ofthreading.Lock
. This lock is used to singal the fact that the job is finished and the thread handling it can continue. Then the_check_queue()
method starts the thread querying the queue and unlocking finished jobs.Since it is difficult to automatically obtain job’s exit code, the returned value is 0 (or 1, if the submit command failed). From
run()
perspective it means that a job executed withGridRunner
is crashed only if it never entered the queue (usually due to improper submit command).Note
This method is used automatically during
run()
and should never be explicitly called in your script.
-
_check_queue
()[source]¶ Query the queueing system to obtain a list of currently running jobs. Check for active jobs that are not any more in the queue and release their locks. Repeat this procedure every
sleepstep
seconds until there are no more active jobs. The_mainlock
lock ensures that there is at most one thread executing the main loop of this method at the same time.
-
_autodetect
()[source]¶ Try to autodetect the type of queueing system.
The autodetection mechanism is very simple. For each entry in
GridRunner.config
the submit command followed by--version
is executed (for exampleqsub --version
). If the execution was successful (which is indicated by the exit code 0), that queueing system is present and it is chosen. Thus if there are multiple queueing systems installed, only one of them is picked – the one which “name” (indicated by a key inGridRunner.config
) is first in the lexicographical order.Returned value is one of
GridRunner.config
branches. If autodetection was not successful, an exception is raised.