crew lets users write custom launchers
for different types of workers that connect over the local network. The
crew.cluster
package already has plugins for traditional high-performance computing
schedulers (SLURM, SGE, LSF,
and PBS/TORQUE).
These launcher
plugins need not become part of the crew package itself.
You can write your plugin in a simple R script, or you can write it in a
custom R package that depends on
crew. Published packages with launcher
plugins are powerful extensions that enhance crew for the
entire open-source community. See R
Packages by Hadley Wickham
and Jenny Bryan for how to
write an R package.
This vignette demonstrates how to write a crew launcher
plugin. It assumes prior familiarity with R6
classes and the computing platform of your plugin.
To create your own launcher plugin, write an R6
subclass of crew_class_launcher
with a launch_worker()
method analogous to the one in the local
process launcher. launch_worker() must accept the same
arguments as the local
process launch_worker() method, generate a call to crew_worker(),
and then submit a new job or process to run that call.
Each worker that launches must be able to dial into the client over
the local network. The host argument of
crew_client() provides the local IP address, and the
port argument provides the TCP port. The controller helper
function (see below) should expose the host and port arguments
so that users can adjust them if workers cannot reach the client.
By default, host is the local IP address.
crew assumes the local network is secure. Please take the
time to assess the network security risks of your computing environment.
Use at your own risk.
We recommend you implement an optional terminate_worker()
method. Although mirai has its own way of terminating
workers, it only works if the worker already connected, and it cannot
reach workers that fail to connect and hang in a crashed state. An
optional terminate_worker()
method in your crew launcher plugin is extra assurance that
these workers will exit.
If you implement a custom terminate_worker()
method, it must not throw an error (and should not throw a warning or
message) if the worker is already terminated. In addition, it must
accept a handle that identifies the worker, and this handle must be the
return value of the previous call to launch_worker().
A handle can be any kind of R object: a process ID, a job name, an
R6 object returned by callr::r_bg(), etc.
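As a minimal sketch of the no-error requirement, assume the handle is a plain integer process ID on the local machine. The helper name terminate_quietly() is hypothetical and not part of crew. It relies only on tools::pskill() from base R, which reports failure through its return value instead of throwing when the process no longer exists.

```r
# Hypothetical helper (not part of crew): send SIGTERM to a local process
# identified by an integer PID, and stay silent if it is already gone.
terminate_quietly <- function(handle) {
  pid <- as.integer(handle)
  tryCatch(
    suppressWarnings(
      suppressMessages(tools::pskill(pid, signal = tools::SIGTERM))
    ),
    error = function(condition) FALSE
  )
  invisible(NULL)
}

# Calling it on a PID that almost certainly does not exist is harmless,
# and so is calling it a second time.
terminate_quietly(2147483647L)
terminate_quietly(2147483647L)
```

Inside a launcher class, a method such as terminate_worker = function(handle) terminate_quietly(handle) would then meet the requirement, provided launch_worker() returned the PID as its handle.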
The following is a custom launcher class whose workers are local R processes on Unix-like systems.
custom_launcher_class <- R6::R6Class(
  classname = "custom_launcher_class",
  inherit = crew::crew_class_launcher,
  public = list(
    launch_worker = function(call, name, launcher, worker, instance) {
      bin <- file.path(R.home("bin"), "Rscript")
      processx::process$new(
        command = bin,
        args = c(self$r_arguments, "-e", call),
        cleanup = FALSE
      )
    },
    terminate_worker = function(handle) {
      handle$signal(crew::crew_terminate_signal())
    }
  )
)
Inside launch_worker(), the
processx::process$new(command = bin, args = c(self$r_arguments, "-e", call))
line runs the crew_worker()
call in an external R process with the command line arguments from
r_arguments (supplied when the launcher is created). This
process runs in the background, connects back to crew and
mirai over the local network, and accepts the tasks you
push to the controller. processx::process$new() also
returns a handle which the terminate_worker() method can
use to force-terminate the process if appropriate. mirai
has its own way to terminate workers, so a
terminate_worker() method is not strictly required, but it
is a useful safeguard in case a worker hangs in a crashed state before
it establishes a connection.
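On platforms where the worker starts from a job script rather than from processx, the same call string usually has to be embedded in a shell command. The sketch below shows one way to do that with base R. The shortened call string, the --vanilla flag standing in for r_arguments, and the variable names are illustrative assumptions, not output from crew.

```r
# Illustrative stand-in for the `call` argument of launch_worker().
call <- "crew::crew_worker(settings = list(url = \"tcp://127.0.0.1:57000\"), launcher = \"my_launcher\", worker = \"worker_name\")"

# Illustrative stand-in for self$r_arguments.
r_arguments <- "--vanilla"

# Quote the call for a POSIX shell so the embedded double quotes survive.
command <- paste(
  "Rscript",
  paste(r_arguments, collapse = " "),
  "-e",
  shQuote(call, type = "sh")
)

# The string is ordinary R code: parse() accepts it without running it.
parsed <- parse(text = call)[[1]]
function_name <- deparse(parsed[[1]])
```

A scheduler-based plugin could write command into a job script, submit it, and return the resulting job ID as the handle.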
Every launch_worker() method must accept arguments
call, name, launcher, worker, and
instance. The method does not actually need to use all
these arguments, but they must be present in the function signature.
call: text string with a call to crew_worker().
name: text string with an informative name for the worker instance.
launcher: text string with the name of the launcher.
worker: positive integer index of the worker. Ranges from 1 to the maximum number of simultaneous workers configured for the controller.
instance: text string with the instance of the worker in the launcher at the given worker index.
To see what the call object looks like, create a new
launcher and run the call() method.
library(crew)
launcher <- crew_launcher_local()
launcher$start(url = "tcp://127.0.0.1:57000", profile = "example_profile")
launcher$call(worker = "worker_name")
#> [1] "crew::crew_worker(settings = list(url = \"tcp://127.0.0.1:57000\", dispatcher = TRUE, asyncdial = FALSE, autoexit = 15L, cleanup = 1L, output = TRUE, maxtasks = Inf, idletime = Inf, walltime = Inf, timerstart = 0L, id = 1L, tls = NULL, rs = NULL), launcher = \"27548330\", worker = \"worker_name\", options_metrics = crew::crew_options_metrics(path = NULL, seconds_interval = 5))"
It is useful to have a helper function that creates controllers with your custom launcher. It should:
1. Accept all the same arguments as crew_controller_local().
2. Create a client object using crew_client().
3. Create a launcher object with the new() method of your custom launcher class.
4. Create a new controller using crew_controller().
5. Scan the controller for obvious errors using the validate() method of the controller.
Feel free to borrow from the crew_controller_local()
source code. For packages, you can use the
@inheritParams roxygen2 tag to
inherit the documentation of all the arguments instead of writing it by
hand. You may want to adjust the default arguments based on the
specifics of your platform, especially seconds_launch if
workers take a long time to launch.
#' @title Create a controller with the custom launcher.
#' @export
#' @description Create an `R6` object to submit tasks and
#'   launch workers.
#' @inheritParams crew::crew_controller_local
crew_controller_custom <- function(
  name = "custom controller name",
  workers = 1L,
  host = NULL,
  port = NULL,
  tls = crew::crew_tls(),
  serialization = NULL,
  seconds_interval = 0.5,
  seconds_timeout = 30,
  seconds_launch = 30,
  seconds_idle = Inf,
  seconds_wall = Inf,
  tasks_max = Inf,
  tasks_timers = 0L,
  reset_globals = TRUE,
  reset_packages = FALSE,
  reset_options = FALSE,
  garbage_collection = FALSE,
  r_arguments = NULL,
  options_metrics = crew::crew_options_metrics(),
  crashes_max = 5L,
  backup = NULL
) {
  client <- crew::crew_client(
    host = host,
    port = port,
    tls = tls,
    serialization = serialization,
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout
  )
  launcher <- custom_launcher_class$new(
    name = name,
    workers = workers,
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout,
    seconds_launch = seconds_launch,
    seconds_idle = seconds_idle,
    seconds_wall = seconds_wall,
    tasks_max = tasks_max,
    tasks_timers = tasks_timers,
    tls = tls,
    r_arguments = r_arguments,
    options_metrics = options_metrics
  )
  controller <- crew::crew_controller(
    client = client,
    launcher = launcher,
    reset_globals = reset_globals,
    reset_packages = reset_packages,
    reset_options = reset_options,
    garbage_collection = garbage_collection,
    crashes_max = crashes_max,
    backup = backup
  )
  controller$validate()
  controller
}
Before you begin testing, please begin monitoring local processes and
remote jobs on your platform. In the case of the above crew
launcher which only creates local processes, it is sufficient to start
htop and filter for R
processes, or launch a new R session to monitor the process table from
ps::ps().
However, for more ambitious launchers that submit workers to e.g. AWS Batch, you may need to open
the CloudWatch
dashboard, then view the AWS billing dashboard after testing.
When you are ready to begin testing, try out the example in the README, but
use your custom controller helper instead of crew_controller_local().
First, create and start a controller. You may wish to monitor local
processes on your computer to make sure the mirai
dispatcher starts.
Try pushing a task that gets the local IP address and process ID of the worker instance.
controller$push(
  name = "get worker IP address and process ID",
  command = paste(nanonext::ip_addr()[1], ps::ps_pid())
)
Wait for the task to complete and look at the result.
Please use the result to verify that the task really ran on a worker
as intended. The process ID above should agree with the one from the
handle (except on
Windows because the actual R process may be different from the
Rscript.exe process created first). In addition, if the
worker is running on a different computer, the worker IP address should
be different than the local IP address. Since our custom launcher
creates local processes, the IP addresses are the same in this case, but
they should be different for a SLURM or AWS Batch launcher.
as.character(nanonext::ip_addr())[1]
#> "192.168.0.2"
controller$launcher$instances$handle[[1]]$get_pid()
#> [1] 27336
If you did not set any timeouts or task limits, the worker that ran the task should still be running. The other worker had no tasks, so it did not need to launch.
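The comparison between the task result and the handle can also be scripted. The sketch below uses hypothetical values in place of a real task result, handle, and local IP address; in practice they would come from the controller, the handle, and nanonext::ip_addr().

```r
# Hypothetical values standing in for real ones.
task_result <- "192.168.0.2 27336"  # string returned by the pushed command
handle_pid <- 27336L                # e.g. the PID reported by the handle
local_ip <- "192.168.0.2"           # e.g. the local IP address

# The pushed command pasted the IP address and the PID with a space.
fields <- strsplit(task_result, " ", fixed = TRUE)[[1]]
worker_ip <- fields[1]
worker_pid <- as.integer(fields[2])

# TRUE if the task ran in the process that launch_worker() created.
same_process <- identical(worker_pid, handle_pid)

# TRUE for a local launcher; expected to be FALSE for remote workers.
same_machine <- identical(worker_ip, local_ip)
```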
When you are done, terminate the controller. This terminates the
mirai dispatcher process and the crew
workers.
Finally, use the process monitoring interface of your computing
platform or operating system to verify that all mirai
dispatchers and crew workers are terminated.
If the informal testing succeeded, we recommend you scale up testing to more ambitious scenarios. As one example, you can test that your workers can auto-scale and quickly churn through a large number of tasks.
library(crew)
controller <- crew_controller_custom(
  seconds_idle = 2L,
  workers = 2L
)
controller$start()
# Push 100 tasks
for (index in seq_len(100L)) {
  name <- paste0("task_", index)
  controller$push(name = name, command = index, data = list(index = index))
  message(paste("push", name))
}
# Wait for the tasks to complete.
controller$wait(mode = "all")
# Do the same for 100 more tasks.
for (index in (seq_len(100L) + 100L)) {
  name <- paste0("task_", index)
  controller$push(name = name, command = index, data = list(index = index))
  message(paste("push", name))
}
controller$wait(mode = "all")
# Collect the results.
results <- controller$collect()
# Check the results
all(sort(unlist(results$result)) == seq_len(200L))
#> [1] TRUE
# View the controller summary.
controller$summary()
# Terminate the controller.
controller$terminate()
# Now outside crew, verify that the mirai dispatcher
# and crew workers successfully terminated.
Depending on the launcher plugin, worker launches and terminations
can be time-consuming. For example, each HTTP request to AWS Batch can
take a couple of seconds, and this latency becomes burdensome when there
are hundreds of workers. Fortunately, crew launchers can
run launches and terminations asynchronously. As a launcher plugin
developer, all you need to do is:
1. Expose the processes argument of
launcher$new(). The processes field sets how
many mirai daemons run locally and churn through quick
requests.
2. Execute launches and terminations inside self$async$eval(), and return the resulting value from
launch_worker() and terminate_worker().
Let’s demonstrate on the simple processx example. The
use case itself may seem silly because the workers are local
processx processes, but the same principles apply if you
replace processx with a cloud computing service like AWS
Batch and you replace the process IDs with AWS Batch job IDs.
Here is what the launcher class looks like. We work with
processx PIDs directly because they are light and easy to
send to local async mirai daemons. The
self$async$eval() function accepts R code, data, and
packages to run a quick local asynchronous task, and it returns a
mirai::mirai() task object as the handle. crew
internally resolves launch tasks so the handles you accept in
terminate_worker() are the exact objects returned from
async$eval() in launch_worker().
async_launcher_class <- R6::R6Class(
  classname = "async_launcher_class",
  inherit = crew::crew_class_launcher,
  public = list(
    launch_worker = function(call, name, launcher, worker, instance) {
      self$async$eval(
        command = list(pid = process$new(bin, args = c("-e", call))$get_pid()),
        data = list(bin = file.path(R.home("bin"), "R"), call = call),
        packages = "processx"
      )
    },
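    terminate_worker = function(handle) {
      self$async$eval(
        # Refer to `pid` from `data`: the command runs in an async daemon
        # that only receives the objects listed in `data`, not `handle`.
        command = crew::crew_terminate_process(pid),
        data = list(pid = handle$pid)
      )
    }
  )
)
The controller helper includes a processes argument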
which sets how many asynchronous mirai daemons to create.
Set processes to NULL to disable async and use
it like an ordinary synchronous controller.
crew_controller_async <- function(
  name = "async controller name",
  workers = 1L,
  host = "127.0.0.1",
  port = NULL,
  tls = crew::crew_tls(mode = "none"),
  seconds_interval = 0.5,
  seconds_timeout = 30,
  seconds_launch = 30,
  seconds_idle = Inf,
  seconds_wall = Inf,
  tasks_max = Inf,
  tasks_timers = 0L,
  reset_globals = TRUE,
  reset_packages = FALSE,
  reset_options = FALSE,
  garbage_collection = FALSE,
  r_arguments = NULL,
  options_metrics = crew::crew_options_metrics(),
  crashes_max = 5L,
  backup = NULL,
  processes = NULL # Number of local async daemons for worker launches etc.
) {
  client <- crew::crew_client(
    host = host,
    port = port,
    tls = tls,
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout
  )
  launcher <- async_launcher_class$new(
    name = name,
    workers = workers,
    seconds_interval = seconds_interval,
    seconds_timeout = seconds_timeout,
    seconds_launch = seconds_launch,
    seconds_idle = seconds_idle,
    seconds_wall = seconds_wall,
    tasks_max = tasks_max,
    tasks_timers = tasks_timers,
    tls = tls,
    r_arguments = r_arguments,
    options_metrics = options_metrics,
    processes = processes
  )
  controller <- crew::crew_controller(
    client = client,
    launcher = launcher,
    reset_globals = reset_globals,
    reset_packages = reset_packages,
    reset_options = reset_options,
    garbage_collection = garbage_collection,
    crashes_max = crashes_max,
    backup = backup
  )
  controller$validate()
  controller
}
Creating a controller is the same as before, except the user sets
both the workers and processes arguments.
Remember, these are two different things: workers is the
number of serious workers that run serious tasks from
push(), whereas processes is the number of
mirai daemons that asynchronously launch and terminate
those serious workers. Workers may or may not be local, but
processes are always local.
async_controller$start() automatically launches 4 local
processes to asynchronously auto-scale the workers, and
async_controller$terminate() automatically shuts down those
4 processes. Beyond that, usage is exactly the same as before.
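A back-of-the-envelope estimate shows what the processes argument can buy. The figures are assumptions for illustration only: 2 seconds per launch request, 200 workers, and 4 local async daemons. Real latencies vary, and the estimate ignores scheduling overhead.

```r
seconds_per_request <- 2   # assumed latency of one launch request
workers <- 200L            # assumed number of workers to launch
processes <- 4L            # assumed number of local async daemons

# One request at a time: 200 * 2 = 400 seconds.
sequential_seconds <- workers * seconds_per_request

# Requests spread evenly over the daemons: ceiling(200 / 4) * 2 = 100 seconds.
async_seconds <- ceiling(workers / processes) * seconds_per_request
```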
Usually crew workers terminate themselves when the
parent R session exits or the controller terminates, but under rare
circumstances they may continue running. To help users of your plugin
monitor and manually terminate workers, please consider implementing job
management utilities to go with your launcher plugin. As described in
the introduction
vignette, crew_monitor_local() helps manually list and
terminate local processes relevant to crew. Source code for
the local monitor is on
GitHub, methods are documented
in the package website, and example usage is in the introduction
vignette. In addition, crew_monitor_aws_batch()
implements several
methods for listing and terminating AWS Batch jobs, as well as
viewing CloudWatch logs.
The source code for the local monitor is copied below:
crew_monitor_local <- function() {
  crew_class_monitor_local$new()
}
crew_class_monitor_local <- R6::R6Class(
  classname = "crew_class_monitor_local",
  cloneable = FALSE,
  public = list(
    dispatchers = function() {
      crew_monitor_pids(pattern = "mirai::dispatcher")
    },
    daemons = function() {
      crew_monitor_pids(pattern = "mirai::daemon")
    },
    workers = function() {
      crew_monitor_pids(pattern = "crew::crew_worker")
    },
    terminate = function(pids) {
      lapply(as.integer(pids), crew::crew_terminate_process)
    }
  )
)
crew_monitor_pids <- function(pattern) {
  processes <- ps::ps()
  commands <- map(
    processes$ps_handle,
    ~tryCatch(ps::ps_cmdline(.x), error = function(condition) "")
  )
  filter <- grepl(pattern = pattern, x = as.character(commands), fixed = TRUE)
  as.integer(sort(processes$pid[filter]))
}
Example usage:
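The following is a short sketch of typical usage, assuming the crew package is installed (the block does nothing otherwise). It only lists processes. The terminate() call is left commented out because it would stop any crew workers currently running on the machine.

```r
worker_pids <- integer(0)
if (requireNamespace("crew", quietly = TRUE)) {
  monitor <- crew::crew_monitor_local()
  dispatcher_pids <- monitor$dispatchers()  # PIDs of mirai dispatchers
  daemon_pids <- monitor$daemons()          # PIDs of mirai daemons
  worker_pids <- monitor$workers()          # PIDs of crew workers
  # monitor$terminate(pids = worker_pids)   # uncomment to stop the workers
}
```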