---
title: "Callback Hooks"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Callback Hooks}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

## Setup

```r
library(jobqueue)
q <- Queue$new(workers = 1)
```

## Introduction

Callbacks are the cornerstone of asynchronous programming. If you want to calculate `2 + 2` and show the result, the synchronous approach would be:

```r
message('Result = ', 2 + 2)
#> Result = 4
```

In asynchronous programming, this task is broken into two discrete steps: *computation* and *result handling*. Using jobqueue, this looks like:

```r
job <- q$run({ 2 + 2 })
job$on('done', ~message('Result = ', .$result))
#> Result = 4
```

The asynchronous format allows parts of your code to run independently, in this case on separate R processes. When a part finishes, its callback is executed so that you can work with the result.

Callbacks are not limited to Job completion. See the "Triggers" section below for a list of all the events that can trigger a callback.

> **Important**
>
> Hooks are evaluated on the main R process, not on the background processes. Therefore, ensure
> that callback functions execute quickly so that they do not delay Job handling.

## A Callback Function

The callback function should accept one argument: the object that triggered the callback. Functions accepting zero or multiple arguments are also allowed. This is a great place to use the shorthand function syntax (`\(x)`) introduced in R 4.1.0. jobqueue also understands the lambda syntax for functions (see `rlang::as_function()`).
```r
# Queue Hooks
hook <- function (queue) {
  message('Queue is ', queue$state)
}
hook <- \(q) message('Queue is ', q$state)
hook <- ~message('Queue is ', .$state)

# Worker Hooks
hook <- function (worker) {
  message('Worker is ', worker$state)
}
hook <- \(w) message('Worker is ', w$state)
hook <- ~message('Worker is ', .$state)

# Job Hooks
hook <- function (job) {
  message('Job is ', job$state)
}
hook <- \(j) message('Job is ', j$state)
hook <- ~message('Job is ', .$state)
```

## Triggers

Queue, Worker, and Job objects update their `$state` as described in the tables below. Each time the state changes, any callbacks registered to that state are executed. In addition, you can register callbacks for the special states `'*'` and `'.next'`, which trigger regardless of the new state's name.

### Special Triggers

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'*'`          | Every time the state changes.                           |
| `'.next'`      | Only once: the next time the state changes.             |

### Queue States

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'starting'`   | After initialization, before Workers are started.       |
| `'idle'`       | When all Workers are idle. Also, after initial startup. |
| `'busy'`       | When at least one Worker is busy.                       |
| `'stopped'`    | After `$stop()` is called.                              |
| `'error'`      | When Workers did not start cleanly.                     |

### Worker States

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'starting'`   | While the background process is being configured.       |
| `'idle'`       | While waiting for Jobs to be submitted.                 |
| `'busy'`       | After a Job starts running.                             |
| `'stopped'`    | After `$stop()` is called.                              |

### Job States

| State          | Triggers                                                |
|----------------|---------------------------------------------------------|
| `'created'`    | After `Job$new()` initialization.                       |
| `'submitted'`  | After `$queue` is assigned.                             |
| `'queued'`     | After `stop_id` and `copy_id` are resolved.             |
| `'dispatched'` | When the Job is handed off to a Worker.                 |
| `'starting'`   | Before evaluation begins.                               |
| `'running'`    | After `$worker` is set and evaluation begins.           |
| `'done'`       | After `$output` is assigned.                            |

## Attaching

Callbacks can be attached to Queue, Worker, or Job objects. You can add callbacks either when you create the object with `$new()`, or later with `$on()`.

```r
hook <- ~message(.$uid, ' is ', .$state)

q <- Queue$new(hooks = list(q_idle = hook))
w <- Worker$new(hooks = list(idle = hook))
j <- Job$new(hooks = list(done = hook))

q$on('busy', hook)
w$on('busy', hook)
j$on('starting', hook)
```

In `Queue$new()`, the `hooks` argument can set callbacks for the Queue, its Workers, and its Jobs. The rules are:

* Prefixing with `q_`, `w_`, or `j_` attaches the hook to the Queue, Workers, or Jobs, respectively.
* Non-prefixed hooks are attached to Jobs.
* Alternatively, `hooks` can be a list of lists, in the format:

```r
Queue$new(
  'hooks' = list(
    'queue'  = list(idle = hook),
    'worker' = list(idle = hook),
    'job'    = list(done = hook)
  ))
```

## Removing

Callbacks attached with `$new()` cannot be removed. When you attach a callback with `$on()`, the return value is a function that, when called, removes that callback from the object.

```r
job <- Job$new(
  'expr'  = { 3.14 },
  'hooks' = list(done = ~message('ABC'))
)
off <- job$on('done', ~message('XYZ'))
off()

q$submit(job)
#> ABC
```

## Default Job Hooks

When you create a Queue with `Queue$new()`, you can define a set of callbacks to apply automatically to any Jobs created with its `$run()` method.

```r
n <- 0
q <- Queue$new(hooks = list(created = ~{ n <<- n + 1 }))
for (i in 1:5) q$run({ 'Hi' })
n
#> [1] 5
```

How does `Queue$new()` know to apply the `hooks` to Jobs instead of Queues or Workers? Unless otherwise indicated, `Queue$new()` `hooks` are assumed to be for Jobs. You can also explicitly specify that `hooks` are for Jobs by using the formats described in the "Attaching" section above.
```r
q <- Queue$new(hooks = list(j_created = ~{ n <<- n + 1 }))
# or
q <- Queue$new(hooks = list(job = list(created = ~{ n <<- n + 1 })))
```

If you set `hooks` in `$run()`, those hooks REPLACE the Job hooks from `Queue$new()`.

```r
n <- 0
q <- Queue$new(hooks = list(created = ~{ n <<- n + 1 }))
for (i in 1:3) q$run({ 'Hi' }, hooks = list(done = ~message(.$result)))
#> Hi
#> Hi
#> Hi
n
#> [1] 0
```

## Use Case: Priority Setting

Below, we'll set up a callback function that triggers when each Job enters the `'queued'` state. It will modify the Job, adding a custom `$priority` field to the Job object. Then it will modify the Queue's internal list of jobs (`$jobs`), sorting them according to each Job's `$priority`. Last, it will attach additional callbacks to the Job to output timing information upon exit from the `'queued'` state and upon entry into the `'done'` state.

```r
library(glue)
library(jobqueue)

# Our callback/hook function.
prioritize <- function (job) {

  queue      <- job$queue
  queue_jobs <- queue$jobs

  # Apply a random priority to this job.
  job$priority <- round(runif(1) * 10) - 5

  # Sort all this Queue's jobs by priority (including this job).
  priorities <- sapply(queue_jobs, `[[`, 'priority')
  queue$jobs <- queue_jobs[order(priorities)]

  # Add hooks to this job to report queued/total times.
  t1    <- Sys.time()
  tdiff <- function () format(round(Sys.time() - t1, 1))
  job$on('.next', ~message(glue(
    'Job {.$uid} (priority {.$priority}) was {.$state} after {tdiff()}'
  )))
  job$on('done', ~message(glue(
    'Job {.$uid} (priority {.$priority}) finished in {tdiff()}'
  )))
}

# A single worker best illustrates processing order.
q <- Queue$new(
  'workers' = 1,
  'hooks'   = list(queued = prioritize)
)

for (i in 1:5) {
  job <- q$run({ 3.14 })
  message(glue_data(job, 'Created Job {uid} with priority {priority}'))
}
#> Job J11 (priority -3) was dispatched after 0.1 secs
#> Created Job J11 with priority -3
#> Created Job J12 with priority -2
#> Created Job J13 with priority 1
#> Created Job J14 with priority 0
#> Created Job J15 with priority -1
#> Job J11 (priority -3) finished in 0.7 secs
#> Job J12 (priority -2) was dispatched after 0.6 secs
#> Job J12 (priority -2) finished in 1.1 secs
#> Job J15 (priority -1) was dispatched after 0.7 secs
#> Job J15 (priority -1) finished in 1.3 secs
#> Job J14 (priority 0) was dispatched after 1.4 secs
#> Job J14 (priority 0) finished in 2 secs
#> Job J13 (priority 1) was dispatched after 2.1 secs
#> Job J13 (priority 1) finished in 2.6 secs
```

In an actual application, you could set `$priority` based on `$vars`.

```r
prioritize <- function (job) {

  # Give priority to lower number of replications
  job$priority <- job$vars$replications

  queue_jobs <- job$queue$jobs
  priorities <- sapply(queue_jobs, `[[`, 'priority')
  job$queue$jobs <- queue_jobs[order(priorities)]
}

q <- Queue$new(hooks = list(queued = prioritize))

#                                        vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv
for (reps in 5:1) job <- q$run({ 3.14 }, vars = list(replications = reps))
```

Or, define the custom `$priority` field in `$run()`.

```r
prioritize <- function (job) {
  queue_jobs <- job$queue$jobs
  priorities <- sapply(queue_jobs, `[[`, 'priority')
  job$queue$jobs <- queue_jobs[order(priorities)]
}

q <- Queue$new(hooks = list(queued = prioritize))

#                                        vvvvvvvvvvvvvvv
for (reps in 5:1) job <- q$run({ 3.14 }, priority = reps)
```

Or, set `$priority` in a hook that triggers before `prioritize()` does.
```r
set_priority <- function (job) {
  job$priority <- job$vars$replications
}

prioritize <- function (job) {
  queue_jobs <- job$queue$jobs
  priorities <- sapply(queue_jobs, `[[`, 'priority')
  job$queue$jobs <- queue_jobs[order(priorities)]
}

#                           vvvvvvvvvvvvvvvvvvvvvv
q <- Queue$new(hooks = list(created = set_priority, queued = prioritize))

for (reps in 5:1) job <- q$run({ 3.14 }, vars = list(replications = reps))
```

## Use Case: Rate Limiting

Say you're hosting a web service where users are allowed to submit one Job every 30 seconds. Jobs can take more than 30 seconds, so the solution is more complex than setting `stop_id = user_id`. And what if you want to stop the new Job instead of the old one?

```r
rate_limit <- function (job) {
  job$t_start <- Sys.time()
  for (j in job$queue$jobs)
    if (j$user_id == job$user_id)
      # Compare in explicit seconds; a bare difference may switch to minutes.
      if (difftime(job$t_start, j$t_start, units = 'secs') < 30)
        job$stop('Rate Limit Exceeded')
}

q <- Queue$new(hooks = list(submitted = rate_limit))

j_A1 <- q$run({ 42 }, user_id = 'A')
j_B1 <- q$run({ 42 }, user_id = 'B')
j_B2 <- q$run({ 42 }, user_id = 'B')

j_B1$result
#> [1] 42
j_B2$result
#> 
```

Note that the above code won't completely solve the rate-limiting task. If a user's Job takes only five seconds to complete, they could submit a Job every six seconds and the Queue would be none the wiser. To give the Queue awareness of previously completed Jobs, you'll need to persistently store per-user Job start times, as in the solution below.

```r
t_user <- list()

rate_limit <- function (job) {
  t_start <- Sys.time()
  t_prev  <- t_user[[job$user_id]]  # NULL if this user has no prior Job
  if (!is.null(t_prev) && difftime(t_start, t_prev, units = 'secs') < 30) {
    job$stop('Rate Limit Exceeded')
  } else {
    t_user[[job$user_id]] <<- t_start
  }
}

q <- Queue$new(hooks = list(created = rate_limit))
```

In the first example, we attached a hook to `'submitted'` because that's when `$queue` becomes available in callbacks. In the latter example, we attached a hook to `'created'` instead because we didn't need `$queue` for that solution.
Check the trigger order listed in the "Triggers" section above, and attach callbacks as early as possible to expedite Job handling.
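
The persistent-timestamp logic from the last rate-limiting example can be exercised on its own, without a running Queue. Below is a minimal base-R sketch under stated assumptions: `make_rate_limiter()` and `allow()` are hypothetical helpers, not part of jobqueue, and timestamps are passed in explicitly so the behavior is reproducible. Only accepted Jobs record a start time, so a rejected request does not restart the user's 30-second window.

```r
# Hypothetical helper (not part of jobqueue): returns a function that
# remembers each user's last *accepted* start time and reports whether a
# new Job from that user falls outside the rate-limit window.
make_rate_limiter <- function (window_secs = 30) {
  t_user <- list()  # user_id -> POSIXct start time of last accepted Job

  function (user_id, t_start = Sys.time()) {
    t_prev <- t_user[[user_id]]  # NULL if this user has no prior Job
    if (!is.null(t_prev) &&
        difftime(t_start, t_prev, units = 'secs') < window_secs) {
      return(FALSE)  # Too soon: the caller should stop the new Job.
    }
    t_user[[user_id]] <<- t_start  # Record the accepted start time.
    TRUE
  }
}

# Fixed timestamps make the example reproducible.
allow <- make_rate_limiter(window_secs = 30)
t0    <- as.POSIXct('2024-01-01 12:00:00', tz = 'UTC')

allow('A', t0)       # TRUE:  first Job for user A
allow('B', t0)       # TRUE:  first Job for user B
allow('B', t0 + 6)   # FALSE: only 6 seconds after B's last accepted Job
allow('B', t0 + 31)  # TRUE:  31 seconds after B's last accepted Job
```

Inside a `'created'` hook, you could call `allow(job$user_id)` and run `job$stop('Rate Limit Exceeded')` whenever it returns `FALSE`, again assuming each Job carries a custom `user_id` field. Keeping the timestamps inside a closure, rather than in a global `t_user` list, avoids modifying a global variable from within the hook.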