Title: | A Distributed Worker Launcher Framework |
Description: | In computationally demanding analysis projects, statisticians and data scientists asynchronously deploy long-running tasks to distributed systems, ranging from traditional clusters to cloud services. The 'NNG'-powered 'mirai' R package by Gao (2023) <doi:10.5281/zenodo.7912722> is a sleek and sophisticated scheduler that efficiently processes these intense workloads. The 'crew' package extends 'mirai' with a unifying interface for third-party worker launchers. Inspiration also comes from packages 'future' by Bengtsson (2021) <doi:10.32614/RJ-2021-048>, 'rrq' by FitzJohn and Ashton (2023) <https://github.com/mrc-ide/rrq>, 'clustermq' by Schubert (2019) <doi:10.1093/bioinformatics/btz284>, and 'batchtools' by Lang, Bischl, and Surmann (2017) <doi:10.21105/joss.00135>. |
Version: | 1.2.1 |
License: | MIT + file LICENSE |
URL: | https://wlandau.github.io/crew/, https://github.com/wlandau/crew |
BugReports: | https://github.com/wlandau/crew/issues |
Depends: | R (≥ 4.0.0) |
Imports: | cli (≥ 3.1.0), data.table, later, mirai (≥ 2.0.1), nanonext (≥ 1.6.0), processx, promises, ps, R6, rlang, stats, tibble, tidyselect, tools, utils |
Suggests: | autometric (≥ 0.1.0), knitr (≥ 1.30), markdown (≥ 1.1), rmarkdown (≥ 2.4), testthat (≥ 3.0.0) |
Encoding: | UTF-8 |
Language: | en-US |
VignetteBuilder: | knitr |
Config/testthat/edition: | 3 |
RoxygenNote: | 7.3.2 |
NeedsCompilation: | no |
Packaged: | 2025-06-09 12:45:24 UTC; C240390 |
Author: | William Michael Landau |
Maintainer: | William Michael Landau <will.landau.oss@gmail.com> |
Repository: | CRAN |
Date/Publication: | 2025-06-09 16:10:02 UTC |
crew: a distributed worker launcher framework
Description
In computationally demanding analysis projects,
statisticians and data scientists asynchronously deploy
long-running tasks to distributed systems, ranging from
traditional clusters to cloud services.
The NNG-powered mirai R package is a sleek and sophisticated scheduler
that efficiently processes these intense workloads.
The crew package extends mirai with a unifying interface
for third-party worker launchers.
Inspiration also comes from packages future, rrq, clustermq, and batchtools.
Crew assertion
Description
Assert that a condition is true.
Usage
crew_assert(value = NULL, ..., message = NULL, envir = parent.frame())
Arguments
value | An object or condition. |
... | Conditions that use the "." symbol to refer to value, e.g. !anyNA(.). |
message | Optional message to print on error. |
envir | Environment to evaluate the condition. |
Value
NULL (invisibly). Throws an error if the condition is not true.
See Also
Other utility: crew_clean(), crew_deprecate(), crew_eval(), crew_random_name(), crew_retry(), crew_terminate_process(), crew_terminate_signal(), crew_worker()
Examples
crew_assert(1 < 2)
crew_assert("object", !anyNA(.), nzchar(.))
tryCatch(
  crew_assert(2 < 1),
  crew_error = function(condition) message("false")
)
Local asynchronous client object.
Description
Create an R6 object to manage local asynchronous quick
tasks with error detection.
Usage
crew_async(workers = NULL)
Arguments
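workers | Number of local workers for running asynchronous tasks. If NULL, no workers are created and tasks run synchronously. |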
Details
crew_async() objects are created inside launchers to allow
launcher plugins to run local tasks asynchronously, such as
calls to cloud APIs to launch serious remote workers.
Value
An R6 async client object.
See Also
Other async: crew_class_async
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  x <- crew_async()
  x$start()
  out <- x$eval(1 + 1)
  mirai::call_mirai(out)
  out$data # 2
  x$terminate()
}
R6 async class.
Description
R6 class for async configuration.
Details
See crew_async().
Active bindings
workers
See crew_async().
instance
Name of the current instance.
Methods
Public methods
Method new()
Async client constructor.
Usage
crew_class_async$new(workers = NULL)
Arguments
workers
Argument passed from crew_async().
Returns
An R6 async client object.
Method validate()
Validate the object.
Usage
crew_class_async$validate()
Returns
NULL (invisibly).
Method start()
Start the local workers and error handling socket.
Usage
crew_class_async$start()
Details
Does not create workers or an error handling socket
if workers is NULL or the object is already started.
Returns
NULL (invisibly).
Method terminate()
Terminate the local workers and error handling socket.
Usage
crew_class_async$terminate()
Details
Waits for existing tasks to complete first.
Returns
NULL (invisibly).
Method started()
Show whether the object is started.
Usage
crew_class_async$started()
Returns
Logical of length 1, whether the object is started.
Method asynchronous()
Show whether the object is asynchronous (has real workers).
Usage
crew_class_async$asynchronous()
Returns
Logical of length 1, whether the object is asynchronous.
Method eval()
Run a local asynchronous task using a local compute profile.
Usage
crew_class_async$eval(
  command,
  substitute = TRUE,
  data = list(),
  packages = character(0L),
  library = NULL
)
Arguments
command
R code to run.
substitute
Logical of length 1, whether to substitute command. If FALSE, then command must be an expression object or language object.
data
Named list of data objects required to run command.
packages
Character vector of packages to load.
library
Character vector of library paths to load the packages from.
Details
Used for launcher plugins with asynchronous launches and
terminations. If workers is NULL, the task runs synchronously
in the calling R session.
Otherwise, the task runs on a local process in the local
mirai compute profile.
Returns
If the workers field is NULL, a list with an element named
data containing the result of evaluating command synchronously.
Otherwise, the task is evaluated asynchronously, and the result
is a mirai task object. Either way, the data element
of the return value will contain the result of the task.
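The two return shapes described above can be compared side by side. The following is a minimal sketch, guarded by the same CREW_EXAMPLES convention used elsewhere in this manual. The value workers = 1L is an assumption chosen for illustration, and run_examples is a name introduced here, not part of crew.

```r
# Only run when examples are explicitly enabled, as in the manual's own examples.
run_examples <- identical(Sys.getenv("CREW_EXAMPLES"), "true")
if (run_examples) {
  # workers = NULL: no local workers are created, so eval() runs synchronously
  # and returns a list with a `data` element.
  sync <- crew::crew_async(workers = NULL)
  sync$start()
  out <- sync$eval(1 + 1)
  print(out$data)
  sync$terminate()

  # workers = 1L (assumed value): eval() returns a mirai task object.
  async <- crew::crew_async(workers = 1L)
  async$start()
  task <- async$eval(1 + 1)
  mirai::call_mirai(task) # Block until the task resolves.
  print(task$data)
  async$terminate()
}
```

In both branches the result is read from the data element, which is why launcher plugins can treat the two cases uniformly.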
See Also
Other async: crew_async()
R6 client class.
Description
R6 class for mirai clients.
Details
See crew_client().
Active bindings
host
See crew_client().
port
See crew_client().
tls
See crew_client().
serialization
See crew_client().
seconds_interval
See crew_client().
seconds_timeout
See crew_client().
relay
Relay object for event-driven programming on a downstream condition variable.
started
Whether the client is started.
url
Client websocket URL.
profile
Compute profile of the client.
condition
Condition variable of the client.
client
Process ID of the local process running the client.
dispatcher
Process ID of the mirai dispatcher.
Methods
Public methods
Method new()
mirai client constructor.
Usage
crew_class_client$new(
  host = NULL,
  port = NULL,
  tls = NULL,
  serialization = NULL,
  seconds_interval = NULL,
  seconds_timeout = NULL,
  relay = NULL
)
Arguments
host
Argument passed from crew_client().
port
Argument passed from crew_client().
tls
Argument passed from crew_client().
serialization
Argument passed from crew_client().
seconds_interval
Argument passed from crew_client().
seconds_timeout
Argument passed from crew_client().
relay
Argument passed from crew_client().
Returns
An R6 object with the client.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  client$log()
  client$terminate()
}
Method validate()
Validate the client.
Usage
crew_class_client$validate()
Returns
NULL (invisibly).
Method set_started()
Register the client as started.
Usage
crew_class_client$set_started()
Details
Exported to implement the sequential controller. Only meant to be called manually inside the client or the sequential controller.
Returns
NULL (invisibly).
Method start()
Start listening for workers on the available sockets.
Usage
crew_class_client$start()
Returns
NULL (invisibly).
Method terminate()
Stop the mirai client and disconnect from the worker websockets.
Usage
crew_class_client$terminate()
Returns
NULL (invisibly).
Method resolved()
Get the true value of the nanonext condition variable.
Usage
crew_class_client$resolved()
Returns
The value of the nanonext condition variable.
Method status()
Internal function: return the mirai status of the compute profile.
Usage
crew_class_client$status()
Details
Should only be called by the launcher, never by the user.
The returned events field changes on every call and must be
interpreted by the launcher before it vanishes.
Returns
A list with status information.
Method pids()
Get the process IDs of the local process and the mirai dispatcher (if started).
Usage
crew_class_client$pids()
Returns
An integer vector of process IDs of the local process and the
mirai dispatcher (if started).
See Also
Other client: crew_client()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  client$start()
  client$log()
  client$terminate()
}
Controller class
Description
R6 class for controllers.
Details
See crew_controller().
Active bindings
client
Client object.
launcher
Launcher object.
tasks
A list of mirai::mirai() task objects.
pushed
Number of tasks pushed since the controller was started.
popped
Number of tasks popped since the controller was started.
reset_globals
See crew_controller().
reset_packages
See crew_controller().
reset_options
See crew_controller().
garbage_collection
See crew_controller().
crashes_max
See crew_controller().
backup
See crew_controller().
error
Tibble of task results (with one result per row) from the last call to map(error = "stop").
backlog
A crew_queue() object tracking explicitly backlogged tasks.
autoscaling
TRUE or FALSE, whether async later-based auto-scaling is currently running.
queue
Queue of resolved unpopped/uncollected tasks.
Methods
Public methods
Method new()
mirai controller constructor.
Usage
crew_class_controller$new(
  client = NULL,
  launcher = NULL,
  reset_globals = NULL,
  reset_packages = NULL,
  reset_options = NULL,
  garbage_collection = NULL,
  crashes_max = NULL,
  backup = NULL
)
Arguments
client
Client object. See crew_controller().
launcher
Launcher object. See crew_controller().
reset_globals
See crew_controller().
reset_packages
See crew_controller().
reset_options
See crew_controller().
garbage_collection
See crew_controller().
crashes_max
See crew_controller().
backup
See crew_controller().
Returns
An R6 controller object.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  client <- crew_client()
  launcher <- crew_launcher_local()
  controller <- crew_controller(client = client, launcher = launcher)
  controller$start()
  controller$push(name = "task", command = sqrt(4))
  controller$wait()
  controller$pop()
  controller$terminate()
}
Method validate()
Validate the controller.
Usage
crew_class_controller$validate()
Returns
NULL (invisibly).
Method empty()
Check if the controller is empty.
Usage
crew_class_controller$empty(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
A controller is empty if it has no running tasks
and no completed tasks waiting to be retrieved with pop().
Returns
TRUE if the controller is empty, FALSE otherwise.
Method nonempty()
Check if the controller is nonempty.
Usage
crew_class_controller$nonempty(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
A controller is nonempty if it has running tasks
or completed tasks waiting to be retrieved with pop().
Returns
TRUE if the controller is nonempty, FALSE otherwise.
Method resolved()
Number of resolved mirai() tasks.
Usage
crew_class_controller$resolved(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
resolved() is cumulative: it counts all the resolved
tasks over the entire lifetime of the controller session.
Returns
Non-negative integer of length 1, number of resolved mirai() tasks.
The return value is 0 if the condition variable does not exist
(i.e. if the client is not running).
Method unresolved()
Number of unresolved mirai() tasks.
Usage
crew_class_controller$unresolved(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
Non-negative integer of length 1, number of unresolved mirai() tasks.
Method unpopped()
Number of resolved mirai() tasks available via pop().
Usage
crew_class_controller$unpopped(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
Non-negative integer of length 1, number of resolved mirai() tasks available via pop().
Method saturated()
Check if the controller is saturated.
Usage
crew_class_controller$saturated(
  collect = NULL,
  throttle = NULL,
  controller = NULL
)
Arguments
collect
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
A controller is saturated if the number of unresolved tasks
is greater than or equal to the maximum number of workers.
In other words, in a saturated controller, every available worker
has a task.
You can still push tasks to a saturated controller, but
tools that use crew, such as targets, may choose not to.
Returns
TRUE if the controller is saturated, FALSE otherwise.
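The saturation rule above is a single comparison. The base R sketch below restates it outside of crew. The helper is_saturated() and its arguments are hypothetical names used only for illustration and are not part of the crew API.

```r
# Hypothetical helper restating the rule: saturated when the number of
# unresolved tasks is at least the maximum number of workers.
is_saturated <- function(unresolved, workers_max) {
  stopifnot(unresolved >= 0, workers_max >= 0)
  unresolved >= workers_max
}

# Three unresolved tasks on four workers: one worker is still free.
below_capacity <- is_saturated(unresolved = 3L, workers_max = 4L)
# Four unresolved tasks on four workers: every worker has a task.
at_capacity <- is_saturated(unresolved = 4L, workers_max = 4L)
```

A scheduler built on crew could poll saturated() before pushing and hold tasks back while it returns TRUE, which is the behavior the text attributes to tools such as targets.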
Method start()
Start the controller if it is not already started.
Usage
crew_class_controller$start(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
Register the mirai client and register worker websockets with the launcher.
Returns
NULL (invisibly).
Method started()
Check whether the controller is started.
Usage
crew_class_controller$started(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
This method checks whether the underlying client is started.
Returns
TRUE if the controller is started, FALSE otherwise.
Method launch()
Launch one or more workers.
Usage
crew_class_controller$launch(n = 1L, controllers = NULL)
Arguments
n
Number of workers to launch.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
NULL (invisibly).
Method scale()
Auto-scale workers out to meet the demand of tasks.
Usage
crew_class_controller$scale(throttle = TRUE, controllers = NULL)
Arguments
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
The scale() method launches new workers to
run tasks if needed.
Returns
Invisibly returns TRUE if there was any relevant
auto-scaling activity (new worker launches or worker
connection/disconnection events), FALSE otherwise.
Method autoscale()
Run worker auto-scaling in a private later loop
every controller$client$seconds_interval seconds.
Usage
crew_class_controller$autoscale(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
Call controller$descale() to terminate the
auto-scaling loop.
Returns
NULL (invisibly).
Method descale()
Terminate the auto-scaling loop started by controller$autoscale().
Usage
crew_class_controller$descale(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
NULL (invisibly).
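A possible pairing of autoscale() and descale() is sketched below, guarded by the CREW_EXAMPLES convention used in this manual. It assumes a local controller from crew_controller_local() and uses wait() as it appears in the constructor example. The name run_examples is introduced here for illustration.

```r
# Only run when examples are explicitly enabled.
run_examples <- identical(Sys.getenv("CREW_EXAMPLES"), "true")
if (run_examples) {
  controller <- crew::crew_controller_local()
  controller$start()
  controller$autoscale() # Begin the background auto-scaling loop.
  controller$push(name = "task", command = sqrt(4))
  controller$wait()
  controller$descale() # Stop the loop before shutting down.
  print(controller$pop()$result[[1L]])
  controller$terminate()
}
```

Calling descale() before terminate() keeps the later loop from firing against a controller that is shutting down.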
Method crashes()
Report the number of consecutive crashes of a task.
Usage
crew_class_controller$crashes(name, controllers = NULL)
Arguments
name
Character string, name of the task to check.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
See the crashes_max argument of crew_controller().
Returns
Non-negative integer, number of consecutive times the task crashed.
Method push()
Push a task to the head of the task list.
Usage
crew_class_controller$push(
  command,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_timeout = NULL,
  scale = TRUE,
  throttle = TRUE,
  name = NULL,
  save_command = NULL,
  controller = NULL
)
Arguments
command
Language object with R code to run.
data
Named list of local data objects in the evaluation environment.
globals
Named list of objects to temporarily assign to the global environment for the task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local().
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Character string of length 1 naming the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
scale
Logical, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. Also see the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
name
Character string, name of the task. If NULL, then a random name is generated automatically. The name of the task must not conflict with the name of another task pushed to the controller. Any previous task with the same name must first be popped before a new task with that name can be pushed.
save_command
Deprecated on 2025-01-22 (crew version 0.10.2.9004) and no longer used.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
Invisibly returns the mirai object of the pushed task.
This allows you to interact with the task directly, e.g.
to create a promise object with promises::as.promise().
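The difference between substitute = TRUE and substitute = FALSE is ordinary R quoting, so it can be shown without a controller. The base R sketch below uses a hypothetical function, capture_command(), that mimics only the quoting step. It is not how crew is implemented.

```r
# Hypothetical stand-in for the quoting step of push().
capture_command <- function(command, substitute = TRUE) {
  if (substitute) {
    # Capture the code exactly as the caller typed it, without evaluating it.
    command <- base::substitute(command)
  }
  command
}

# Interactive style: write the call directly.
interactive_style <- capture_command(sqrt(4))
# Programmatic style: pass an already-quoted language object.
programmatic_style <- capture_command(quote(sqrt(4)), substitute = FALSE)

# Both routes yield the same language object, which can be evaluated later.
same_command <- identical(interactive_style, programmatic_style)
value <- eval(interactive_style)
```

Programs that build commands dynamically usually hold them as language objects already, which is why substitute = FALSE suits automated callers.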
Method walk()
Apply a single command to multiple inputs, and return control to the user without waiting for any task to complete.
Usage
crew_class_controller$walk(
  command,
  iterate,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_timeout = NULL,
  names = NULL,
  save_command = NULL,
  verbose = interactive(),
  scale = TRUE,
  throttle = TRUE,
  controller = NULL
)
Arguments
command
Language object with R code to run.
iterate
Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.
data
Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals
Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Character string of length 1 naming the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names
Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command
Deprecated on 2025-01-22 (crew version 0.10.2.9004). The command is always saved now.
verbose
Logical of length 1, whether to print to a progress bar when pushing tasks.
scale
Logical, whether to automatically scale workers to meet demand. See also the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
In contrast to walk(), map() blocks the local R session
and waits for all tasks to complete.
Returns
Invisibly returns a list of mirai task objects for the
newly created tasks. The order of tasks in the list matches the
order of data in the iterate argument.
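How iterate and data combine can be traced in plain R. The sketch below evaluates the f(x, y) example from the iterate argument sequentially and locally, without crew. The helper run_iteration() and the contents of data are assumptions made for illustration.

```r
f <- function(x, y) paste(x, y)
command <- quote(f(x, y))
iterate <- list(x = c(1, 2), y = c("a", "b"))
# `y` also appears in iterate, so the iterate value takes precedence.
data <- list(y = "ignored", z = "constant")

# Hypothetical helper: build the inputs for task i and evaluate the command.
run_iteration <- function(i) {
  inputs <- lapply(iterate, function(values) values[[i]])
  constants <- data[setdiff(names(data), names(inputs))]
  eval(command, envir = c(inputs, constants), enclos = environment())
}

# One result per position in iterate, in the same order.
results <- lapply(seq_along(iterate[[1L]]), run_iteration)
```

Each element of results corresponds to one task, which mirrors the ordering guarantee stated for the returned list of mirai objects.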
Method map()
Apply a single command to multiple inputs, wait for all tasks to complete, and return the results of all tasks.
Usage
crew_class_controller$map(
  command,
  iterate,
  data = list(),
  globals = list(),
  substitute = TRUE,
  seed = NULL,
  algorithm = NULL,
  packages = character(0),
  library = NULL,
  seconds_interval = NULL,
  seconds_timeout = NULL,
  names = NULL,
  save_command = NULL,
  error = "stop",
  warnings = TRUE,
  verbose = interactive(),
  scale = TRUE,
  throttle = TRUE,
  controller = NULL
)
Arguments
command
Language object with R code to run.
iterate
Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.
data
Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals
Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Character string of length 1 naming the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_interval
Deprecated on 2025-01-17 (crew version 0.10.2.9003). Instead, the seconds_interval argument passed to crew_controller_group() is used as seconds_max in a crew_throttle() object which orchestrates exponential backoff.
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names
Optional character string, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command
Deprecated on 2025-01-22 (crew version 0.10.2.9004). The command is always saved now.
error
Character of length 1, choice of action if a task was not successful. Possible values:
- "stop": throw an error in the main R session instead of returning a value. In case of an error, the results from the last errored map() are in the error field of the controller, e.g. controller_object$error. To reduce memory consumption, set controller_object$error <- NULL after you are finished troubleshooting.
- "warn": throw a warning. This allows the return value with all the error messages and tracebacks to be generated.
- "silent": do nothing special.
NOTE: the only kinds of errors considered here are errors at the R level. A crashed task will return a status of "crash" in the output and not trigger an error in map() unless crashes_max is reached.
warnings
Logical of length 1, whether to throw a warning in the interactive session if at least one task encounters an error.
verbose
Logical of length 1, whether to print to a progress bar as tasks resolve.
scale
Logical, whether to automatically scale workers to meet demand. See also the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
map() cannot be used unless all prior tasks are
completed and popped. You may need to wait and then pop them
manually. Alternatively, you can start over: either call
terminate() on the current controller object to reset it, or
create a new controller object entirely.
Returns
A tibble of results and metadata: one row per task
and columns corresponding to the output of pop().
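A blocking map() call might look like the sketch below, guarded by the CREW_EXAMPLES convention used in this manual. It assumes a local controller from crew_controller_local() and selects columns named in the documentation of pop(). The choice of error = "warn" and the name run_examples are made for illustration.

```r
# Only run when examples are explicitly enabled.
run_examples <- identical(Sys.getenv("CREW_EXAMPLES"), "true")
if (run_examples) {
  controller <- crew::crew_controller_local()
  controller$start()
  # `a` varies across tasks, while `b` is held constant for every task.
  results <- controller$map(
    command = a + b,
    iterate = list(a = c(1, 2)),
    data = list(b = 10),
    error = "warn" # Return the full tibble even if a task fails.
  )
  print(results[, c("name", "status", "result")])
  controller$terminate()
}
```

With error = "stop" instead, a failed task would raise an error and the partial results would be available in controller$error, as described for the error argument.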
Method pop()
Pop a completed task from the results data frame.
Usage
crew_class_controller$pop( scale = TRUE, collect = NULL, throttle = TRUE, error = NULL, controllers = NULL )
Arguments
scale
Logical of length 1, whether to automatically call
scale()
to auto-scale workers to meet the demand of the task load. Scaling up onpop()
may be important for transient or nearly transient workers that tend to drop off quickly after doing little work. See also thethrottle
argument.collect
Deprecated in version 0.5.0.9003 (2023-10-02).
throttle
TRUE
to skip auto-scaling if it already happened within the lastseconds_interval
seconds.FALSE
to auto-scale every timescale()
is called. Throttling avoids overburdening themirai
dispatcher and other resources.error
NULL
or character of length 1, choice of action if the popped task threw an error. Possible values:-
"stop"
: throw an error in the main R session instead of returning a value. -
"warn"
: throw a warning. -
NULL
or"silent"
: do not react to errors. NOTE: the only kinds of errors considered here are errors at the R level. A crashed tasks will return a status of"crash"
in the output and not trigger an error inpop()
unlesscrashes_max
is reached.
-
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
If not task is currently completed, pop()
will attempt to auto-scale workers as needed.
Returns
If there is no task to collect, return NULL. Otherwise, return a one-row tibble with the following columns.
- name: the task name.
- command: a character string with the R command.
- result: a list containing the return value of the R command. NA if the task failed.
- status: a character string. "success" if the task succeeded, "cancel" if the task was canceled with the cancel() controller method, "crash" if the worker running the task exited before it could complete the task, or "error" for any other kind of error.
- error: the first 2048 characters of the error message if the task status is not "success", NA otherwise. Messages for crashes and cancellations are captured here alongside ordinary R-level errors.
- code: an integer code denoting the specific exit status: 0 for successful tasks, -1 for tasks with an error in the R command of the task, and a positive integer NNG status code if there is an error at the NNG/nanonext level. nanonext::nng_error() can interpret these codes.
- trace: the first 2048 characters of the text of the traceback if the task threw an error, NA otherwise.
- warnings: the first 2048 characters of the text of warning messages that the task may have generated, NA otherwise.
- seconds: number of seconds that the task ran.
- seed: the single integer originally supplied to push(), NA otherwise. The pseudo-random number generator state just prior to the task can be restored using set.seed(seed = seed, kind = algorithm), where seed and algorithm are part of this output.
- algorithm: name of the pseudo-random number generator algorithm originally supplied to push(), NA otherwise. The pseudo-random number generator state just prior to the task can be restored using set.seed(seed = seed, kind = algorithm), where seed and algorithm are part of this output.
- controller: name of the crew controller where the task ran.
- worker: name of the crew worker that ran the task.
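The seed and algorithm columns can be fed back to set.seed() as described above. The following is a minimal base R sketch of that step. The values 123L and "Mersenne-Twister" are made-up stand-ins for what a real pop() result would contain, and no crew controller is involved.

```r
# Hypothetical values standing in for the `seed` and `algorithm`
# columns of a one-row pop() result.
seed <- 123L
algorithm <- "Mersenne-Twister"

# Remember the current generator so the session can be put back afterwards.
old_kind <- RNGkind()[1L]

# Restore the pre-task state and draw some numbers.
set.seed(seed = seed, kind = algorithm)
first_draw <- runif(3)

# Restoring the same state again reproduces the same draws.
set.seed(seed = seed, kind = algorithm)
second_draw <- runif(3)

# Switch back to the generator that was active before this example.
RNGkind(kind = old_kind)
```

Because both draws start from the same restored state, first_draw and second_draw are identical, which is what makes a task's random numbers reproducible after the fact.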
Method collect()
Pop all available task results and return them in a tidy
tibble
.
Usage
crew_class_controller$collect( scale = TRUE, throttle = TRUE, error = NULL, controllers = NULL )
Arguments
scale
Logical of length 1, whether to automatically call scale() to auto-scale workers to meet the demand of the task load.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
error
NULL or character of length 1, choice of action if the popped task threw an error. Possible values:
- "stop": throw an error in the main R session instead of returning a value.
- "warn": throw a warning.
- NULL or "silent": do not react to errors.
NOTE: the only kinds of errors considered here are errors at the R level. A crashed task will return a status of "crash" in the output and not trigger an error in collect() unless crashes_max is reached.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
A tibble
of results and metadata of all resolved tasks,
with one row per task. Returns NULL
if there are no tasks
to collect. See pop()
for details on the columns of the
returned tibble
.
Method promise()
Create a promises::promise()
object to asynchronously
pop or collect one or more tasks.
Usage
crew_class_controller$promise( mode = "one", seconds_interval = 1, scale = NULL, throttle = NULL, controllers = NULL )
Arguments
mode
Character of length 1, what kind of promise to create. mode must be "one" or "all". Details:
If mode is "one", then the promise is fulfilled (or rejected) when at least one task is resolved and available to pop(). When that happens, pop() runs asynchronously, pops a result off the task list, and returns a value. If the task succeeded, then the promise is fulfilled and its value is the result of pop() (a one-row tibble with the result and metadata). If the task threw an error, the error message of the task is forwarded to any error callbacks registered with the promise.
If mode is "all", then the promise is fulfilled (or rejected) when there are no unresolved tasks left in the controller. (Be careful: this condition is trivially met in the moment if the controller is empty and you have not submitted any tasks, so it is best to create this kind of promise only after you submit tasks.) When there are no unresolved tasks left, collect() runs asynchronously, pops all available results off the task list, and returns a value. If all the tasks succeeded, then the promise is fulfilled and its value is the result of collect() (a tibble with one row per task result). If any of the tasks threw an error, then the first error message detected is forwarded to any error callbacks registered with the promise.
seconds_interval
Positive numeric of length 1, delay in the later::later() polling interval to asynchronously check if the promise can be resolved.
scale
Deprecated on 2024-04-10 (version 0.9.1.9003) and no longer used. Now, promise() always turns on auto-scaling in a private later loop (if not already activated).
throttle
Deprecated on 2024-04-10 (version 0.9.1.9003) and no longer used. Now, promise() always turns on auto-scaling in a private later loop (if not already activated).
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
Please be aware that pop() or collect() will happen asynchronously at some unpredictable time after the promise object is created, even if your local R process appears to be doing something completely different. This behavior is highly desirable in a Shiny reactive context, but please be careful as it may be surprising in other situations.
Returns
A promises::promise()
object whose eventual value will
be a tibble
with results from one or more popped tasks.
If mode = "one"
, only one task is popped and returned (one row).
If mode = "all"
, then all the tasks are returned in a tibble
with one row per task (or NULL
is returned if there are no
tasks to pop).
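A rough sketch of how such a promise might be consumed with promises::then() follows. The helper demo_promise() is hypothetical and exists only for this illustration, and the call is guarded by the same CREW_EXAMPLES environment variable used in the package examples, so nothing runs unless that variable is set and crew is installed.

```r
# Sketch only: `demo_promise()` is a hypothetical helper, not part of crew.
demo_promise <- function() {
  controller <- crew::crew_controller_local()
  controller$start()
  controller$push(name = "task", command = sqrt(4))
  # mode = "one": the promise settles once a single task can be popped.
  promise <- controller$promise(mode = "one")
  promises::then(
    promise,
    onFulfilled = function(value) {
      # `value` is the one-row tibble that pop() would have returned.
      print(value$result[[1L]])
    },
    onRejected = function(error) {
      # Reached if the task threw an error.
      message("task failed: ", conditionMessage(error))
    }
  )
  # Return the controller so the caller can terminate it later.
  invisible(controller)
}

if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  controller <- demo_promise()
  # Call controller$terminate() once the callbacks have fired.
}
```

The callbacks only fire when the later event loop gets a turn, for example at an idle interactive prompt or inside a running Shiny app, so the controller should stay alive until then.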
Method wait()
Wait for tasks.
Usage
crew_class_controller$wait( mode = "all", seconds_interval = NULL, seconds_timeout = Inf, scale = TRUE, throttle = TRUE, controllers = NULL )
Arguments
mode
Character of length 1: "all" to wait for all tasks to complete, "one" to wait for a single task to complete.
seconds_interval
Deprecated on 2025-01-17 (crew version 0.10.2.9003). Instead, the seconds_interval argument passed to crew_controller_group() is used as seconds_max in a crew_throttle() object which orchestrates exponential backoff.
seconds_timeout
Timeout length in seconds waiting for tasks.
scale
Logical, whether to automatically call scale() to auto-scale workers to meet the demand of the task load. See also the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
The wait()
method blocks the calling R session and
repeatedly auto-scales workers for tasks that need them.
The function runs until it either times out or the condition
in mode
is met.
Returns
A logical of length 1, invisibly. TRUE
if the condition
in mode
was met, FALSE
otherwise.
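The blocking behavior and the TRUE/FALSE return value can be pictured with a small base R polling loop. This is only a sketch under simplifying assumptions: wait_for() is a hypothetical helper, it polls at a fixed interval rather than with the exponential backoff of crew_throttle(), and it does no auto-scaling.

```r
# Hypothetical helper: block until `condition()` is TRUE or time runs out.
wait_for <- function(condition, seconds_timeout = Inf, seconds_interval = 0.01) {
  start <- proc.time()[["elapsed"]]
  repeat {
    # Condition met: report success invisibly, like wait() does.
    if (isTRUE(condition())) {
      return(invisible(TRUE))
    }
    # Timed out before the condition was met.
    if (proc.time()[["elapsed"]] - start >= seconds_timeout) {
      return(invisible(FALSE))
    }
    Sys.sleep(seconds_interval)
  }
}

# A condition that becomes TRUE on the third check.
counter <- 0L
done <- wait_for(
  function() {
    counter <<- counter + 1L
    counter >= 3L
  },
  seconds_timeout = 5
)

# A condition that never becomes TRUE, so the loop times out.
timed_out <- wait_for(function() FALSE, seconds_timeout = 0.05)
```

Here done is TRUE because the condition was met before the timeout, while timed_out is FALSE.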
Method push_backlog()
Push the name of a task to the backlog.
Usage
crew_class_controller$push_backlog(name, controller = NULL)
Arguments
name
Character of length 1 with the task name to push to the backlog.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
pop_backlog()
pops the tasks that can be pushed
without saturating the controller.
Returns
NULL
(invisibly).
Method pop_backlog()
Pop the task names from the head of the backlog which can be pushed without saturating the controller.
Usage
crew_class_controller$pop_backlog(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
Character vector of task names which can be pushed to the
controller without saturating it. If the controller is saturated,
character(0L)
is returned.
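To make the backlog idea concrete, here is a base R sketch that assumes free capacity is the maximum number of workers minus the number of unresolved tasks, in line with the definition of saturation given for saturated(). The helper pop_ready() and the task names are hypothetical. This is not the package's internal implementation.

```r
# Hypothetical helper: split a backlog into names that fit and names that wait.
pop_ready <- function(backlog, workers, unresolved) {
  # Free capacity before the controller becomes saturated.
  capacity <- max(0L, workers - unresolved)
  ready <- head(backlog, n = capacity)
  remaining <- tail(backlog, n = length(backlog) - length(ready))
  list(ready = ready, remaining = remaining)
}

backlog <- c("task_a", "task_b", "task_c", "task_d")

# Two free workers: the first two names leave the backlog.
partial <- pop_ready(backlog, workers = 3L, unresolved = 1L)

# No free workers: nothing is popped.
saturated <- pop_ready(backlog, workers = 3L, unresolved = 3L)
```

In the saturated case the ready element is character(0), matching the documented return value.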
Method summary()
Summarize the workers and tasks of the controller.
Usage
crew_class_controller$summary(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
A data frame of summary statistics on the tasks
that ran on a worker and then were returned by pop()
or
collect()
.
It has one row and the following columns:
- controller: name of the controller.
- tasks: number of tasks.
- seconds: total runtime in seconds.
- success: total number of successful tasks.
- error: total number of tasks with errors, either in the R code of the task or an NNG-level error that is not a cancellation or crash.
- crash: total number of crashed tasks (where the worker exited unexpectedly while it was running the task).
- cancel: total number of tasks interrupted with the cancel() controller method.
- warnings: total number of tasks with one or more warnings.
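The meaning of these columns can be illustrated by tallying a made-up table of task results in base R. The results data frame below is invented for the illustration and only mimics a few of the columns returned by pop() and collect(). The code is not how crew computes its summary internally.

```r
# Made-up task results with a subset of the pop()/collect() columns.
results <- data.frame(
  name = c("a", "b", "c", "d"),
  status = c("success", "error", "success", "crash"),
  seconds = c(0.5, 0.1, 1.2, 0.3),
  warnings = c(NA, NA, "x may be negative", NA),
  stringsAsFactors = FALSE
)

# One-row tally with the same column names as the summary() output.
summary_row <- data.frame(
  controller = "example",
  tasks = nrow(results),
  seconds = sum(results$seconds),
  success = sum(results$status == "success"),
  error = sum(results$status == "error"),
  crash = sum(results$status == "crash"),
  cancel = sum(results$status == "cancel"),
  warnings = sum(!is.na(results$warnings))
)
```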
Method cancel()
Cancel one or more tasks.
Usage
crew_class_controller$cancel(names = character(0L), all = FALSE)
Arguments
names
Character vector of names of tasks to cancel. Those names must have been manually supplied by push().
all
TRUE to cancel all tasks, FALSE otherwise. all = TRUE supersedes the names argument.
Returns
NULL
(invisibly).
Method pids()
Get the process IDs of the local process and the
mirai
dispatcher (if started).
Usage
crew_class_controller$pids(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
An integer vector of process IDs of the local process and the
mirai
dispatcher (if started).
Method terminate()
Terminate the workers and the mirai
client.
Usage
crew_class_controller$terminate(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
NULL
(invisibly).
See Also
Other controller:
crew_controller()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
launcher <- crew_launcher_local()
controller <- crew_controller(client = client, launcher = launcher)
controller$start()
controller$push(name = "task", command = sqrt(4))
controller$wait()
controller$pop()
controller$terminate()
}
Controller group class
Description
R6
class for controller groups.
Details
Active bindings
controllers
List of R6 controller objects.
relay
Relay object for event-driven programming on a downstream condition variable.
throttle
crew_throttle() object to orchestrate exponential backoff in the relay and auto-scaling.
Methods
Public methods
Method new()
Multi-controller constructor.
Usage
crew_class_controller_group$new( controllers = NULL, relay = NULL, throttle = NULL )
Arguments
controllers
List of R6 controller objects.
relay
Relay object for event-driven programming on a downstream condition variable.
throttle
crew_throttle() object to orchestrate exponential backoff in the relay and auto-scaling.
Returns
An R6
object with the controller group object.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
persistent <- crew_controller_local(name = "persistent")
transient <- crew_controller_local(
  name = "transient",
  tasks_max = 1L
)
group <- crew_controller_group(persistent, transient)
group$start()
group$push(name = "task", command = sqrt(4), controller = "transient")
group$wait()
group$pop()
group$terminate()
}
Method validate()
Validate the controller group.
Usage
crew_class_controller_group$validate()
Returns
NULL
(invisibly).
Method empty()
See if the controllers are empty.
Usage
crew_class_controller_group$empty(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Details
A controller is empty if it has no running tasks or completed tasks waiting to be retrieved with pop().
Returns
TRUE
if all the selected controllers are empty,
FALSE
otherwise.
Method nonempty()
Check if the controller group is nonempty.
Usage
crew_class_controller_group$nonempty(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Details
A controller is empty if it has no running tasks or completed tasks waiting to be retrieved with pop().
Returns
TRUE if the controller group is nonempty, FALSE otherwise.
Method resolved()
Number of resolved mirai()
tasks.
Usage
crew_class_controller_group$resolved(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Details
resolved()
is cumulative: it counts all the resolved
tasks over the entire lifetime of the controller session.
Returns
Non-negative integer of length 1,
number of resolved mirai()
tasks.
The return value is 0 if the condition variable does not exist
(i.e. if the client is not running).
Method unresolved()
Number of unresolved mirai()
tasks.
Usage
crew_class_controller_group$unresolved(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Returns
Non-negative integer of length 1,
number of unresolved mirai()
tasks.
Method unpopped()
Number of resolved mirai()
tasks available via pop()
.
Usage
crew_class_controller_group$unpopped(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Returns
Non-negative integer of length 1,
number of resolved mirai()
tasks available via pop()
.
Method saturated()
Check if a controller is saturated.
Usage
crew_class_controller_group$saturated( collect = NULL, throttle = NULL, controller = NULL )
Arguments
collect
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
controller
Character vector of length 1 with the controller name. Set to NULL to select the default controller that push() would choose.
Details
A controller is saturated if the number of unresolved tasks
is greater than or equal to the maximum number of workers.
In other words, in a saturated controller, every available worker
has a task.
You can still push tasks to a saturated controller, but
tools that use crew
such as targets
may choose not to.
Returns
TRUE
if all the selected controllers are saturated,
FALSE
otherwise.
Method start()
Start one or more controllers.
Usage
crew_class_controller_group$start(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Returns
NULL
(invisibly).
Method started()
Check whether all the given controllers are started.
Usage
crew_class_controller_group$started(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Details
Actually checks whether all the given clients are started.
Returns
TRUE
if the controllers are started, FALSE
if any are not.
Method launch()
Launch one or more workers on one or more controllers.
Usage
crew_class_controller_group$launch(n = 1L, controllers = NULL)
Arguments
n
Number of workers to launch in each controller selected.
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Returns
NULL
(invisibly).
Method scale()
Automatically scale up the number of workers if needed in one or more controller objects.
Usage
crew_class_controller_group$scale(throttle = TRUE, controllers = NULL)
Arguments
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controllers
Character vector of controller names. Set to NULL to select all controllers.
Details
See the scale()
method in individual controller classes.
Returns
Invisibly returns TRUE if there was any relevant auto-scaling activity (new worker launches or worker connection/disconnection events), FALSE otherwise.
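The effect of the throttle argument can be sketched in base R as a closure that remembers when scaling last happened. This is a simplified, hypothetical illustration: new_throttle() is not a crew function, it uses a fixed seconds_interval instead of the exponential backoff of crew_throttle(), and time is passed in explicitly through a now argument so the behavior is deterministic.

```r
# Hypothetical fixed-interval throttle; crew_throttle() itself uses
# exponential backoff, which this sketch does not reproduce.
new_throttle <- function(seconds_interval) {
  last <- -Inf
  function(now, throttle = TRUE) {
    # Skip if throttling is on and scaling happened too recently.
    if (throttle && (now - last) < seconds_interval) {
      return(FALSE)
    }
    last <<- now
    TRUE
  }
}

should_scale <- new_throttle(seconds_interval = 1)
decisions <- c(
  should_scale(now = 0),                    # first call always scales
  should_scale(now = 0.4),                  # too soon, skipped
  should_scale(now = 1.2),                  # interval elapsed, scales
  should_scale(now = 1.3, throttle = FALSE) # throttle off, always scales
)
```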
Method autoscale()
Run worker auto-scaling in a private later
loop
every controller$client$seconds_interval
seconds.
Usage
crew_class_controller_group$autoscale(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Returns
NULL
(invisibly).
Method descale()
Terminate the auto-scaling loop started by
controller$autoscale()
.
Usage
crew_class_controller_group$descale(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Returns
NULL
(invisibly).
Method crashes()
Report the number of consecutive crashes of a task, summed over all selected controllers in the group.
Usage
crew_class_controller_group$crashes(name, controllers = NULL)
Arguments
name
Character string, name of the task to check.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
See the crashes_max
argument of crew_controller()
.
Returns
Number of consecutive crashes of the named task, summed over all the controllers in the group.
Method push()
Push a task to the head of the task list.
Usage
crew_class_controller_group$push( command, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_timeout = NULL, scale = TRUE, throttle = TRUE, name = NULL, save_command = NULL, controller = NULL )
Arguments
command
Language object with R code to run.
data
Named list of local data objects in the evaluation environment.
globals
Named list of objects to temporarily assign to the global environment for the task. See the reset_globals argument of crew_controller_local().
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Character of length 1 with the name of the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
scale
Logical, whether to automatically scale workers to meet demand. See the scale argument of the push() method of ordinary single controllers.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
name
Character string, name of the task. If NULL, a random name is automatically generated. The task name must not conflict with an existing task in the controller where it is submitted. To reuse the name, wait for the existing task to finish, then either pop() or collect() it to remove it from its controller.
save_command
Deprecated on 2025-01-22 (crew version 0.10.2.9004).
controller
Character of length 1, name of the controller to submit the task. If NULL, the controller defaults to the first controller in the list.
Returns
Invisibly return the mirai
object of the pushed task.
This allows you to interact with the task directly, e.g.
to create a promise object with promises::as.promise()
.
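The difference between substitute = TRUE and substitute = FALSE in push() can be pictured with a small base R function that captures its argument either way. The helper capture_command() is hypothetical and only imitates the documented semantics. It does not submit anything to a controller.

```r
# Hypothetical helper imitating how the `substitute` argument is documented.
capture_command <- function(command, substitute = TRUE) {
  if (substitute) {
    # Quote the code exactly as the caller wrote it, without evaluating it.
    base::substitute(command)
  } else {
    # The caller already passed a language object, so use its value.
    command
  }
}

# Interactive style: write the code directly.
interactive_style <- capture_command(sqrt(4))

# Programmatic style: pass a pre-built language object.
programmatic_style <- capture_command(quote(sqrt(4)), substitute = FALSE)
```

Both calls end up holding the same unevaluated call, sqrt(4), which is why programs that build commands ahead of time should set substitute = FALSE.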
Method walk()
Apply a single command to multiple inputs, and return control to the user without waiting for any task to complete.
Usage
crew_class_controller_group$walk( command, iterate, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_timeout = NULL, names = NULL, save_command = NULL, verbose = interactive(), scale = TRUE, throttle = TRUE, controller = NULL )
Arguments
command
Language object with R code to run.
iterate
Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.
data
Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals
Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Character of length 1 with the name of the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names
Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command
Deprecated on 2025-01-22 (crew version 0.10.2.9004).
verbose
Logical of length 1, whether to print to a progress bar when pushing tasks.
scale
Logical, whether to automatically scale workers to meet demand. See also the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controller
Character of length 1, name of the controller to submit the tasks. If NULL, the controller defaults to the first controller in the list.
Details
In contrast to walk()
, map()
blocks the local R session
and waits for all tasks to complete.
Returns
Invisibly returns a list of mirai
task objects for the
newly created tasks. The order of tasks in the list matches the
order of data in the iterate
argument.
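How iterate and data combine for each task can be imitated locally in base R. The sketch below is an illustration of the documented semantics only: it evaluates each call in the current session rather than on workers, and the command and the prefix and y entries of data are made up. Note how the y in iterate takes precedence over the y in data.

```r
# Made-up command and inputs for illustration.
command <- quote(paste(prefix, x, y))
iterate <- list(x = c(1, 2), y = c("a", "b"))
data <- list(prefix = "run", y = "overridden")

results <- lapply(seq_along(iterate[[1L]]), function(index) {
  # Pick the index-th element of every entry of `iterate`.
  inputs <- lapply(iterate, function(values) values[[index]])
  # Combine with the constant `data`; `iterate` wins on name conflicts.
  scope <- utils::modifyList(data, inputs)
  # Evaluate the command with those objects in scope.
  eval(command, envir = scope, enclos = baseenv())
})
```

The two evaluations correspond to paste("run", 1, "a") and paste("run", 2, "b"), in the same order as the elements of iterate.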
Method map()
Apply a single command to multiple inputs.
Usage
crew_class_controller_group$map( command, iterate, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_interval = NULL, seconds_timeout = NULL, names = NULL, save_command = NULL, error = "stop", warnings = TRUE, verbose = interactive(), scale = TRUE, throttle = TRUE, controller = NULL )
Arguments
command
Language object with R code to run.
iterate
Named list of vectors or lists to iterate over. For example, to run function calls f(x = 1, y = "a") and f(x = 2, y = "b"), set command to f(x, y), and set iterate to list(x = c(1, 2), y = c("a", "b")). The individual function calls are evaluated as f(x = iterate$x[[1]], y = iterate$y[[1]]) and f(x = iterate$x[[2]], y = iterate$y[[2]]). All the elements of iterate must have the same length. If there are any name conflicts between iterate and data, iterate takes precedence.
data
Named list of constant local data objects in the evaluation environment. Objects in this list are treated as single values and are held constant for each iteration of the map.
globals
Named list of constant objects to temporarily assign to the global environment for each task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local(). Objects in this list are treated as single values and are held constant for each iteration of the map.
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
algorithm
Character of length 1 with the name of the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL, then the random number generator defaults to the recommended widely spaced worker-specific L'Ecuyer streams as supported by mirai::nextstream(). See vignette("parallel", package = "parallel") for details.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_interval
Deprecated on 2025-01-17 (crew version 0.10.2.9003). Instead, the seconds_interval argument passed to crew_controller_group() is used as seconds_max in a crew_throttle() object which orchestrates exponential backoff.
seconds_timeout
Optional task timeout passed to the .timeout argument of mirai::mirai() (after converting to milliseconds).
names
Optional character of length 1, name of the element of iterate with names for the tasks. If names is supplied, then iterate[[names]] must be a character vector.
save_command
Deprecated on 2025-01-22 (crew version 0.10.2.9004).
error
Character vector of length 1, choice of action if a task has an error. Possible values:
- "stop": throw an error in the main R session instead of returning a value. In case of an error, the results from the last errored map() are in the error field of the controller, e.g. controller_object$error. To reduce memory consumption, set controller_object$error <- NULL after you are finished troubleshooting.
- "warn": throw a warning. This allows the return value with all the error messages and tracebacks to be generated.
- "silent": do nothing special.
warnings
Logical of length 1, whether to throw a warning in the interactive session if at least one task encounters an error.
verbose
Logical of length 1, whether to print progress messages.
scale
Logical, whether to automatically scale workers to meet demand. See also the throttle argument.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controller
Character of length 1, name of the controller to submit the tasks. If NULL, the controller defaults to the first controller in the list.
Details
The idea comes from functional programming: for example,
the map()
function from the purrr
package.
Returns
A tibble
of results and metadata: one row per task and
columns corresponding to the output of pop()
.
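The three choices for the error argument of map() can be sketched with a base R helper that reacts to a table of results. Everything here is hypothetical: react_to_errors() is not a crew function, the results data frame is made up, and the wording of the message is invented for the illustration.

```r
# Hypothetical helper that mirrors the documented choices for `error`.
react_to_errors <- function(results, error = "stop") {
  failed <- results$status != "success"
  if (any(failed)) {
    # Report the first failure found.
    text <- paste0(
      "task ", results$name[failed][1L], " failed: ",
      results$error[failed][1L]
    )
    if (identical(error, "stop")) stop(text, call. = FALSE)
    if (identical(error, "warn")) warning(text, call. = FALSE)
  }
  # "silent" (and "warn", if the warning is not caught) still returns results.
  results
}

# Made-up results standing in for the output of map().
results <- data.frame(
  name = c("a", "b"),
  status = c("success", "error"),
  error = c(NA, "non-numeric argument"),
  stringsAsFactors = FALSE
)

silent <- react_to_errors(results, error = "silent")
stopped <- tryCatch(
  react_to_errors(results, error = "stop"),
  error = function(condition) conditionMessage(condition)
)
warned <- tryCatch(
  react_to_errors(results, error = "warn"),
  warning = function(condition) conditionMessage(condition)
)
```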
Method pop()
Pop a completed task from the results data frame.
Usage
crew_class_controller_group$pop( scale = TRUE, collect = NULL, throttle = TRUE, error = NULL, controllers = NULL )
Arguments
scale
Logical, whether to automatically scale workers to meet demand. See the scale argument of the pop() method of ordinary single controllers.
collect
Deprecated in version 0.5.0.9003 (2023-10-02). Not used.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
error
NULL or character of length 1, choice of action if the popped task threw an error. Possible values:
- "stop": throw an error in the main R session instead of returning a value.
- "warn": throw a warning.
- NULL or "silent": do not react to errors.
controllers
Character vector of controller names. Set to NULL to select all controllers.
Returns
If there is no task to collect, return NULL
. Otherwise,
return a one-row tibble
with the same columns as pop()
for ordinary controllers.
Method collect()
Pop all available task results and return them in a tidy
tibble
.
Usage
crew_class_controller_group$collect( scale = TRUE, throttle = TRUE, error = NULL, controllers = NULL )
Arguments
scale
Logical of length 1, whether to automatically call scale() to auto-scale workers to meet the demand of the task load.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
error
NULL or character of length 1, choice of action if the popped task threw an error. Possible values:
- "stop": throw an error in the main R session instead of returning a value.
- "warn": throw a warning.
- NULL or "silent": do not react to errors.
controllers
Character vector of controller names. Set to NULL to select all controllers.
Returns
A tibble
of results and metadata of all resolved tasks,
with one row per task. Returns NULL
if there are no available
results.
Method promise()
Create a promises::promise()
object to asynchronously
pop or collect one or more tasks.
Usage
crew_class_controller_group$promise( mode = "one", seconds_interval = 0.1, scale = NULL, throttle = NULL, controllers = NULL )
Arguments
mode
Character of length 1, what kind of promise to create. mode must be "one" or "all". Details:
If mode is "one", then the promise is fulfilled (or rejected) when at least one task is resolved and available to pop(). When that happens, pop() runs asynchronously, pops a result off the task list, and returns a value. If the task succeeded, then the promise is fulfilled and its value is the result of pop() (a one-row tibble with the result and metadata). If the task threw an error, the error message of the task is forwarded to any error callbacks registered with the promise.
If mode is "all", then the promise is fulfilled (or rejected) when there are no unresolved tasks left in the controller. (Be careful: this condition is trivially met in the moment if the controller is empty and you have not submitted any tasks, so it is best to create this kind of promise only after you submit tasks.) When there are no unresolved tasks left, collect() runs asynchronously, pops all available results off the task list, and returns a value. If all the tasks succeeded, then the promise is fulfilled and its value is the result of collect() (a tibble with one row per task result). If any of the tasks threw an error, then the first error message detected is forwarded to any error callbacks registered with the promise.
seconds_interval
Positive numeric of length 1, delay in the later::later() polling interval to asynchronously check if the promise can be resolved.
scale
Deprecated on 2024-04-10 (version 0.9.1.9003) and no longer used. Now, promise() always turns on auto-scaling in a private later loop (if not already activated).
throttle
Deprecated on 2024-04-10 (version 0.9.1.9003) and no longer used. Now, promise() always turns on auto-scaling in a private later loop (if not already activated).
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
Please be aware that pop()
or collect()
will happen
asynchronously at some unpredictable time after the promise object
is created, even if your local R process appears to be doing
something completely different. This behavior is highly desirable
in a Shiny reactive context, but please be careful as it may be
surprising in other situations.
Returns
A promises::promise()
object whose eventual value will
be a tibble
with results from one or more popped tasks.
If mode = "one"
, only one task is popped and returned (one row).
If mode = "all"
, then all the tasks are returned in a tibble
with one row per task (or NULL
is returned if there are no
tasks to pop).
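The asynchronous behavior described above can be sketched outside Shiny as follows. This is an illustrative sketch, not an official example: the controller name "a" and the settled flag are assumptions introduced here, and the explicit later::run_now() loop stands in for the event loop that Shiny would normally run for you.

```r
# Guarded like the package examples so it only runs when requested.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  library(crew)
  group <- crew_controller_group(crew_controller_local(name = "a"))
  group$start()
  group$push(name = "task", command = sqrt(4), controller = "a")
  # Create the promise only after submitting tasks, as advised for mode = "all".
  settled <- FALSE # hypothetical flag to know when a callback has fired
  promise <- group$promise(mode = "all")
  promises::then(
    promise,
    onFulfilled = function(value) {
      print(value) # tibble with one row per collected task
      settled <<- TRUE
    },
    onRejected = function(error) {
      print(error) # first task error detected
      settled <<- TRUE
    }
  )
  # Callbacks only fire while the later event loop runs.
  while (!settled) {
    later::run_now(timeoutSecs = 0.1)
  }
  group$terminate()
}
```

In a Shiny app the while loop is unnecessary because Shiny services the event loop itself.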
Method wait()
Wait for tasks.
Usage
crew_class_controller_group$wait( mode = "all", seconds_interval = NULL, seconds_timeout = Inf, scale = TRUE, throttle = TRUE, controllers = NULL )
Arguments
mode
Character of length 1: "all" to wait for all tasks in all controllers to complete, "one" to wait for a single task in a single controller to complete. In this scheme, the timeout limit is applied to each controller sequentially, and a timeout is treated the same as a completed controller.
seconds_interval
Deprecated on 2025-01-17 (crew version 0.10.2.9003). Instead, the seconds_interval argument passed to crew_controller_group() is used as seconds_max in a crew_throttle() object which orchestrates exponential backoff.
seconds_timeout
Timeout length in seconds waiting for results to become available.
scale
Logical of length 1, whether to call scale_later() on each selected controller to schedule auto-scaling. See the scale argument of the wait() method of ordinary single controllers.
throttle
TRUE to skip auto-scaling if it already happened within the last seconds_interval seconds. FALSE to auto-scale every time scale() is called. Throttling avoids overburdening the mirai dispatcher and other resources.
controllers
Character vector of controller names. Set to NULL to select all controllers.
Details
The wait()
method blocks the calling R session and
repeatedly auto-scales workers for tasks that need them.
The function runs until it either times out or the condition
in mode
is met.
Returns
A logical of length 1, invisibly. TRUE
if the condition
in mode
was met, FALSE
otherwise.
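Because wait() reports whether the condition in mode was met, its return value can be used to decide whether to collect results. The following is a minimal sketch under stated assumptions: the controller name "a", the task name "slow", and the 30-second timeout are illustrative choices, not recommendations.

```r
# Guarded like the package examples so it only runs when requested.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  library(crew)
  group <- crew_controller_group(crew_controller_local(name = "a"))
  group$start()
  group$push(name = "slow", command = Sys.sleep(1), controller = "a")
  # Block until all tasks resolve or the timeout elapses.
  finished <- group$wait(mode = "all", seconds_timeout = 30)
  if (isTRUE(finished)) {
    print(group$collect())
  } else {
    message("Timed out before all tasks resolved.")
  }
  group$terminate()
}
```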
Method push_backlog()
Push the name of a task to the backlog.
Usage
crew_class_controller_group$push_backlog(name, controller = NULL)
Arguments
name
Character of length 1 with the task name to push to the backlog.
controller
Character vector of length 1 with the controller name. Set to NULL to select the default controller that push_backlog() would choose.
Details
pop_backlog()
pops the tasks that can be pushed
without saturating the controller.
Returns
NULL
(invisibly).
Method pop_backlog()
Pop the task names from the head of the backlog which can be pushed without saturating the controller.
Usage
crew_class_controller_group$pop_backlog(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Returns
Character vector of task names which can be pushed to the
controller without saturating it. If the controller is saturated,
character(0L)
is returned.
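The backlog methods can be combined to defer task submission until the controller has capacity. The sketch below assumes a single local controller named "a" with two workers and hypothetical task names; it only illustrates the call sequence documented above.

```r
# Guarded like the package examples so it only runs when requested.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  library(crew)
  group <- crew_controller_group(
    crew_controller_local(name = "a", workers = 2L)
  )
  group$start()
  # Record task names in the backlog instead of pushing them right away.
  for (name in c("task_1", "task_2", "task_3")) {
    group$push_backlog(name = name, controller = "a")
  }
  # Retrieve only the names that can be pushed without saturating.
  ready <- group$pop_backlog()
  for (name in ready) {
    group$push(name = name, command = Sys.getpid(), controller = "a")
  }
  group$wait(mode = "all")
  print(group$collect())
  group$terminate()
}
```

Names that were not returned stay in the backlog, so a later call to pop_backlog() can pick them up once workers free up.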
Method summary()
Summarize the workers of one or more controllers.
Usage
crew_class_controller_group$summary(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Returns
A data frame of aggregated worker summary statistics
of all the selected controllers. It has one row per worker,
and the rows are grouped by controller.
See the documentation of the summary()
method of the controller
class for specific information about the columns in the output.
Method pids()
Get the process IDs of the local process and the
mirai
dispatchers (if started).
Usage
crew_class_controller_group$pids(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Returns
An integer vector of process IDs of the local process and the
mirai
dispatcher (if started).
Method terminate()
Terminate the workers and disconnect the client for one or more controllers.
Usage
crew_class_controller_group$terminate(controllers = NULL)
Arguments
controllers
Character vector of controller names. Set to
NULL
to select all controllers.
Returns
NULL
(invisibly).
See Also
Other controller_group:
crew_controller_group()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
persistent <- crew_controller_local(name = "persistent")
transient <- crew_controller_local(
name = "transient",
tasks_max = 1L
)
group <- crew_controller_group(persistent, transient)
group$start()
group$push(name = "task", command = sqrt(4), controller = "transient")
group$wait()
group$pop()
group$terminate()
}
Sequential controller class
Description
R6
class for sequential controllers.
Details
See crew_controller_sequential()
.
Super class
crew::crew_class_controller
-> crew_class_controller_sequential
Methods
Public methods
Inherited methods
crew::crew_class_controller$collect()
crew::crew_class_controller$crashes()
crew::crew_class_controller$empty()
crew::crew_class_controller$initialize()
crew::crew_class_controller$map()
crew::crew_class_controller$nonempty()
crew::crew_class_controller$pids()
crew::crew_class_controller$pop()
crew::crew_class_controller$promise()
crew::crew_class_controller$resolved()
crew::crew_class_controller$saturated()
crew::crew_class_controller$started()
crew::crew_class_controller$summary()
crew::crew_class_controller$terminate()
crew::crew_class_controller$unpopped()
crew::crew_class_controller$unresolved()
crew::crew_class_controller$validate()
crew::crew_class_controller$walk()
Method start()
Start the controller if it is not already started.
Usage
crew_class_controller_sequential$start(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Details
For the sequential controller, there is nothing to do except register the client as started.
Returns
NULL
(invisibly).
Method launch()
Does nothing for the sequential controller.
Usage
crew_class_controller_sequential$launch(n = 1L, controllers = NULL)
Arguments
n
Number of workers to launch.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
NULL
(invisibly).
Method scale()
Does nothing for the sequential controller.
Usage
crew_class_controller_sequential$scale(throttle = TRUE, controllers = NULL)
Arguments
throttle
Not applicable to the sequential controller.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
Invisibly returns FALSE
.
Method autoscale()
Not applicable to the sequential controller.
Usage
crew_class_controller_sequential$autoscale(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
NULL
(invisibly).
Method descale()
Not applicable to the sequential controller.
Usage
crew_class_controller_sequential$descale(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
NULL
(invisibly).
Method push()
Push a task to the head of the task list.
Usage
crew_class_controller_sequential$push( command, data = list(), globals = list(), substitute = TRUE, seed = NULL, algorithm = NULL, packages = character(0), library = NULL, seconds_timeout = NULL, scale = TRUE, throttle = TRUE, name = NULL, save_command = NULL, controller = NULL )
Arguments
command
Language object with R code to run.
data
Named list of local data objects in the evaluation environment.
globals
Named list of objects to temporarily assign to the global environment for the task. This list should include any functions you previously defined in the global environment which are required to run tasks. See the reset_globals argument of crew_controller_local().
substitute
Logical of length 1, whether to call base::substitute() on the supplied value of the command argument. If TRUE (default) then command is quoted literally as you write it, e.g. push(command = your_function_call()). If FALSE, then crew assumes command is a language object and you are passing its value, e.g. push(command = quote(your_function_call())). substitute = TRUE is appropriate for interactive use, whereas substitute = FALSE is meant for automated R programs that invoke crew controllers.
seed
Integer of length 1 with the pseudo-random number generator seed to set for the evaluation of the task. Passed to the seed argument of set.seed() if not NULL. If algorithm and seed are both NULL for the sequential controller, then the random number generator defaults to the current RNG of the local R session where the sequential controller lives.
algorithm
Integer of length 1 with the pseudo-random number generator algorithm to set for the evaluation of the task. Passed to the kind argument of RNGkind() if not NULL. If algorithm and seed are both NULL for the sequential controller, then the random number generator defaults to the current RNG of the local R session where the sequential controller lives.
packages
Character vector of packages to load for the task.
library
Library path to load the packages. See the lib.loc argument of require().
seconds_timeout
Not used in the sequential controller.
scale
Not used in the sequential controller.
throttle
Not used in the sequential controller.
name
Character string, name of the task. If NULL, then a random name is generated automatically. The name of the task must not conflict with the name of another task pushed to the controller. Any previous task with the same name must first be popped before a new task with that name can be pushed.
save_command
Deprecated on 2025-01-22 (crew version 0.10.2.9004) and no longer used.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
Invisibly returns a mirai
-like list where the data
element is the result of the task.
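The difference between substitute = TRUE and substitute = FALSE can be illustrated with the sequential controller, which runs each task in the local session. The task names and the seed value below are assumptions chosen for the illustration.

```r
# Guarded like the package examples so it only runs when requested.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  library(crew)
  controller <- crew_controller_sequential()
  # Interactive style: the command is quoted exactly as written.
  controller$push(name = "interactive", command = sqrt(16))
  # Programmatic style: pass an existing language object by value.
  expr <- quote(rnorm(1))
  controller$push(
    name = "programmatic",
    command = expr,
    substitute = FALSE,
    seed = 123L # fixes the RNG seed for this task only
  )
  results <- controller$collect()
  print(results)
}
```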
Method wait()
Not applicable to the sequential controller.
Usage
crew_class_controller_sequential$wait( mode = "all", seconds_interval = NULL, seconds_timeout = Inf, scale = TRUE, throttle = TRUE, controllers = NULL )
Arguments
mode
Not applicable to the sequential controller.
seconds_interval
Not applicable to the sequential controller.
seconds_timeout
Not applicable to the sequential controller.
scale
Not applicable to the sequential controller.
throttle
Not applicable to the sequential controller.
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
Always returns TRUE
(invisibly)
for the sequential controller.
Method push_backlog()
Not applicable to the sequential controller.
Usage
crew_class_controller_sequential$push_backlog(name, controller = NULL)
Arguments
name
Character of length 1 with the task name to push to the backlog.
controller
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
NULL
(invisibly).
Method pop_backlog()
Not applicable to the sequential controller.
Usage
crew_class_controller_sequential$pop_backlog(controllers = NULL)
Arguments
controllers
Not used. Included to ensure the signature is compatible with the analogous method of controller groups.
Returns
Always character(0L)
for the sequential controller.
Method cancel()
Not applicable to the sequential controller.
Usage
crew_class_controller_sequential$cancel(names = character(0L), all = FALSE)
Arguments
names
Not applicable to the sequential controller.
all
Not applicable to the sequential controller.
See Also
Other sequential controllers:
crew_controller_sequential()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
controller <- crew_controller_sequential()
controller$push(name = "task", command = sqrt(4))
controller$pop()
}
Launcher abstract class
Description
R6
abstract class to build other subclasses
which launch and manage workers.
Active bindings
name
See crew_launcher().
workers
See crew_launcher().
seconds_interval
See crew_launcher().
seconds_timeout
See crew_launcher().
seconds_launch
See crew_launcher().
seconds_idle
See crew_launcher().
seconds_wall
See crew_launcher().
tasks_max
See crew_launcher().
tasks_timers
See crew_launcher().
tls
See crew_launcher().
processes
See crew_launcher().
r_arguments
See crew_launcher().
options_metrics
See crew_launcher().
url
Websocket URL for worker connections.
profile
mirai compute profile of the launcher.
instances
Data frame of worker instance information.
id
Integer worker ID from the last call to settings().
async
A crew_async() object to run low-level launcher tasks asynchronously.
throttle
A crew_throttle() object to throttle scaling.
Methods
Public methods
Method new()
Launcher constructor.
Usage
crew_class_launcher$new( name = NULL, workers = NULL, seconds_interval = NULL, seconds_timeout = NULL, seconds_launch = NULL, seconds_idle = NULL, seconds_wall = NULL, seconds_exit = NULL, tasks_max = NULL, tasks_timers = NULL, reset_globals = NULL, reset_packages = NULL, reset_options = NULL, garbage_collection = NULL, crashes_error = NULL, launch_max = NULL, tls = NULL, processes = NULL, r_arguments = NULL, options_metrics = NULL )
Arguments
name
See crew_launcher().
workers
See crew_launcher().
seconds_interval
See crew_launcher().
seconds_timeout
See crew_launcher().
seconds_launch
See crew_launcher().
seconds_idle
See crew_launcher().
seconds_wall
See crew_launcher().
seconds_exit
See crew_launcher().
tasks_max
See crew_launcher().
tasks_timers
See crew_launcher().
reset_globals
Deprecated. See crew_launcher().
reset_packages
Deprecated. See crew_launcher().
reset_options
Deprecated. See crew_launcher().
garbage_collection
Deprecated. See crew_launcher().
crashes_error
See crew_launcher().
launch_max
Deprecated.
tls
See crew_launcher().
processes
See crew_launcher().
r_arguments
See crew_launcher().
options_metrics
See crew_launcher().
Returns
An R6
object with the launcher.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local()
launcher$start(url = client$url, profile = client$profile)
launcher$launch()
task <- mirai::mirai("result", .compute = client$profile)
mirai::call_mirai(task)
task$data
client$terminate()
}
Method validate()
Validate the launcher.
Usage
crew_class_launcher$validate()
Returns
NULL
(invisibly).
Method poll()
Poll the throttle.
Usage
crew_class_launcher$poll()
Returns
TRUE
to run whatever work comes next, FALSE
to skip
until the appropriate time.
Method settings()
List of arguments for mirai::daemon()
.
Usage
crew_class_launcher$settings()
Returns
List of arguments for mirai::daemon()
.
Method call()
Create a call to crew_worker()
to
help create custom launchers.
Usage
crew_class_launcher$call( worker, socket = NULL, launcher = NULL, instance = NULL )
Arguments
worker
Character string, name of the worker.
socket
Deprecated on 2025-01-28 (crew version 1.0.0).
launcher
Deprecated on 2025-01-28 (crew version 1.0.0).
instance
Deprecated on 2025-01-28 (crew version 1.0.0).
Returns
Character string with a call to crew_worker()
.
Examples
launcher <- crew_launcher_local()
launcher$start(url = "tcp://127.0.0.1:57000", profile = "profile")
launcher$call(worker = "worker_name")
launcher$terminate()
Method start()
Start the launcher.
Usage
crew_class_launcher$start(url = NULL, profile = NULL, sockets = NULL)
Arguments
url
Character string, websocket URL for worker connections.
profile
Character string, mirai compute profile.
sockets
Deprecated on 2025-01-28 (crew version 1.0.0).
Returns
NULL
(invisibly).
Method terminate()
Terminate the whole launcher, including all workers.
Usage
crew_class_launcher$terminate()
Returns
NULL
(invisibly).
Method resolve()
Resolve asynchronous worker submissions.
Usage
crew_class_launcher$resolve()
Returns
NULL
(invisibly). Throws an error if there were any
asynchronous worker submission errors.
Method update()
Update worker metadata, resolve asynchronous worker submissions, and terminate lost workers.
Usage
crew_class_launcher$update(status)
Arguments
status
A
mirai
status list.
Returns
NULL
(invisibly).
Method launch()
Launch a worker.
Usage
crew_class_launcher$launch()
Returns
Handle of the launched worker.
Method scale()
Auto-scale workers out to meet the demand of tasks.
Usage
crew_class_launcher$scale(status, throttle = NULL)
Arguments
status
A mirai status list with worker and task information.
throttle
Deprecated, only used in the controller as of 2025-01-16 (crew version 0.10.2.9003).
Returns
Invisibly returns TRUE
if there was any relevant
auto-scaling activity (new worker launches or worker
connection/disconnection events) (FALSE
otherwise).
Method launch_worker()
Abstract worker launch method.
Usage
crew_class_launcher$launch_worker(call, name, launcher, worker)
Arguments
call
Character of length 1 with a namespaced call to crew_worker() which will run in the worker and accept tasks.
name
Character of length 1 with an informative worker name.
launcher
Character of length 1, name of the launcher.
worker
Positive integer of length 1, index of the worker. This worker index remains the same even when the current instance of the worker exits and a new instance launches. It is always between 1 and the maximum number of concurrent workers.
Details
Launcher plugins will overwrite this method.
Returns
A handle to mock the worker launch.
Method terminate_worker()
Abstract worker termination method.
Usage
crew_class_launcher$terminate_worker(handle)
Arguments
handle
A handle object previously returned by
launch_worker()
which allows the termination of the worker.
Details
Launcher plugins will overwrite this method.
Returns
A handle to mock worker termination.
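A launcher plugin overrides the two abstract methods above. The following is a minimal sketch of such a subclass, assuming that workers run as local R processes started with processx; the class name custom_launcher_class is hypothetical, and the method signatures follow the Usage sections above.

```r
# Hypothetical plugin class; `inherit` is only resolved when $new() is called.
custom_launcher_class <- R6::R6Class(
  classname = "custom_launcher_class",
  inherit = crew::crew_class_launcher,
  public = list(
    launch_worker = function(call, name, launcher, worker) {
      # `call` is R code as a string; run it in a fresh R process.
      processx::process$new(
        command = file.path(R.home("bin"), "R"),
        args = c(self$r_arguments, "-e", call),
        cleanup = FALSE
      )
    },
    terminate_worker = function(handle) {
      # `handle` is the processx object returned by launch_worker().
      handle$signal(crew::crew_terminate_signal())
    }
  )
)
```

Returning the process object from launch_worker() is a design choice of this sketch: it gives terminate_worker() everything it needs to stop the worker later.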
Method terminate_workers()
Terminate all workers.
Usage
crew_class_launcher$terminate_workers()
Returns
NULL
(invisibly).
Method crashes()
Deprecated on 2025-01-28 (crew
version 1.0.0).
Usage
crew_class_launcher$crashes(index = NULL)
Arguments
index
Unused argument.
Returns
The integer 1, for compatibility.
Method set_name()
Deprecated on 2025-01-28 (crew
version 1.0.0).
Usage
crew_class_launcher$set_name(name)
Arguments
name
Name to set for the launcher.
Returns
NULL
(invisibly).
See Also
Other launcher:
crew_launcher()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local()
launcher$start(url = client$url, profile = client$profile)
launcher$launch()
task <- mirai::mirai("result", .compute = client$profile)
mirai::call_mirai(task)
task$data
client$terminate()
}
## ------------------------------------------------
## Method `crew_class_launcher$call`
## ------------------------------------------------
launcher <- crew_launcher_local()
launcher$start(url = "tcp://127.0.0.1:57000", profile = "profile")
launcher$call(worker = "worker_name")
launcher$terminate()
Local process launcher class
Description
R6
class to launch and manage local process workers.
Details
Super class
crew::crew_class_launcher
-> crew_class_launcher_local
Active bindings
options_local
Methods
Public methods
Inherited methods
crew::crew_class_launcher$call()
crew::crew_class_launcher$crashes()
crew::crew_class_launcher$launch()
crew::crew_class_launcher$poll()
crew::crew_class_launcher$resolve()
crew::crew_class_launcher$scale()
crew::crew_class_launcher$set_name()
crew::crew_class_launcher$settings()
crew::crew_class_launcher$start()
crew::crew_class_launcher$terminate()
crew::crew_class_launcher$terminate_workers()
crew::crew_class_launcher$update()
Method new()
Local launcher constructor.
Usage
crew_class_launcher_local$new( name = NULL, workers = NULL, seconds_interval = NULL, seconds_timeout = NULL, seconds_launch = NULL, seconds_idle = NULL, seconds_wall = NULL, seconds_exit = NULL, tasks_max = NULL, tasks_timers = NULL, crashes_error = NULL, tls = NULL, processes = NULL, r_arguments = NULL, options_metrics = NULL, options_local = NULL )
Arguments
name
See crew_launcher().
workers
See crew_launcher().
seconds_interval
See crew_launcher().
seconds_timeout
See crew_launcher().
seconds_launch
See crew_launcher().
seconds_idle
See crew_launcher().
seconds_wall
See crew_launcher().
seconds_exit
See crew_launcher().
tasks_max
See crew_launcher().
tasks_timers
See crew_launcher().
crashes_error
See crew_launcher().
tls
See crew_launcher().
processes
See crew_launcher().
r_arguments
See crew_launcher().
options_metrics
options_local
reset_globals
Deprecated. See crew_launcher().
reset_packages
Deprecated. See crew_launcher().
reset_options
Deprecated. See crew_launcher().
garbage_collection
Deprecated. See crew_launcher().
Returns
An R6
object with the local launcher.
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(url = client$url, profile = client$profile)
launcher$launch()
task <- mirai::mirai("result", .compute = client$profile)
mirai::call_mirai(task)
task$data
client$terminate()
}
Method validate()
Validate the local launcher.
Usage
crew_class_launcher_local$validate()
Returns
NULL
(invisibly).
Method launch_worker()
Launch a local process worker which will dial into a socket.
Usage
crew_class_launcher_local$launch_worker(call, name, launcher, worker)
Arguments
call
Character of length 1 with a namespaced call to crew_worker() which will run in the worker and accept tasks.
name
Character of length 1 with a long informative worker name which contains the launcher and worker arguments described below.
launcher
Character of length 1, name of the launcher.
worker
Character string, name of the worker within the launcher.
Details
The call
argument is R code that will run to
initiate the worker. Together, the launcher
, worker
,
and instance
arguments are useful for
constructing informative job names.
Returns
A handle object to allow the termination of the worker later on.
Method terminate_worker()
Terminate a local process worker.
Usage
crew_class_launcher_local$terminate_worker(handle)
Arguments
handle
A process handle object previously returned by
launch_worker()
.
Returns
A list with the process ID of the worker.
See Also
Other plugin_local:
crew_controller_local()
,
crew_launcher_local()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(url = client$url, profile = client$profile)
launcher$launch()
task <- mirai::mirai("result", .compute = client$profile)
mirai::call_mirai(task)
task$data
client$terminate()
}
Local monitor class
Description
Local monitor R6
class
Details
See crew_monitor_local()
.
Methods
Public methods
Method dispatchers()
List the process IDs of the running mirai
dispatcher
processes.
Usage
crew_class_monitor_local$dispatchers(user = ps::ps_username())
Arguments
user
Character of length 1, user ID to filter on.
NULL
to list processes of all users (not recommended).
Returns
Integer vector of process IDs of the running mirai
dispatcher
processes.
Method daemons()
List the process IDs of the locally running mirai
daemon
processes which are not crew
workers. The crew_async()
object can launch such processes: for example, when a positive
integer is supplied to the processes
argument of e.g.
crew.aws.batch::crew_controller_aws_batch()
.
Usage
crew_class_monitor_local$daemons(user = ps::ps_username())
Arguments
user
Character of length 1, user ID to filter on.
NULL
to list processes of all users (not recommended).
Returns
Integer vector of process IDs of the locally running
mirai
daemon processes which are not crew
workers.
Method workers()
List the process IDs of locally running crew
workers
launched by the local controller (crew_controller_local()
).
Usage
crew_class_monitor_local$workers(user = ps::ps_username())
Arguments
user
Character of length 1, user ID to filter on.
NULL
to list processes of all users (not recommended).
Details
Only the workers running on your local computer are listed.
Workers that are not listed include jobs on job schedulers like
SLURM or jobs on cloud services like AWS Batch. To monitor
those worker processes, please consult the monitor objects in
the relevant third-party launcher plugins such as crew.cluster
and crew.aws.batch
.
Returns
Integer vector of process IDs of locally running crew
workers
launched by the local controller (crew_controller_local()
).
Method terminate()
Terminate the given process IDs.
Usage
crew_class_monitor_local$terminate(pids)
Arguments
pids
Integer vector of process IDs of local processes to terminate.
Details
Termination happens with the operating system signal
given by crew_terminate_signal()
.
Returns
NULL
(invisibly).
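The monitor methods above are typically used together to find and clean up leftover local processes. The sketch below only uses the methods documented in this section; the variable name worker_pids is an assumption, and terminating processes is destructive, so inspect the process IDs before calling terminate().

```r
# Guarded like the package examples so it only runs when requested.
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
  library(crew)
  monitor <- crew_monitor_local()
  # Process IDs owned by the current user (the default for `user`).
  print(monitor$dispatchers())
  print(monitor$daemons())
  worker_pids <- monitor$workers()
  print(worker_pids)
  # Terminate leftover local crew workers, if any.
  if (length(worker_pids) > 0L) {
    monitor$terminate(pids = worker_pids)
  }
}
```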
See Also
Other monitor:
crew_monitor_local()
R6
queue class
Description
R6
class for a queue.
Details
See the Details section of crew_queue()
.
The R6 crew queue class is not portable, so other packages should not inherit from it.
The reason for non-portability is efficiency: elements can be directly accessed without self$ or private$, and they can be directly modified with <<-.
This is especially important for push() because envir$vector[slice] <- x copies the entire vector in memory on every call, so n pushes cost O(n^2) overall, which is extremely slow for large vectors.
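The idea of modifying queue storage in place with <<- can be illustrated in base R. This is a simplified sketch and not the crew implementation: new_queue() is a hypothetical closure-based queue that keeps head and tail indexes and grows its storage in chunks of at least step elements.

```r
# Hypothetical closure-based queue; not the actual crew_class_queue code.
new_queue <- function(step = 4L) {
  data <- character(0L) # storage, grown in chunks of at least `step`
  head <- 1L            # index of the next element to pop
  tail <- 0L            # index of the last element pushed
  push <- function(x) {
    n <- length(x)
    if (n == 0L) return(invisible(NULL))
    needed <- tail + n - length(data)
    if (needed > 0L) {
      # Grow once per batch rather than once per element.
      data <<- c(data, rep(NA_character_, max(needed, step)))
    }
    # Modify the enclosing vector directly with <<-.
    data[seq.int(tail + 1L, tail + n)] <<- x
    tail <<- tail + n
    invisible(NULL)
  }
  pop <- function(n = 1L) {
    available <- tail - head + 1L
    if (available < 1L) return(NULL)
    n <- min(n, available)
    out <- data[seq.int(head, head + n - 1L)]
    head <<- head + n
    out
  }
  list(push = push, pop = pop)
}

queue <- new_queue()
queue$push(c("a", "b", "c"))
first <- queue$pop()       # "a"
rest <- queue$pop(n = 10L) # "b" "c": fewer than n are available
empty <- queue$pop()       # NULL: nothing left to pop
```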
Active bindings
data
See crew_queue().
head
Non-negative integer pointing to the location of the next element to pop.
tail
Non-negative integer pointing to the tail of the queue.
step
See crew_queue().
Methods
Public methods
Method new()
Create a queue object.
Usage
crew_class_queue$new(data = NULL, step = NULL)
Arguments
data
See crew_queue().
step
See crew_queue().
Returns
A queue object.
Method validate()
Validate the queue.
Usage
crew_class_queue$validate()
Returns
NULL
(invisibly). Called for its side effects.
Method empty()
Check if the queue is empty.
Usage
crew_class_queue$empty()
Returns
TRUE
if the queue is empty, FALSE
otherwise.
Method nonempty()
Check if the queue is nonempty.
Usage
crew_class_queue$nonempty()
Returns
TRUE
if the queue is nonempty, FALSE
otherwise.
Method list()
List available data.
Usage
crew_class_queue$list()
Returns
Character vector of available data.
Method reset()
Reset the queue.
Usage
crew_class_queue$reset()
Returns
NULL
(invisibly). Called for its side effects.
Method clean()
Remove popped elements from the data in the queue.
Usage
crew_class_queue$clean()
Returns
NULL
(invisibly).
Method set()
Set the data in the queue.
Usage
crew_class_queue$set(data = character(0L))
Arguments
data
Character vector of data to set.
Returns
NULL
(invisibly). Called for its side effects.
Method extend()
Extend the queue data by step
elements.
Usage
crew_class_queue$extend(n)
Arguments
n
Positive integer, number of elements to extend the queue data.
Returns
NULL
(invisibly).
Method push()
Append new elements to the queue.
Usage
crew_class_queue$push(x)
Arguments
x
Character vector of new data to append.
Details
push()
is the reason the queue class is not portable.
According to R6 documentation,
members of non-portable classes
can be accessed without self$
or private$
,
and assignment can be done with <<-
.
In the case of push()
, this prevents each assignment from
deep-copying the entire contents of the vector.
Returns
NULL
(invisibly).
Method pop()
Pop one or more elements off the queue.
Usage
crew_class_queue$pop(n = 1L)
Arguments
n
Positive integer, maximum number of elements to pop. Fewer than n are popped if fewer than n are available.
Returns
Character vector of elements popped off the queue.
NULL
if there are no more elements available to pop.
Method collect()
Remove and return all available elements off the queue.
Usage
crew_class_queue$collect()
Returns
Character vector, elements collected from the queue.
NULL
if there are no more elements available to collect.
See Also
Other queue:
crew_queue()
Examples
crew_queue()
R6
relay class.
Description
R6
class for relay configuration.
Details
See crew_relay()
.
Active bindings
condition
Main condition variable.
from
Condition variable to relay from.
to
Condition variable to relay to.
throttle
A crew_throttle() object for wait().
Methods
Public methods
Method new()
Relay constructor.
Usage
crew_class_relay$new(throttle)
Arguments
throttle
A
crew_throttle()
object.
Returns
A crew_relay()
object.
Method validate()
Validate the object.
Usage
crew_class_relay$validate()
Returns
NULL
(invisibly).
Method start()
Start the relay object.
Usage
crew_class_relay$start()
Returns
NULL
(invisibly).
Method terminate()
Terminate the relay object.
Usage
crew_class_relay$terminate()
Returns
NULL
(invisibly).
Method set_from()
Set the condition variable to relay from.
Usage
crew_class_relay$set_from(from)
Arguments
from
Condition variable to relay from.
Returns
NULL
(invisibly).
Method set_to()
Set the condition variable to relay to.
Usage
crew_class_relay$set_to(to)
Arguments
to
Condition variable to relay to.
Returns
NULL
(invisibly).
Method wait()
Wait until an unobserved task resolves or the timeout is reached. Use the throttle to determine the waiting time.
Usage
crew_class_relay$wait()
Returns
NULL
(invisibly).
See Also
Other relay:
crew_relay()
Examples
crew_relay()
R6
throttle class.
Description
R6
class for throttle configuration.
Details
See crew_throttle()
.
Active bindings
seconds_max
See crew_throttle().
seconds_min
See crew_throttle().
seconds_start
See crew_throttle().
base
See crew_throttle().
seconds_interval
Current wait time interval.
polled
Positive numeric of length 1, millisecond timestamp of the last time poll() returned TRUE. NULL if poll() was never called on the current object.
Methods
Public methods
Method new()
Throttle constructor.
Usage
crew_class_throttle$new( seconds_max = NULL, seconds_min = NULL, seconds_start = NULL, base = NULL )
Arguments
seconds_max
See crew_throttle().
seconds_min
See crew_throttle().
seconds_start
See crew_throttle().
base
See crew_throttle().
Returns
An R6
object with throttle configuration.
Examples
throttle <- crew_throttle(seconds_max = 1)
throttle$poll()
throttle$poll()
Method validate()
Validate the object.
Usage
crew_class_throttle$validate()
Returns
NULL
(invisibly).
Method poll()
Poll the throttler.
Usage
crew_class_throttle$poll()
Returns
TRUE
if poll()
did not return TRUE
in the last
max
seconds, FALSE
otherwise.
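The polling rule above can be sketched with a small base R closure. This is only an illustration under assumptions, not the crew implementation: `sketch_poller()` is a hypothetical name, it tracks elapsed seconds from `proc.time()` rather than a millisecond timestamp, and it uses a fixed interval instead of one that changes with `accelerate()` and `decelerate()`.

```r
# Hypothetical closure that mimics the poll() contract:
# return TRUE at most once per interval, FALSE otherwise.
sketch_poller <- function(seconds_interval = 0.2) {
  polled <- NULL # time of the last TRUE result, NULL before the first poll
  function() {
    now <- proc.time()[["elapsed"]]
    if (is.null(polled) || (now - polled) > seconds_interval) {
      polled <<- now # remember when TRUE was last returned
      return(TRUE)
    }
    FALSE
  }
}

poll <- sketch_poller(seconds_interval = 0.2)
first <- poll()  # nothing polled yet
second <- poll() # called again immediately
Sys.sleep(0.3)
third <- poll()  # the interval has elapsed
```

A caller would typically guard expensive work, such as an auto-scaling check, behind the result of each poll.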
Method accelerate()
Divide seconds_interval
by base
.
Usage
crew_class_throttle$accelerate()
Returns
NULL
(invisibly). Called for its side effects.
Method decelerate()
Multiply seconds_interval
by base
.
Usage
crew_class_throttle$decelerate()
Returns
NULL
(invisibly). Called for its side effects.
Method reset()
Reset the throttle object so the next poll()
returns
TRUE
, and reset the wait time interval to its initial value.
Usage
crew_class_throttle$reset()
Returns
NULL
(invisibly).
Method update()
Reset the throttle when there is activity and decelerate it gradually when there is no activity.
Usage
crew_class_throttle$update(activity)
Arguments
activity
TRUE
if there is activity, FALSE
otherwise.
Returns
NULL
(invisibly).
See Also
Other throttle:
crew_throttle()
Examples
throttle <- crew_throttle(seconds_max = 1)
throttle$poll()
throttle$poll()
R6
TLS class.
Description
R6
class for TLS configuration.
Details
See crew_tls()
.
Active bindings
mode
See crew_tls().
key
See crew_tls().
password
See crew_tls().
certificates
See crew_tls().
Methods
Public methods
Method new()
TLS configuration constructor.
Usage
crew_class_tls$new( mode = NULL, key = NULL, password = NULL, certificates = NULL )
Arguments
mode
Argument passed from crew_tls().
key
Argument passed from crew_tls().
password
Argument passed from crew_tls().
certificates
Argument passed from crew_tls().
Returns
An R6
object with TLS configuration.
Examples
crew_tls(mode = "automatic")
Method validate()
Validate the object.
Usage
crew_class_tls$validate(test = TRUE)
Arguments
test
Logical of length 1, whether to test the TLS configuration with
nanonext::tls_config()
.
Returns
NULL
(invisibly).
Method client()
TLS credentials for the crew
client.
Usage
crew_class_tls$client()
Returns
NULL
or character vector, depending on the mode.
Method worker()
TLS credentials for crew
workers.
Usage
crew_class_tls$worker(profile)
Arguments
profile
Character of length 1 with the
mirai
compute profile.
Returns
NULL
or character vector, depending on the mode.
Method url()
Form the URL for crew
worker connections.
Usage
crew_class_tls$url(host, port)
Arguments
host
Character string with the host name or IP address.
port
Non-negative integer with the port number (0 to let NNG select a random ephemeral port).
Returns
Character string with the URL.
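As a rough illustration of what forming such a URL involves, the sketch below builds an NNG-style address from a host and port. It rests on assumptions: `sketch_url()` and its `tls` argument are hypothetical, and the choice between the `tcp://` and `tls+tcp://` schemes is a guess at how the TLS mode could influence the result, not a statement about what the url() method does internally.

```r
# Hypothetical helper, not part of crew.
# Assumes the NNG "tcp" scheme without TLS and "tls+tcp" with TLS.
sketch_url <- function(host, port, tls = FALSE) {
  scheme <- if (tls) "tls+tcp" else "tcp"
  sprintf("%s://%s:%d", scheme, host, as.integer(port))
}

plain <- sketch_url("127.0.0.1", 5000)
secure <- sketch_url("127.0.0.1", 0, tls = TRUE) # port 0: let NNG choose
```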
See Also
Other tls:
crew_tls()
Examples
crew_tls(mode = "automatic")
Terminate dispatchers and/or workers
Description
Terminate mirai
dispatchers and/or crew
workers
which may be lingering from previous workloads.
Usage
crew_clean(
dispatchers = TRUE,
workers = TRUE,
user = ps::ps_username(),
seconds_interval = 1,
seconds_timeout = 60,
verbose = TRUE
)
Arguments
dispatchers |
Logical of length 1, whether to terminate dispatchers. |
workers |
Logical of length 1, whether to terminate workers. |
user |
Character of length 1. Terminate dispatchers and/or workers associated with this user name. |
seconds_interval |
Seconds between polling intervals while waiting for a process to exit. |
seconds_timeout |
Seconds to wait for a process to exit. |
verbose |
Logical of length 1, whether to print an informative message every time a process is terminated. |
Details
Behind the scenes, mirai
uses an external R process
called a "dispatcher" to send tasks to crew
workers.
This dispatcher usually shuts down when you terminate the controller
or quit your R session, but sometimes it lingers. Likewise,
sometimes crew
workers do not shut down on their own.
The crew_clean()
function searches the process table on your
local machine and manually terminates any mirai
dispatchers
and crew
workers associated with your user name (or the
user name you select in the user
argument).
Unfortunately, it cannot reach remote workers such as those
launched by a crew.cluster
controller.
Value
NULL
(invisibly). If verbose
is TRUE
, it prints
a message for every terminated process.
See Also
Other utility:
crew_assert()
,
crew_deprecate()
,
crew_eval()
,
crew_random_name()
,
crew_retry()
,
crew_terminate_process()
,
crew_terminate_signal()
,
crew_worker()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
crew_clean()
}
Create a client object.
Description
Create an R6
wrapper object to manage the mirai
client.
Usage
crew_client(
name = NULL,
workers = NULL,
host = NULL,
port = NULL,
tls = crew::crew_tls(),
tls_enable = NULL,
tls_config = NULL,
serialization = NULL,
seconds_interval = 1,
seconds_timeout = 60,
retry_tasks = NULL
)
Arguments
name |
Deprecated on 2025-01-14 ( |
workers |
Deprecated on 2025-01-13 ( |
host |
IP address of the |
port |
TCP port to listen for the workers. If |
tls |
A TLS configuration object from |
tls_enable |
Deprecated on 2023-09-15 in version 0.4.1.
Use argument |
tls_config |
Deprecated on 2023-09-15 in version 0.4.1.
Use argument |
serialization |
Either |
seconds_interval |
Number of seconds between
polling intervals waiting for certain internal
synchronous operations to complete,
such as checking |
seconds_timeout |
Number of seconds until timing
out while waiting for certain synchronous operations to complete,
such as checking |
retry_tasks |
Deprecated on 2025-01-13 ( |
See Also
Other client:
crew_class_client
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
client$summary()
client$terminate()
}
Create a controller object from a client and launcher.
Description
This function is for developers of crew
launcher plugins.
Users should use a specific controller helper such as
crew_controller_local()
.
Usage
crew_controller(
client,
launcher,
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
crashes_max = 5L,
backup = NULL,
auto_scale = NULL
)
Arguments
client |
An |
launcher |
An |
reset_globals |
|
reset_packages |
|
reset_options |
|
garbage_collection |
|
crashes_max |
In rare cases, a worker may exit unexpectedly
before it completes its current task. If this happens,
|
backup |
An optional Limitations of |
auto_scale |
Deprecated. Use the |
See Also
Other controller:
crew_class_controller
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
launcher <- crew_launcher_local()
controller <- crew_controller(client = client, launcher = launcher)
controller$start()
controller$push(name = "task", command = sqrt(4))
controller$wait()
controller$pop()
controller$terminate()
}
Create a controller group.
Description
Create an R6
object to submit tasks and launch workers
through multiple crew
controllers.
Usage
crew_controller_group(..., seconds_interval = 1)
Arguments
... |
|
seconds_interval |
Number of seconds between
polling intervals waiting for certain internal
synchronous operations to complete,
such as checking |
See Also
Other controller_group:
crew_class_controller_group
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
persistent <- crew_controller_local(name = "persistent")
transient <- crew_controller_local(
name = "transient",
tasks_max = 1L
)
group <- crew_controller_group(persistent, transient)
group$start()
group$push(name = "task", command = sqrt(4), controller = "transient")
group$wait()
group$pop()
group$terminate()
}
Create a controller with a local process launcher.
Description
Create an R6
object to submit tasks and
launch workers on local processes.
Usage
crew_controller_local(
name = NULL,
workers = 1L,
host = "127.0.0.1",
port = NULL,
tls = crew::crew_tls(),
tls_enable = NULL,
tls_config = NULL,
serialization = NULL,
seconds_interval = 1,
seconds_timeout = 60,
seconds_launch = 30,
seconds_idle = 300,
seconds_wall = Inf,
seconds_exit = NULL,
retry_tasks = NULL,
tasks_max = Inf,
tasks_timers = 0L,
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE,
crashes_error = NULL,
launch_max = NULL,
r_arguments = c("--no-save", "--no-restore"),
crashes_max = 5L,
backup = NULL,
options_metrics = crew::crew_options_metrics(),
options_local = crew::crew_options_local(),
local_log_directory = NULL,
local_log_join = NULL
)
Arguments
name |
Character string, name of the launcher. If the name is
|
workers |
Maximum number of workers to run concurrently
when auto-scaling, excluding task retries and manual calls to |
host |
IP address of the |
port |
TCP port to listen for the workers. If |
tls |
A TLS configuration object from |
tls_enable |
Deprecated on 2023-09-15 in version 0.4.1.
Use argument |
tls_config |
Deprecated on 2023-09-15 in version 0.4.1.
Use argument |
serialization |
Either |
seconds_interval |
Number of seconds between
polling intervals waiting for certain internal
synchronous operations to complete. In certain cases, exponential
backoff is used with this argument passed to |
seconds_timeout |
Number of seconds until timing
out while waiting for certain synchronous operations to complete,
such as checking |
seconds_launch |
Seconds of startup time to allow.
A worker is unconditionally assumed to be alive
from the moment of its launch until |
seconds_idle |
Maximum number of seconds that a worker can idle
since the completion of the last task. If exceeded, the worker exits.
But the timer does not launch until |
seconds_wall |
Soft wall time in seconds.
The timer does not launch until |
seconds_exit |
Deprecated on 2023-09-21 in version 0.5.0.9002. No longer necessary. |
retry_tasks |
Deprecated on 2025-01-13 ( |
tasks_max |
Maximum number of tasks that a worker will do before
exiting. See the |
tasks_timers |
Number of tasks to do before activating
the timers for |
reset_globals |
|
reset_packages |
|
reset_options |
|
garbage_collection |
|
crashes_error |
Deprecated on 2025-01-13 ( |
launch_max |
Deprecated on 2024-11-04 ( |
r_arguments |
Optional character vector of command line arguments
to pass to |
crashes_max |
In rare cases, a worker may exit unexpectedly
before it completes its current task. If this happens,
|
backup |
An optional Limitations of |
options_metrics |
Either |
options_local |
An object generated by |
local_log_directory |
Deprecated on 2024-10-08. Use
|
local_log_join |
Deprecated on 2024-10-08. Use
|
See Also
Other plugin_local:
crew_class_launcher_local
,
crew_launcher_local()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
controller <- crew_controller_local()
controller$start()
controller$push(name = "task", command = sqrt(4))
controller$wait()
controller$pop()
controller$terminate()
}
Create a sequential controller.
Description
The sequential controller runs tasks on the same R process where the controller object exists. Tasks run sequentially rather than in parallel.
Usage
crew_controller_sequential()
See Also
Other sequential controllers:
crew_class_controller_sequential
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
controller <- crew_controller_sequential()
controller$push(name = "task", command = sqrt(4))
controller$pop()
}
Deprecate a crew
feature.
Description
Show an informative warning when a crew
feature is
deprecated.
Usage
crew_deprecate(
name,
date,
version,
alternative,
condition = "warning",
value = "x",
skip_cran = FALSE,
frequency = "always"
)
Arguments
name |
Name of the feature (function or argument) to deprecate. |
date |
Date of deprecation. |
version |
Package version when deprecation was instated. |
alternative |
Message about an alternative. |
condition |
Either "warning" or "message" to indicate the type of condition thrown on deprecation. |
value |
Value of the object. Deprecation is skipped
if |
skip_cran |
Logical of length 1, whether to skip the deprecation warning or message on CRAN. |
frequency |
Character of length 1, passed to the |
Value
NULL
(invisibly). Throws a warning if a feature is deprecated.
See Also
Other utility:
crew_assert()
,
crew_clean()
,
crew_eval()
,
crew_random_name()
,
crew_retry()
,
crew_terminate_process()
,
crew_terminate_signal()
,
crew_worker()
Examples
suppressWarnings(
crew_deprecate(
name = "auto_scale",
date = "2023-05-18",
version = "0.2.0",
alternative = "use the scale argument of push(), pop(), and wait()."
)
)
Evaluate an R command and return results as a monad.
Description
Not a user-side function. Do not call directly.
Usage
crew_eval(
command,
name,
data = list(),
globals = list(),
seed = NULL,
algorithm = NULL,
packages = character(0),
library = NULL,
reset_globals = TRUE,
reset_packages = FALSE,
reset_options = FALSE,
garbage_collection = FALSE
)
Arguments
command |
Language object with R code to run. |
name |
Character of length 1, name of the task. |
data |
Named list of local data objects in the evaluation environment. |
globals |
Named list of objects to temporarily assign to the global environment for the task. |
seed |
Integer of length 1 with the pseudo-random number generator
seed to set for the evaluation of the task. Passed to the
|
algorithm |
Integer of length 1 with the pseudo-random number
generator algorithm to set for the evaluation of the task.
Passed to the |
packages |
Character vector of packages to load for the task. |
library |
Library path to load the packages. See the |
reset_globals |
|
reset_packages |
|
reset_options |
|
garbage_collection |
|
Details
The crew_eval()
function evaluates an R expression
in an encapsulated environment and returns a monad with the results,
including warnings and error messages if applicable.
The random number generator seed, globals
, and global options
are restored to their original values on exit.
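The idea of returning results, warnings, and errors together can be sketched in base R. This is a simplified illustration, not the crew_eval() source: `sketch_eval()` and the field names of its return value are hypothetical, and the sketch does not restore the seed, globals, or options.

```r
# Hypothetical sketch: evaluate a command in its own environment and
# return the result together with any warnings and error message.
sketch_eval <- function(command, data = list()) {
  envir <- list2env(data, parent = globalenv())
  warning_messages <- character(0)
  error_message <- NA_character_
  result <- withCallingHandlers(
    tryCatch(
      eval(command, envir = envir),
      error = function(condition) {
        # Record the error instead of propagating it.
        error_message <<- conditionMessage(condition)
        NULL
      }
    ),
    warning = function(condition) {
      # Record each warning, then resume evaluation.
      warning_messages <<- c(warning_messages, conditionMessage(condition))
      invokeRestart("muffleWarning")
    }
  )
  list(result = result, warnings = warning_messages, error = error_message)
}

ok <- sketch_eval(quote(x + 1), data = list(x = 1))
bad <- sketch_eval(quote({
  warning("careful")
  stop("failed")
}))
```

Because failures are stored in the returned list rather than thrown, the caller can inspect `bad$error` and `bad$warnings` without wrapping the call in its own error handling.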
Value
A monad object with results and metadata.
See Also
Other utility:
crew_assert()
,
crew_clean()
,
crew_deprecate()
,
crew_random_name()
,
crew_retry()
,
crew_terminate_process()
,
crew_terminate_signal()
,
crew_worker()
Examples
crew_eval(quote(1 + 1), name = "task_name")
Run an asynchronous task in the crew launcher.
Description
Called internally, not for users.
Usage
crew_eval_async(
command,
data = list(),
packages = character(0L),
library = NULL
)
Arguments
command |
Language object with R code to run. |
data |
Named list of objects that |
packages |
Character vector of packages to load. |
library |
Character vector of library paths to load the packages from. |
Value
The result of running command
.
Create an abstract launcher.
Description
This function is useful for inheriting argument documentation
in functions that create custom third-party launchers. See
@inheritParams crew::crew_launcher
in the source code file of
crew_launcher_local()
.
Usage
crew_launcher(
name = NULL,
workers = 1L,
seconds_interval = 1,
seconds_timeout = 60,
seconds_launch = 30,
seconds_idle = 300,
seconds_wall = Inf,
seconds_exit = NULL,
tasks_max = Inf,
tasks_timers = 0L,
reset_globals = NULL,
reset_packages = NULL,
reset_options = NULL,
garbage_collection = NULL,
crashes_error = NULL,
launch_max = NULL,
tls = crew::crew_tls(),
processes = NULL,
r_arguments = c("--no-save", "--no-restore"),
options_metrics = crew::crew_options_metrics()
)
Arguments
name |
Character string, name of the launcher. If the name is
|
workers |
Maximum number of workers to run concurrently
when auto-scaling, excluding task retries and manual calls to |
seconds_interval |
Number of seconds between
polling intervals waiting for certain internal
synchronous operations to complete. In certain cases, exponential
backoff is used with this argument passed to |
seconds_timeout |
Number of seconds until timing
out while waiting for certain synchronous operations to complete,
such as checking |
seconds_launch |
Seconds of startup time to allow.
A worker is unconditionally assumed to be alive
from the moment of its launch until |
seconds_idle |
Maximum number of seconds that a worker can idle
since the completion of the last task. If exceeded, the worker exits.
But the timer does not launch until |
seconds_wall |
Soft wall time in seconds.
The timer does not launch until |
seconds_exit |
Deprecated on 2023-09-21 in version 0.5.0.9002. No longer necessary. |
tasks_max |
Maximum number of tasks that a worker will do before
exiting. See the |
tasks_timers |
Number of tasks to do before activating
the timers for |
reset_globals |
Deprecated on 2025-05-30 ( |
reset_packages |
Deprecated on 2025-05-30 ( |
reset_options |
Deprecated on 2025-05-30 ( |
garbage_collection |
Deprecated on 2025-05-30
( |
crashes_error |
Deprecated on 2025-01-13 ( |
launch_max |
Deprecated on 2024-11-04 ( |
tls |
A TLS configuration object from |
processes |
|
r_arguments |
Optional character vector of command line arguments
to pass to |
options_metrics |
Either |
See Also
Other launcher:
crew_class_launcher
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local()
launcher$start(url = client$url, profile = client$profile)
launcher$launch()
task <- mirai::mirai("result", .compute = client$profile)
mirai::call_mirai(task)
task$data
client$terminate()
}
Create a launcher with local process workers.
Description
Create an R6
object to launch and maintain
local process workers.
Usage
crew_launcher_local(
name = NULL,
workers = 1L,
seconds_interval = 1,
seconds_timeout = 60,
seconds_launch = 30,
seconds_idle = Inf,
seconds_wall = Inf,
seconds_exit = NULL,
tasks_max = Inf,
tasks_timers = 0L,
reset_globals = NULL,
reset_packages = NULL,
reset_options = NULL,
garbage_collection = NULL,
crashes_error = NULL,
launch_max = NULL,
tls = crew::crew_tls(),
r_arguments = c("--no-save", "--no-restore"),
options_metrics = crew::crew_options_metrics(),
options_local = crew::crew_options_local(),
local_log_directory = NULL,
local_log_join = NULL
)
Arguments
name |
Character string, name of the launcher. If the name is
|
workers |
Maximum number of workers to run concurrently
when auto-scaling, excluding task retries and manual calls to |
seconds_interval |
Number of seconds between
polling intervals waiting for certain internal
synchronous operations to complete. In certain cases, exponential
backoff is used with this argument passed to |
seconds_timeout |
Number of seconds until timing
out while waiting for certain synchronous operations to complete,
such as checking |
seconds_launch |
Seconds of startup time to allow.
A worker is unconditionally assumed to be alive
from the moment of its launch until |
seconds_idle |
Maximum number of seconds that a worker can idle
since the completion of the last task. If exceeded, the worker exits.
But the timer does not launch until |
seconds_wall |
Soft wall time in seconds.
The timer does not launch until |
seconds_exit |
Deprecated on 2023-09-21 in version 0.5.0.9002. No longer necessary. |
tasks_max |
Maximum number of tasks that a worker will do before
exiting. See the |
tasks_timers |
Number of tasks to do before activating
the timers for |
reset_globals |
Deprecated on 2025-05-30 ( |
reset_packages |
Deprecated on 2025-05-30 ( |
reset_options |
Deprecated on 2025-05-30 ( |
garbage_collection |
Deprecated on 2025-05-30
( |
crashes_error |
Deprecated on 2025-01-13 ( |
launch_max |
Deprecated on 2024-11-04 ( |
tls |
A TLS configuration object from |
r_arguments |
Optional character vector of command line arguments
to pass to |
options_metrics |
Either |
options_local |
An object generated by |
local_log_directory |
Deprecated on 2024-10-08. Use
|
local_log_join |
Deprecated on 2024-10-08. Use
|
See Also
Other plugin_local:
crew_class_launcher_local
,
crew_controller_local()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
client <- crew_client()
client$start()
launcher <- crew_launcher_local(name = client$name)
launcher$start(url = client$url, profile = client$profile)
launcher$launch()
task <- mirai::mirai("result", .compute = client$profile)
mirai::call_mirai(task)
task$data
client$terminate()
}
Create a local monitor object.
Description
Create an R6
object to monitor local processes created by
crew
and mirai
.
Usage
crew_monitor_local()
See Also
Other monitor:
crew_class_monitor_local
Local crew
launcher options.
Description
Options for the local crew
launcher.
Usage
crew_options_local(log_directory = NULL, log_join = TRUE)
Arguments
log_directory |
Either |
log_join |
Logical of length 1. If |
Value
A classed list of options for the local launcher.
See Also
Other options:
crew_options_metrics()
Examples
crew_options_local()
Options for logging resource usage metrics.
Description
crew_options_metrics()
configures
crew
to record resource usage metrics (such as CPU and memory usage)
for each running worker.
To activate resource usage logging,
the autometric
R package version 0.1.0 or higher
must be installed.
Logging happens in the background (through a detached POSIX thread)
so as not to disrupt
the R session. On Unix-like systems, crew_options_metrics()
can specify /dev/stdout
or /dev/stderr
as the log files, which will
redirect output to existing logs you are already using.
autometric::log_read()
and autometric::log_plot()
can read and
visualize resource usage data from multiple log files, even
if those files are mixed with other messages.
Usage
crew_options_metrics(path = NULL, seconds_interval = 5)
Arguments
path |
Where to write resource metric log entries for workers.
After running enough tasks in |
seconds_interval |
Positive number, seconds between resource metric
log entries written to |
Value
A classed list of options for logging resource usage metrics.
See Also
Other options:
crew_options_local()
Examples
crew_options_metrics()
Create a crew
queue object.
Description
Create an R6
crew
queue object.
Usage
crew_queue(data = character(0L), step = 1000L)
Arguments
data |
Character vector of initial queue data. |
step |
Positive integer with the number of elements to extend the
queue on each call to the |
Details
A crew
queue is a classical first-in-first-out data structure
that extends itself in chunks (of size step
) to avoid
overhead.
crew
uses queues to efficiently track the names of resolved
tasks and backlogged tasks.
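The chunked growth described above can be sketched in base R with closures. This is an illustration under assumptions, not the crew R6 class: `sketch_queue()` and its internal variables are hypothetical names, and only the `data` and `step` arguments and the push, pop, and collect behavior are taken from the documentation.

```r
# Hypothetical closure-based FIFO queue that grows its storage in chunks.
sketch_queue <- function(data = character(0L), step = 1000L) {
  storage <- data      # buffer, may end with unused NA slots
  front <- 1L          # index of the next element to pop
  back <- length(data) # index of the last element pushed
  push <- function(x) {
    n <- length(x)
    if (n < 1L) {
      return(invisible())
    }
    needed <- back + n - length(storage)
    if (needed > 0L) {
      # Extend by a whole number of chunks of size `step`.
      chunks <- ceiling(needed / step)
      storage <<- c(storage, rep(NA_character_, chunks * step))
    }
    storage[seq.int(back + 1L, back + n)] <<- x
    back <<- back + n
    invisible()
  }
  pop <- function(n = 1L) {
    available <- back - front + 1L
    if (available < 1L) {
      return(NULL)
    }
    n <- min(n, available) # pop fewer than n if fewer are available
    out <- storage[seq.int(front, front + n - 1L)]
    front <<- front + n
    out
  }
  collect <- function() pop(n = back - front + 1L)
  list(push = push, pop = pop, collect = collect)
}

queue <- sketch_queue(step = 4L)
queue$push(c("a", "b", "c"))
queue$push(c("d", "e"))
first <- queue$pop(n = 2L)
rest <- queue$collect()
empty <- queue$pop()
```

Growing the buffer in chunks means most pushes write into slots that already exist, so the vector is reallocated only once every `step` elements or so.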
Value
A queue object.
See Also
Other queue:
crew_class_queue
Random name
Description
Generate a random string that can be used as a name for a worker or task.
Usage
crew_random_name(n = 12L)
Arguments
n |
Number of bytes of information in the random string
hashed to generate the name. Larger |
Details
The randomness is not reproducible and cannot be set with
e.g. set.seed()
in R.
Value
A random character string.
See Also
Other utility:
crew_assert()
,
crew_clean()
,
crew_deprecate()
,
crew_eval()
,
crew_retry()
,
crew_terminate_process()
,
crew_terminate_signal()
,
crew_worker()
Examples
crew_random_name()
Create a crew
relay object.
Description
Create an R6
crew
relay object.
Usage
crew_relay(throttle = crew_throttle())
Arguments
throttle |
A |
Details
A crew
relay object keeps the signaling relationships
among condition variables.
Value
An R6
crew
relay object.
See Also
Other relay:
crew_class_relay
Examples
crew_relay()
Retry code.
Description
Repeatedly retry a function while it keeps returning FALSE
and exit the loop when it returns TRUE.
Usage
crew_retry(
fun,
args = list(),
seconds_interval = 1,
seconds_timeout = 60,
max_tries = Inf,
error = TRUE,
message = character(0),
envir = parent.frame(),
assertions = TRUE
)
Arguments
fun |
Function that returns |
args |
A named list of arguments to |
seconds_interval |
Nonnegative numeric of length 1,
number of seconds to wait between calls to |
seconds_timeout |
Nonnegative numeric of length 1, number of seconds to loop before timing out. |
max_tries |
Maximum number of calls to |
error |
Whether to throw an error on a timeout or max tries. |
message |
Character of length 1, optional error message if the wait times out. |
envir |
Environment to evaluate |
assertions |
|
Value
NULL
(invisibly).
See Also
Other utility:
crew_assert()
,
crew_clean()
,
crew_deprecate()
,
crew_eval()
,
crew_random_name()
,
crew_terminate_process()
,
crew_terminate_signal()
,
crew_worker()
Examples
crew_retry(fun = function() TRUE)
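For readers who want to see the shape of such a retry loop, here is a minimal base R sketch. It is not the crew_retry() source code: `sketch_retry()` is a hypothetical name, it covers only some of the arguments, and it always throws an error on a timeout or when the tries run out.

```r
# Hypothetical retry loop: call fun until it returns TRUE,
# sleeping between attempts and giving up on timeout or max tries.
sketch_retry <- function(
  fun,
  args = list(),
  seconds_interval = 1,
  seconds_timeout = 60,
  max_tries = Inf
) {
  start <- proc.time()[["elapsed"]]
  tries <- 0
  while (!isTRUE(do.call(fun, args))) {
    tries <- tries + 1
    elapsed <- proc.time()[["elapsed"]] - start
    if (elapsed > seconds_timeout || tries >= max_tries) {
      stop("timed out or reached the maximum number of tries")
    }
    Sys.sleep(seconds_interval)
  }
  invisible()
}

# Example: the function succeeds on its third call.
calls <- 0
succeed_on_third <- function() {
  calls <<- calls + 1
  calls >= 3
}
sketch_retry(succeed_on_third, seconds_interval = 0.01, seconds_timeout = 5)
```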
Manually terminate a local process.
Description
Manually terminate a local process.
Usage
crew_terminate_process(pid)
Arguments
pid |
Integer of length 1, process ID to terminate. |
Value
NULL
(invisibly).
See Also
Other utility:
crew_assert()
,
crew_clean()
,
crew_deprecate()
,
crew_eval()
,
crew_random_name()
,
crew_retry()
,
crew_terminate_signal()
,
crew_worker()
Examples
if (identical(Sys.getenv("CREW_EXAMPLES"), "true")) {
process <- processx::process$new("sleep", "60")
process$is_alive()
crew_terminate_process(pid = process$get_pid())
process$is_alive()
}
Get the termination signal.
Description
Get a supported operating system signal for terminating a local process.
Usage
crew_terminate_signal()
Value
An integer of length 1: tools::SIGTERM
if your platform
supports SIGTERM
. If not, then crew_terminate_signal()
checks
SIGQUIT
, then SIGINT
, then SIGKILL
, and then returns the first
signal it finds that your operating system can use.
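The fallback order above can be sketched with the signal constants from the tools package, which are NA on platforms that do not support them. This is a hedged illustration rather than the crew_terminate_signal() source, and `sketch_terminate_signal()` is a hypothetical name.

```r
# Hypothetical helper: return the first supported signal in the
# order SIGTERM, SIGQUIT, SIGINT, SIGKILL.
sketch_terminate_signal <- function() {
  candidates <- c(
    tools::SIGTERM,
    tools::SIGQUIT,
    tools::SIGINT,
    tools::SIGKILL
  )
  supported <- candidates[!is.na(candidates)]
  if (length(supported) < 1L) {
    stop("no supported termination signal found")
  }
  supported[1L]
}

signal <- sketch_terminate_signal()
```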
See Also
Other utility:
crew_assert()
,
crew_clean()
,
crew_deprecate()
,
crew_eval()
,
crew_random_name()
,
crew_retry()
,
crew_terminate_process()
,
crew_worker()
Examples
crew_terminate_signal()
Create a stateful throttling object.
Description
Create an R6
object for throttling.
Usage
crew_throttle(
seconds_max = 1,
seconds_min = 0.001,
seconds_start = seconds_min,
base = 2
)
Arguments
seconds_max |
Positive numeric scalar, maximum throttling interval. |
seconds_min |
Positive numeric scalar, minimum throttling interval. |
seconds_start |
Positive numeric scalar,
the initial wait time interval in seconds.
The default is |
base |
Numeric scalar greater than 1, base of the exponential
backoff algorithm. |
Details
Throttling is a technique that limits how often a function is
called in a given period of time. crew_throttle()
objects support
the throttle
argument of controller methods, which ensures auto-scaling
does not induce superfluous overhead.
The throttle uses a deterministic exponential backoff algorithm
(https://en.wikipedia.org/wiki/Exponential_backoff) which
increases wait times when there is nothing to do and decreases
wait times when there is something to do. The controller decreases
or increases the wait time with methods accelerate()
and decelerate()
in the throttle object, respectively,
by dividing or multiplying by base
(but keeping the wait time
between seconds_min
and seconds_max
).
In practice, crew
calls reset()
instead of accelerate()
in order to respond quicker to surges of activity (see the
update()
method).
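The clamped multiply and divide steps described above can be written out in a few lines of base R. The helper names `backoff_decelerate()` and `backoff_accelerate()` are hypothetical and exist only for this sketch; real throttle objects expose accelerate() and decelerate() as methods and keep the current interval as internal state.

```r
# Hypothetical helpers illustrating clamped exponential backoff.
backoff_decelerate <- function(interval, base = 2, seconds_max = 1) {
  # Nothing to do: wait longer, but never longer than seconds_max.
  min(interval * base, seconds_max)
}
backoff_accelerate <- function(interval, base = 2, seconds_min = 0.001) {
  # Something to do: wait less, but never less than seconds_min.
  max(interval / base, seconds_min)
}

# Starting from the minimum, repeated deceleration doubles the wait
# until it reaches the cap.
interval <- 0.001
waits <- numeric(0)
for (i in seq_len(12)) {
  interval <- backoff_decelerate(interval)
  waits <- c(waits, interval)
}
```

With the defaults shown, the wait doubles on each idle step until it reaches `seconds_max`, after which further deceleration has no effect.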
Value
An R6
object with throttle configuration settings and methods.
See Also
Other throttle:
crew_class_throttle
Examples
throttle <- crew_throttle(seconds_max = 1)
throttle$poll()
throttle$poll()
Configure TLS.
Description
Create an R6
object with transport layer security (TLS)
configuration for crew
.
Usage
crew_tls(
mode = "none",
key = NULL,
password = NULL,
certificates = NULL,
validate = TRUE
)
Arguments
mode |
Character of length 1. Must be one of the following:
|
key |
If |
password |
If |
certificates |
If |
validate |
Logical of length 1, whether to validate the configuration
object on creation. If |
Details
crew_tls()
objects are input to the tls
argument of
crew_client()
, crew_controller_local()
, etc.
See https://wlandau.github.io/crew/articles/risks.html for details.
Value
An R6
object with TLS configuration settings and methods.
See Also
Other tls:
crew_class_tls
Examples
crew_tls(mode = "automatic")
Crew worker.
Description
Launches a crew
worker which runs a mirai
daemon.
Not a user-side function. Users should not call crew_worker()
directly. See launcher plugins like crew_launcher_local()
for examples.
Usage
crew_worker(
settings,
launcher,
worker,
options_metrics = crew::crew_options_metrics()
)
Arguments
settings |
Named list of arguments to |
launcher |
Character string, name of the launcher. |
worker |
Character of length 1 to uniquely identify the current worker. |
options_metrics |
Either |
Value
NULL
(invisibly).
See Also
Other utility:
crew_assert()
,
crew_clean()
,
crew_deprecate()
,
crew_eval()
,
crew_random_name()
,
crew_retry()
,
crew_terminate_process()
,
crew_terminate_signal()
Objects exported from other packages
Description
These objects are imported from other packages. Follow the links below to see their documentation.
- tidyselect: all_of, any_of, contains, ends_with, everything, last_col, matches, num_range, one_of, starts_with