Type: | Package |
Title: | Run Interruptible Code Asynchronously |
Version: | 1.7.0 |
Date: | 2025-05-23 |
Description: | Takes an R expression and returns a job object with a $stop() method which can be called to terminate the background job. Also provides timeouts and other mechanisms for automatically terminating a background job. The result of the expression is available synchronously via $result or asynchronously with callbacks or through the 'promises' package framework. |
URL: | https://cmmr.github.io/jobqueue/, https://github.com/cmmr/jobqueue |
BugReports: | https://github.com/cmmr/jobqueue/issues |
License: | MIT + file LICENSE |
Encoding: | UTF-8 |
RoxygenNote: | 7.3.2 |
VignetteBuilder: | knitr |
Config/Needs/website: | rmarkdown |
Config/testthat/edition: | 3 |
Config/testthat/parallel: | false |
Depends: | R (≥ 4.2.0) |
Imports: | cli, interprocess (≥ 1.2.0), later, magrittr, parallelly, promises, ps, R6, rlang, utils |
Suggests: | glue, knitr, rmarkdown, testthat |
NeedsCompilation: | no |
Packaged: | 2025-05-23 23:03:46 UTC; Daniel |
Author: | Daniel P. Smith |
Maintainer: | Daniel P. Smith <dansmith01@gmail.com> |
Repository: | CRAN |
Date/Publication: | 2025-05-23 23:22:01 UTC |
Define an R Expression (R6 Class)
Description
The job
object encapsulates an expression and its evaluation
parameters. It also provides a way to check for and retrieve the result.
Active bindings
expr
R expression that will be run by this
job
.vars
Get or set - List of variables that will be placed into the expression's environment before evaluation.
reformat
Get or set -
function (job)
for defining<job>$result
.signal
Get or set - Conditions to signal.
cpus
Get or set - Number of CPUs to reserve for evaluating
expr
.timeout
Get or set - Time limits to apply to this
job
.proxy
Get or set -
job
to proxy in place of runningexpr
.state
Get or set - The
job's
state:'created'
,'submitted'
,'queued'
,'dispatched'
,'starting'
,'running'
, or'done'
. Assigning to<job>$state
will trigger callback hooks.output
Get or set -
job's
raw output. Assigning to<job>$output
will change thejob's
state to'done'
.jobqueue
worker
result
Result of
expr
. Will block untiljob
is finished.hooks
Currently registered callback hooks as a named list of functions. Set new hooks with
<job>$on()
.is_done
TRUE
orFALSE
depending on if thejob's
result is ready.uid
A short string, e.g.
'J16'
, that uniquely identifies thisjob
.
Methods
Public methods
Method new()
Creates a job
object defining how to run an expression on
a background worker
process.
Typically you won't need to call job_class$new()
. Instead, create a
jobqueue
and use <jobqueue>$run()
to generate
job
objects.
Usage
job_class$new( expr, vars = NULL, timeout = NULL, hooks = NULL, reformat = NULL, signal = FALSE, cpus = 1L, ... )
Arguments
expr
A call or R expression wrapped in curly braces to evaluate on a
worker
. Will have access to any variables defined byvars
, as well as theworker's
globals
,packages
, andinit
configuration. Seevignette('eval')
.vars
A named list of variables to make available to
expr
during evaluation. Alternatively, an object that can be coerced to a named list withas.list()
, e.g. named vector, data.frame, or environment. Or afunction (job)
that returns such an object.timeout
A named numeric vector indicating the maximum number of seconds allowed for each state the
job
passes through, or 'total' to apply a single timeout from 'submitted' to 'done'. Or afunction (job)
that returns the same. Example:timeout = c(total = 2.5, running = 1)
. Seevignette('stops')
.hooks
A named list of functions to run when the
job
state changes, of the formhooks = list(created = function (worker) {...})
. Or afunction (job)
that returns the same. Names ofworker
hooks are typically'created'
,'submitted'
,'queued'
,'dispatched'
,'starting'
,'running'
,'done'
, or'*'
(duplicates okay). Seevignette('hooks')
.reformat
Set
reformat = function (job)
to define what<job>$result
should return. The default,reformat = NULL
passes<job>$output
to<job>$result
unchanged. Seevignette('results')
.signal
Should calling
<job>$result
signal on condition objects? WhenFALSE
,<job>$result
will return the object without taking additional action. Setting toTRUE
or a character vector of condition classes, e.g.c('interrupt', 'error', 'warning')
, will cause the equivalent ofstop(<condition>)
to be called when those conditions are produced. Alternatively, afunction (job)
that returnsTRUE
orFALSE
. Seevignette('results')
.cpus
How many CPU cores to reserve for this
job
. Or afunction (job)
that returns the same. Used to limit the number ofjobs
running simultaneously to respect<jobqueue>$max_cpus
. Does not prevent ajob
from using more CPUs than reserved....
Arbitrary named values to add to the returned
job
object.
Returns
A job
object.
Method print()
Print method for a job
.
Usage
job_class$print(...)
Arguments
...
Arguments are not used currently.
Returns
This job
, invisibly.
Method on()
Attach a callback function to execute when the job
enters
state
.
Usage
job_class$on(state, func)
Arguments
state
The name of a
job
state. Typically one of:-
'*'
- Every time the state changes. -
'.next'
- Only one time, the next time the state changes. -
'created'
- Afterjob_class$new()
initialization. -
'submitted'
- After<job>$jobqueue
is assigned. -
'queued'
- Afterstop_id
andcopy_id
are resolved. -
'dispatched'
- After<job>$worker
is assigned. -
'starting'
- Before evaluation begins. -
'running'
- After evaluation begins. -
'done'
- After<job>$output
is assigned.
Custom states can also be specified.
-
func
A function that accepts a
job
object as input. You can call<job>$stop()
or edit<job>$
values and the changes will be persisted (sincejobs
are reference class objects). You can also edit/stop other queuedjobs
by modifying thejobs
in<job>$jobqueue$jobs
. Return value is ignored.
Returns
A function that when called removes this callback from the
job
.
Method wait()
Blocks until the job
enters the given state.
Usage
job_class$wait(state = "done", timeout = NULL)
Arguments
state
The name of a
job
state. Typically one of:-
'*'
- Every time the state changes. -
'.next'
- Only one time, the next time the state changes. -
'created'
- Afterjob_class$new()
initialization. -
'submitted'
- After<job>$jobqueue
is assigned. -
'queued'
- Afterstop_id
andcopy_id
are resolved. -
'dispatched'
- After<job>$worker
is assigned. -
'starting'
- Before evaluation begins. -
'running'
- After evaluation begins. -
'done'
- After<job>$output
is assigned.
Custom states can also be specified.
-
timeout
Stop the
job
if it takes longer than this number of seconds, orNULL
.
Returns
This job
, invisibly.
Method stop()
Stop this job
. If the job
is running, its
worker
will be restarted.
Usage
job_class$stop(reason = "job stopped by user", cls = NULL)
Arguments
reason
A message to include in the 'interrupt' condition object that will be returned as the
job's
result. Or a condition object.cls
Character vector of additional classes to prepend to
c('interrupt', 'condition')
.
Returns
This job
, invisibly.
Assigns Jobs to Workers
Description
Jobs go in. Results come out.
Usage
jobqueue(
globals = NULL,
packages = NULL,
namespace = NULL,
init = NULL,
max_cpus = availableCores(),
workers = ceiling(max_cpus * 1.2),
timeout = NULL,
hooks = NULL,
reformat = NULL,
signal = FALSE,
cpus = 1L,
stop_id = NULL,
copy_id = NULL
)
Arguments
globals |
A named list of variables that all |
packages |
Character vector of package names to load on
|
namespace |
The name of a package to attach to the
|
init |
A call or R expression wrapped in curly braces to evaluate on
each |
max_cpus |
Total number of CPU cores that can be reserved by all
running |
workers |
How many background |
timeout |
A named numeric vector indicating the maximum number of
seconds allowed for each state the |
hooks |
A named list of functions to run when the |
reformat |
Set |
signal |
Should calling |
cpus |
The default number of CPU cores per |
stop_id |
If an existing |
copy_id |
If an existing |
Value
A jobqueue
object.
Examples
jq <- jobqueue(globals = list(N = 42), workers = 2)
print(jq)
job <- jq$run({ paste("N is", N) })
job$result
jq$stop()
Assigns Jobs to Workers (R6 Class)
Description
Jobs go in. Results come out.
Active bindings
hooks
A named list of currently registered callback hooks.
jobs
Get or set - List of jobs currently managed by this
jobqueue
.state
The
jobqueue's
state:'starting'
,'idle'
,'busy'
,'stopped'
, or'error.'
uid
A short string, e.g.
'Q1'
, that uniquely identifies thisjobqueue
.tmp
The
jobqueue
's temporary directory.workers
cnd
The error that caused the
jobqueue
to stop.
Methods
Public methods
Method new()
Creates a pool of background processes for handling $run()
and
$submit()
calls. These workers
are initialized
according to the globals
, packages
, and init
arguments.
Usage
jobqueue_class$new( globals = NULL, packages = NULL, namespace = NULL, init = NULL, max_cpus = availableCores(), workers = ceiling(max_cpus * 1.2), timeout = NULL, hooks = NULL, reformat = NULL, signal = FALSE, cpus = 1L, stop_id = NULL, copy_id = NULL )
Arguments
globals
A named list of variables that all
<job>$expr
s will have access to. Alternatively, an object that can be coerced to a named list withas.list()
, e.g. named vector, data.frame, or environment.packages
Character vector of package names to load on
workers
.namespace
The name of a package to attach to the
worker's
environment.init
A call or R expression wrapped in curly braces to evaluate on each
worker
just once, immediately after start-up. Will have access to variables defined byglobals
and assets frompackages
andnamespace
. Returned value is ignored.max_cpus
Total number of CPU cores that can be reserved by all running
jobs
(sum(<job>$cpus)
). Does not enforce limits on actual CPU utilization.workers
How many background
worker
processes to start. Set to more thanmax_cpus
to enable standbyworkers
to quickly swap out withworkers
that need to restart.timeout, hooks, reformat, signal, cpus, stop_id, copy_id
Defaults for this
jobqueue's
$run()
method. Here only,stop_id
andcopy_id
must be either afunction (job)
orNULL
.hooks
can setjobqueue
,worker
, and/orjob
hooks - see the "Attaching" section invignette('hooks')
.
Returns
A jobqueue
object.
Method print()
Print method for a jobqueue
.
Usage
jobqueue_class$print(...)
Arguments
...
Arguments are not used currently.
Method run()
Creates a job
object and submits it to the
jobqueue
for running. Any NA
arguments will be
replaced with their value from jobqueue_class$new()
.
Usage
jobqueue_class$run( expr, vars = list(), timeout = NA, hooks = NA, reformat = NA, signal = NA, cpus = NA, stop_id = NA, copy_id = NA, ... )
Arguments
expr
A call or R expression wrapped in curly braces to evaluate on a
worker
. Will have access to any variables defined byvars
, as well as thejobqueue's
globals
,packages
, andinit
configuration. Seevignette('eval')
.vars
A named list of variables to make available to
expr
during evaluation. Alternatively, an object that can be coerced to a named list withas.list()
, e.g. named vector, data.frame, or environment. Or afunction (job)
that returns such an object.timeout
A named numeric vector indicating the maximum number of seconds allowed for each state the
job
passes through, or 'total' to apply a single timeout from 'submitted' to 'done'. Can also limit the 'starting' state forworkers
. Afunction (job)
can be used in place of a number. Example:timeout = c(total = 2.5, running = 1)
. Seevignette('stops')
.hooks
A named list of functions to run when the
job
state changes, of the formhooks = list(created = function (worker) {...})
. Or afunction (job)
that returns the same. Names ofworker
hooks are typically'created'
,'submitted'
,'queued'
,'dispatched'
,'starting'
,'running'
,'done'
, or'*'
(duplicates okay). Seevignette('hooks')
.reformat
Set
reformat = function (job)
to define what<job>$result
should return. The default,reformat = NULL
passes<job>$output
to<job>$result
unchanged. Seevignette('results')
.signal
Should calling
<job>$result
signal on condition objects? WhenFALSE
,<job>$result
will return the object without taking additional action. Setting toTRUE
or a character vector of condition classes, e.g.c('interrupt', 'error', 'warning')
, will cause the equivalent ofstop(<condition>)
to be called when those conditions are produced. Alternatively, afunction (job)
that returnsTRUE
orFALSE
. Seevignette('results')
.cpus
How many CPU cores to reserve for this
job
. Or afunction (job)
that returns the same. Used to limit the number ofjobs
running simultaneously to respect<jobqueue>$max_cpus
. Does not prevent ajob
from using more CPUs than reserved.stop_id
If an existing
job
in thejobqueue
has the samestop_id
, thatjob
will be stopped and return an 'interrupt' condition object as its result.stop_id
can also be afunction (job)
that returns thestop_id
to assign to a givenjob
. Astop_id
ofNULL
disables this feature. Seevignette('stops')
.copy_id
If an existing
job
in thejobqueue
has the samecopy_id
, the newly submittedjob
will become a "proxy" for that earlierjob
, returning whatever result the earlierjob
returns.copy_id
can also be afunction (job)
that returns thecopy_id
to assign to a givenjob
. Acopy_id
ofNULL
disables this feature. Seevignette('stops')
....
Arbitrary named values to add to the returned
job
object.
Returns
The new job
object.
Method submit()
Adds a job
to the jobqueue
for
running on a background process.
Usage
jobqueue_class$submit(job)
Arguments
job
A
job
object, as created byjob_class$new()
.
Returns
This jobqueue
, invisibly.
Method wait()
Blocks until the jobqueue
enters the given state.
Usage
jobqueue_class$wait(state = "idle", timeout = NULL, signal = TRUE)
Arguments
Returns
This jobqueue
, invisibly.
Method on()
Attach a callback function to execute when the
jobqueue
enters state
.
Usage
jobqueue_class$on(state, func)
Arguments
Returns
A function that when called removes this callback from the
jobqueue
.
Method stop()
Usage
jobqueue_class$stop(reason = "jobqueue shut down by user", cls = NULL)
Arguments
Returns
This jobqueue
, invisibly.
Objects exported from other packages
Description
These objects are imported from other packages. Follow the links below to see their documentation.
- later
- magrittr
- parallelly
- promises
%...>%
,%...T>%
,as.promise
,promise_all
,promise_map
,promise_race
,promise_reduce
,then
A Background Process (R6 Class)
Description
Where job expressions are evaluated.
Active bindings
hooks
A named list of currently registered callback hooks.
job
The currently running
job
.ps
The
ps::ps_handle()
object for the background process.state
The
worker's
state:'starting'
,'idle'
,'busy'
, or'stopped'
.uid
A short string, e.g.
'W11'
, that uniquely identifies thisworker
.tmp
The
worker's
temporary directory.cnd
The error that caused the
worker
to stop.
Methods
Public methods
Method new()
Creates a background R process for running jobs
.
Usage
worker_class$new( globals = NULL, packages = NULL, namespace = NULL, init = NULL, hooks = NULL, wait = TRUE, timeout = Inf )
Arguments
globals
A named list of variables that all
<job>$expr
s will have access to. Alternatively, an object that can be coerced to a named list withas.list()
, e.g. named vector, data.frame, or environment.packages
Character vector of package names to load on
workers
.namespace
The name of a package to attach to the
worker's
environment.init
A call or R expression wrapped in curly braces to evaluate on each
worker
just once, immediately after start-up. Will have access to variables defined byglobals
and assets frompackages
andnamespace
. Returned value is ignored.hooks
A named list of functions to run when the
worker
state changes, of the formhooks = list(idle = function (worker) {...})
. Names ofworker
hooks are typicallystarting
,idle
,busy
,stopped
, or'*'
(duplicates okay). Seevignette('hooks')
.wait
If
TRUE
, blocks until theworker
is 'idle'. IfFALSE
, theworker
object is returned in the 'starting' state.timeout
How long to wait for the
worker
to finish starting (in seconds). IfNA
, defaults to theworker_class$new()
argument.
Returns
A worker
object.
Method print()
Print method for a worker
.
Usage
worker_class$print(...)
Arguments
...
Arguments are not used currently.
Returns
The worker
, invisibly.
Method start()
Restarts a stopped worker
.
Usage
worker_class$start(wait = TRUE, timeout = NA)
Arguments
Returns
The worker
, invisibly.
Method stop()
Stops a worker
by terminating the background process
and calling <job>$stop(reason)
on any jobs
currently
assigned to this worker
.
Usage
worker_class$stop(reason = "worker stopped by user", cls = NULL)
Arguments
Returns
The worker
, invisibly.
Method restart()
Restarts a worker
by calling <worker>$stop(reason)
and <worker>$start()
in succession.
Usage
worker_class$restart( wait = TRUE, timeout = NA, reason = "restarting worker", cls = NULL )
Arguments
wait
If
TRUE
, blocks until theworker
is 'idle'. IfFALSE
, theworker
object is returned in the 'starting' state.timeout
How long to wait for the
worker
to finish starting (in seconds). IfNA
, defaults to theworker_class$new()
argument.reason
Passed to
<job>$stop()
for anyjobs
currently managed by thisworker
.cls
Passed to
<job>$stop()
for anyjobs
currently managed by thisworker
.
Returns
The worker
, invisibly.
Method on()
Attach a callback function to execute when the worker
enters state
.
Usage
worker_class$on(state, func)
Arguments
Returns
A function that when called removes this callback from the
worker
.
Method wait()
Blocks until the worker
enters the given state.
Usage
worker_class$wait(state = "idle", timeout = Inf, signal = TRUE)
Arguments
Returns
This worker
, invisibly.
Method run()
Assigns a job
to this worker
for
evaluation on the background process.
Usage
worker_class$run(job)
Arguments
job
A
job
object, as created byjob_class$new()
.
Returns
This worker
, invisibly.