--- title: "Stopping Jobs" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Stopping Jobs} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ## Introduction You can save compute resources by stopping the execution of Jobs that are no longer needed. If that Job is currently running on a Worker, then the associated background process is terminated. Terminated background process are automatically replaced by new ones. There are several ways to stop a Job: * `$stop()`: return an interrupt condition. * `$output<-`: return a custom result. * `$stop_id`: replace redundant Jobs. * `$copy_id`: combine redundant Jobs. * `$timeout`: cap a Job's compute time. ## Setup ```r library(jobqueue) q <- Queue$new(workers = 1) ``` ## Immediate Stop As soon as a Job is created, it can be stopped by either calling `$stop()` or by assigning to `$output`. These methods work up until the Job state is `'done'`. Once the Job state is `'done'`, `$output` is immutable and calling `$stop()` will have no effect (since it's already stopped). Calling `$stop()` and assigning to `$output` both have the side effect of setting the Job state to `'done'`. ### Interrupting with `$stop()` ```r job <- Job$new({ Sys.sleep(5); 10 }) job$stop() job$result #> job <- Job$new({ Sys.sleep(5); 10 }) job$stop('my reason') job$result #> class(job$result) #> [1] "interrupt" "condition" ``` You can call `$stop()` from within callback hooks. ```r hooks <- list( 'created' = function (job) { if (length(job$vars$x) > 100) job$stop('Job is too large.') } ) job <- q$run({ sum(x) }, vars = list(x = 1:200), hooks = hooks) job$result #> ``` ### Assigning to `$output` `$output` can only be assigned to once. If the Job is not yet done when `$output` is assigned, then any ongoing processing will be halted. ```r job <- Job$new({ Sys.sleep(5); 10 }) job$state #> [1] "created" job$output <- "Custom result" job$state #> [1] "done" job$result #> [1] "Custom result" ``` If you've set `$reformat`, it will determine `$result` as usual. See `vignette('results')`. ## Replacing and Combining Sometimes repeat jobs may be sent to the Queue, for instance, a user clicking a 'submit' button repeatedly. `stop_id` and `copy_id` can prevent such requests from overloading the server. ### Stop ID New jobs will replace existing jobs with the same `stop_id`. ```r job1 <- q$run({ Sys.sleep(5); 'A' }, stop_id = 123) job2 <- q$run({ 'B' }, stop_id = 123) job1$result #> job2$result #> [1] "B" ``` ### Copy ID New jobs will mirror the output of existing jobs with the same `copy_id`. ```r job1 <- q$run({ Sys.sleep(5); 'A' }, copy_id = 456) job2 <- q$run({ 'B' }, copy_id = 456) job1$result #> [1] "A" job2$result #> [1] "A" ``` ### In `Queue$new()` If you set `stop_id` and/or `copy_id` in `Queue$new()`, then they must be functions (or lambda notation functions) that accept a Job object as input and returns a value `id`. If `id` is `NULL`, then no comparisons will be done. Otherwise, `id` can be any data type and will be compared to other `id`s with `identical()`. ```r q <- Queue$new(stop_id = function (job) list(job$vars$f, job$user_id)) job1 <- q$run({ f(x) }, vars = list(f = sum, x = 1:10), user_id = 'A') job2 <- q$run({ f(x) }, vars = list(f = sum, x = 1:2), user_id = 'A') job3 <- q$run({ f(x) }, vars = list(f = mean, x = 1:10), user_id = 'A') job1$result #> job2$result #> [1] 3 job3$result #> [1] 5.5 ``` ## Timeouts ```r job <- Job$new({ Sys.sleep(5); 'Zzzzz' }, timeout = 0.2) q$submit(job) job$result #> job <- Job$new({ 10 }, timeout = list(created = 0.2)) job$result #> ``` Limits (in seconds) can be set on: * The total `'submitted'` to `'done'` time: `timeout = 2` * On a per-state basis: `timeout = list(queued = 1, running = 2)` * Or both: `timeout = list(total = 3, queued = 2, running = 2)`