Stopping Jobs

Introduction

You can save compute resources by stopping Jobs that are no longer needed. If a Job is currently running on a Worker, the associated background process is terminated. Terminated background processes are automatically replaced by new ones.

There are several ways to stop a Job: stopping it immediately, replacing or combining it with another Job, and setting timeouts.

Setup

library(jobqueue)
q <- Queue$new(workers = 1)

Immediate Stop

As soon as a Job is created, it can be stopped either by calling <Job>$stop() or by assigning to <Job>$output. Both actions set the Job state to 'done'. They work until the Job state is 'done'; after that, <Job>$output is immutable and calling <Job>$stop() has no effect, since the Job is already stopped.

Interrupting with <Job>$stop()

job <- Job$new({ Sys.sleep(5); 10 })
job$stop()
job$result
#> <interrupt: job stopped by user>

job <- Job$new({ Sys.sleep(5); 10 })
job$stop('my reason')
job$result
#> <interrupt: my reason>

class(job$result)
#> [1] "interrupt" "condition"
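
Because a stopped Job returns a condition object rather than a value, downstream code may want to check for it before using the result. The following is a minimal sketch that relies only on the class shown above and on base R's inherits() and conditionMessage(); the reason text is just an illustration.

```r
library(jobqueue)

job <- Job$new({ Sys.sleep(5); 10 })
job$stop('no longer needed')

# A stopped Job's result is an interrupt condition, not the expression's value.
res <- job$result

if (inherits(res, 'interrupt')) {
  # conditionMessage() extracts the reason that was passed to $stop().
  message('Job was stopped: ', conditionMessage(res))
} else {
  print(res)
}
```

Testing the class this way avoids comparing against the printed text of the condition.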

You can call <Job>$stop() from within callback hooks.

hooks <- list(
  'created' = function (job) {
    if (length(job$vars$x) > 100)
      job$stop('Job is too large.')
  }
)

job <- q$run({ sum(x) }, vars = list(x = 1:200), hooks = hooks)
job$result
#> <interrupt: Job is too large.>

Assigning to <Job>$output

<Job>$output can only be assigned to once. If the Job is not yet done when <Job>$output is assigned, then any ongoing processing will be halted.

job <- Job$new({ Sys.sleep(5); 10 })

job$state
#> [1] "created"

job$output <- "Custom result"

job$state
#> [1] "done"

job$result
#> [1] "Custom result"

If you’ve set <Job>$reformat, it will determine <Job>$result as usual. See vignette('results').
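
The rules above imply that the first assignment wins. The sketch below tries a second assignment and a late <Job>$stop() to illustrate this. Whether a second assignment raises an error or is silently ignored is not stated here, so it is wrapped in try() as a precaution; that wrapper is an assumption, not documented behavior.

```r
library(jobqueue)

job <- Job$new({ Sys.sleep(5); 10 })
job$output <- 'first'   # the Job state becomes 'done'

# Assumption: a second assignment might signal an error, so guard it with try().
try(job$output <- 'second', silent = TRUE)

# Stopping a Job that is already 'done' has no effect.
job$stop('too late')

# Per the rules above, this should still be the first assigned value.
job$result
```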

Replacing and Combining

Sometimes duplicate jobs are sent to the Queue, for instance when a user clicks a ‘submit’ button repeatedly. stop_id and copy_id can prevent such requests from overloading the server.

Stop ID

New jobs will replace existing jobs with the same stop_id.

job1 <- q$run({ Sys.sleep(5); 'A' }, stop_id = 123)
job2 <- q$run({ 'B' },               stop_id = 123)
job1$result
#> <interrupt: duplicated stop_id>
job2$result
#> [1] "B"

Copy ID

New jobs will mirror the output of existing jobs with the same copy_id.

job1 <- q$run({ Sys.sleep(5); 'A' }, copy_id = 456)
job2 <- q$run({ 'B' },               copy_id = 456)
job1$result
#> [1] "A"
job2$result
#> [1] "A"

In Queue$new()

If you set stop_id and/or copy_id in Queue$new(), they must be functions (or lambda notation functions) that accept a Job object as input and return a value id. If id is NULL, no comparisons are done. Otherwise, id can be any data type and is compared to other ids with identical().

q <- Queue$new(stop_id = function (job) list(job$vars$f, job$user_id))

job1 <- q$run({ f(x) }, vars = list(f = sum,  x = 1:10), user_id = 'A')
job2 <- q$run({ f(x) }, vars = list(f = sum,  x = 1:2),  user_id = 'A')
job3 <- q$run({ f(x) }, vars = list(f = mean, x = 1:10), user_id = 'A')

job1$result
#> <interrupt: duplicated stop_id>
job2$result
#> [1] 3
job3$result
#> [1] 5.5
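
The NULL rule can be used to opt some Jobs out of de-duplication. The sketch below assumes that job$user_id is NULL when no user_id is passed to q$run(). That is an assumption about how unset fields behave, not something stated above, and the rule itself is hypothetical.

```r
library(jobqueue)

# Hypothetical rule: only Jobs that carry a user_id are de-duplicated.
# Assumes job$user_id is NULL when user_id was not supplied.
q <- Queue$new(stop_id = function (job) job$user_id)

# No user_id: the id is NULL, so these two Jobs are never compared.
job1 <- q$run({ Sys.sleep(1); 'A' })
job2 <- q$run({ 'B' })

# Same user_id: the newer Job replaces the older one.
job3 <- q$run({ Sys.sleep(5); 'C' }, user_id = 'u1')
job4 <- q$run({ 'D' },               user_id = 'u1')

# Under that assumption, job3 is the only Job expected to be interrupted.
inherits(job3$result, 'interrupt')
```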

Timeouts

job <- Job$new({ Sys.sleep(5); 'Zzzzz' }, timeout = 0.2)
q$submit(job)
job$result
#> <interrupt: total runtime exceeded 0.2 seconds>

job <- Job$new({ 10 }, timeout = list(created = 0.2))
job$result
#> <interrupt: exceeded 0.2 seconds while in "created" state>

Limits (in seconds) can be set on: