---
title: "Stopping Jobs"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Stopping Jobs}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---
## Introduction
You can save compute resources by stopping Jobs that are no longer needed. If a Job is currently running on a Worker, its background process is terminated. Terminated background processes are automatically replaced by new ones.
There are several ways to stop a Job:

* `$stop()`: return an interrupt condition.
* `$output<-`: return a custom result.
* `$stop_id`: replace redundant Jobs.
* `$copy_id`: combine redundant Jobs.
* `$timeout`: cap a Job's compute time.
## Setup
```r
library(jobqueue)
q <- Queue$new(workers = 1)
```
## Immediate Stop
As soon as a Job is created, it can be stopped by either calling `$stop()` or
by assigning to `$output`. These methods work up until the Job state is `'done'`.
Once the Job state is `'done'`, `$output` is immutable and calling `$stop()`
will have no effect (since it's already stopped). Calling `$stop()` and assigning
to `$output` both have the side effect of setting the Job state to `'done'`.
### Interrupting with `$stop()`
```r
job <- Job$new({ Sys.sleep(5); 10 })
job$stop()
job$result
#>
job <- Job$new({ Sys.sleep(5); 10 })
job$stop('my reason')
job$result
#>
class(job$result)
#> [1] "interrupt" "condition"
```
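
To branch on whether a Job was stopped, one option is to test the class of `$result`, since a stopped Job yields an `interrupt` condition as shown above. The sketch below assumes the stop reason is carried as the condition's message, which is why it is read with base R's `conditionMessage()`. The variable name `res` is only illustrative.

```r
library(jobqueue)

job <- Job$new({ Sys.sleep(5); 10 })
job$stop('no longer needed')

res <- job$result
if (inherits(res, 'interrupt')) {
  # Assumption: the reason passed to $stop() is stored as the condition message.
  message('Job was stopped: ', conditionMessage(res))
} else {
  print(res)
}
```

Checking with `inherits()` rather than comparing `class()` directly keeps the test valid if the condition carries additional classes.
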
You can call `$stop()` from within callback hooks.
```r
hooks <- list(
  'created' = function (job) {
    if (length(job$vars$x) > 100)
      job$stop('Job is too large.')
  }
)
job <- q$run({ sum(x) }, vars = list(x = 1:200), hooks = hooks)
job$result
#>
```
### Assigning to `$output`
`$output` can only be assigned to once. If the Job is not yet done when
`$output` is assigned, then any ongoing processing will be halted.
```r
job <- Job$new({ Sys.sleep(5); 10 })
job$state
#> [1] "created"
job$output <- "Custom result"
job$state
#> [1] "done"
job$result
#> [1] "Custom result"
```
If you've set `$reformat`, it will determine `$result` as usual.
See `vignette('results')`.
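
As a hedged illustration of that interaction, the sketch below assumes `reformat` is given to `Job$new()` as a function that receives the Job and returns the value for `$result`. Check `vignette('results')` for the authoritative form.

```r
library(jobqueue)

# Assumption: `reformat` takes a function of the Job, and its return value
# becomes `$result`.
job <- Job$new({ Sys.sleep(5); 10 }, reformat = function (job) toupper(job$output))

job$output <- "custom result"  # stops the Job and sets its output
job$state
job$result                     # the reformatted output, not the raw string
```
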
## Replacing and Combining
Duplicate jobs are sometimes sent to the Queue, for instance when a user clicks a
'submit' button repeatedly. `stop_id` and `copy_id` can prevent such requests from
overloading the server.
### Stop ID
New jobs will replace existing jobs with the same `stop_id`.
```r
job1 <- q$run({ Sys.sleep(5); 'A' }, stop_id = 123)
job2 <- q$run({ 'B' }, stop_id = 123)
job1$result
#>
job2$result
#> [1] "B"
```
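
The repeated 'submit' click scenario can be sketched as a loop that submits the same request three times under one `stop_id`. The id value `'submit-button'` and the variable names are made up for illustration. Based on the behavior described above, only the last Job should produce a value, while the earlier ones are interrupted.

```r
library(jobqueue)
q <- Queue$new(workers = 1)

# Simulate three rapid clicks; each new Job replaces the previous one.
jobs <- lapply(1:3, function (i) {
  q$run({ Sys.sleep(1); i }, vars = list(i = i), stop_id = 'submit-button')
})

# TRUE for Jobs that were replaced before finishing.
stopped <- sapply(jobs, function (job) inherits(job$result, 'interrupt'))
stopped
```
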
### Copy ID
New jobs will mirror the output of existing jobs with the same `copy_id`.
```r
job1 <- q$run({ Sys.sleep(5); 'A' }, copy_id = 456)
job2 <- q$run({ 'B' }, copy_id = 456)
job1$result
#> [1] "A"
job2$result
#> [1] "A"
```
### In `Queue$new()`
If you set `stop_id` and/or `copy_id` in `Queue$new()`, then they must be functions
(or lambda notation functions) that accept a Job object as input and return a value
`id`. If `id` is `NULL`, then no comparisons will be done. Otherwise, `id` can be any
data type and will be compared to other `id`s with `identical()`.
```r
q <- Queue$new(stop_id = function (job) list(job$vars$f, job$user_id))
job1 <- q$run({ f(x) }, vars = list(f = sum, x = 1:10), user_id = 'A')
job2 <- q$run({ f(x) }, vars = list(f = sum, x = 1:2), user_id = 'A')
job3 <- q$run({ f(x) }, vars = list(f = mean, x = 1:10), user_id = 'A')
job1$result
#>
job2$result
#> [1] 3
job3$result
#> [1] 5.5
```
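
To see why `job1` is replaced by `job2` but not by `job3`, it can help to evaluate the `stop_id` function by hand. The sketch below uses plain lists as stand-ins for Job objects (an assumption made so the snippet runs without a Queue). The names `get_id`, `mock1`, `mock2`, and `mock3` are hypothetical.

```r
# The same id function as passed to Queue$new() above.
get_id <- function (job) list(job$vars$f, job$user_id)

# Plain lists standing in for Job objects (illustration only).
mock1 <- list(vars = list(f = sum,  x = 1:10), user_id = 'A')
mock2 <- list(vars = list(f = sum,  x = 1:2),  user_id = 'A')
mock3 <- list(vars = list(f = mean, x = 1:10), user_id = 'A')

identical(get_id(mock1), get_id(mock2))  # TRUE: same function and user
identical(get_id(mock1), get_id(mock3))  # FALSE: `f` differs
```

Because `x` is not part of the id, the first two requests count as duplicates even though their inputs differ.
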
## Timeouts
```r
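# Limit the total time from 'submitted' to 'done'.
job <- Job$new({ Sys.sleep(5); 'Zzzzz' }, timeout = 0.2)
q$submit(job)
job$result
#>

# Limit the time spent in the 'created' state; this Job is never submitted.
job <- Job$new({ 10 }, timeout = list(created = 0.2))
job$result
#>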
```
Limits (in seconds) can be set on:

* The total `'submitted'` to `'done'` time: `timeout = 2`
* Individual states: `timeout = list(queued = 1, running = 2)`
* Both: `timeout = list(total = 3, queued = 2, running = 2)`
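
As a sketch of the combined form, the Job below is allowed three seconds overall but only one second in the `'running'` state, so the five-second sleep should be cut short. The specific limits are arbitrary.

```r
library(jobqueue)
q <- Queue$new(workers = 1)

job <- Job$new({ Sys.sleep(5); 'finished' }, timeout = list(total = 3, running = 1))
q$submit(job)

# Expected to be an interrupt condition rather than 'finished'.
inherits(job$result, 'interrupt')
```
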