Implementing a Work Queue using RPostgres

Jamie Lentin


Imagine you have an R process that is relatively expensive to run and is triggered by user input.

To keep things as fast as possible, you may want to spread incoming requests across several servers. However, to do this you need to co-ordinate between all of your servers (or workers). How do you decide which server works on what? What if one server dies mid-way? To answer these questions we need a work queue, also known as a job queue or task queue. This document will show you how to use R and PostgreSQL to build a work queue system, something that would ordinarily require an external tool like RabbitMQ.

In this example, our work will be generating square roots. We’ll keep track of the results in a table:

library(DBI)

con <- dbConnect(RPostgres::Postgres())

dbExecute(con, "DROP TABLE IF EXISTS sqroot_vignette_example;")
dbExecute(con, "
    CREATE TABLE sqroot_vignette_example (
        in_val INTEGER PRIMARY KEY,
        out_val DOUBLE PRECISION NULL
    )
")

When a client wants a square root value, it can insert a new row into this table, filling in in_val. We’ll then have a bunch of workers that calculate the results for the client and fill in out_val.

To manage these workers, we will combine 2 PostgreSQL concepts:

LISTEN / NOTIFY

The Postgres LISTEN and NOTIFY commands allow you to send and receive messages between clients connected to a PostgreSQL database. This is known as a publish/subscribe architecture.

We tell Postgres that we are interested in receiving messages using LISTEN. For example:

con <- dbConnect(RPostgres::Postgres())
dbExecute(con, "LISTEN grapevine")

…in this case, the channel name “grapevine” is arbitrary; we don’t need to create channels ahead of time. To make sure we have something to receive, we can start a separate R process using callr. Ordinarily this would be part of another R script, maybe on another computer. It waits a bit, uses NOTIFY to send a message, then finishes:

rp <- callr::r_bg(function() {
  library(DBI)
  Sys.sleep(0.3)
  db_notify <- dbConnect(RPostgres::Postgres())
  dbExecute(db_notify, "NOTIFY grapevine, 'psst'")
  dbDisconnect(db_notify)
})

Finally, we should wait for any incoming messages. To do this, use postgresWaitForNotify. The payload will contain the message from the other R process:

# Sleep until we get the message
n <- NULL
while (is.null(n)) {
  n <- RPostgres::postgresWaitForNotify(con)
}
n$payload
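The loop above never returns if no message ever arrives. As a rough sketch, the helper below gives up after an overall deadline and hands back the whole notification rather than just the payload. The name wait_for_message and its arguments are hypothetical, not part of RPostgres; the sketch assumes postgresWaitForNotify returns NULL on timeout and otherwise a list with channel, pid and payload.

```r
# Hypothetical helper: wait for one notification on `con`, giving up after
# `total_timeout` seconds. Returns NULL if nothing arrived in time.
wait_for_message <- function(con, total_timeout = 10, poll_interval = 1) {
  deadline <- Sys.time() + total_timeout
  while (Sys.time() < deadline) {
    # NULL on timeout, otherwise list(channel, pid, payload)
    n <- RPostgres::postgresWaitForNotify(con, timeout = poll_interval)
    if (!is.null(n)) {
      return(n)
    }
  }
  NULL
}

# Usage, assuming `con` has already run "LISTEN grapevine":
# n <- wait_for_message(con, total_timeout = 5)
# if (!is.null(n)) writeLines(paste(n$channel, "says:", n$payload))
```

Polling in short intervals keeps the process responsive to the deadline; a single long timeout would work too if that is not a concern.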

SKIP LOCKED

We can use LISTEN/NOTIFY to inform all workers that there is something to be done, but how do we decide which worker actually does the work? This is done using SKIP LOCKED.

We notify all workers that the input 99 is ready for processing. After receiving this, they all do the following:

rs <- dbSendQuery(con, "
    SELECT in_val
      FROM sqroot_vignette_example
     WHERE in_val = $1
       FOR UPDATE
      SKIP LOCKED
", params = list(99))

One lucky worker will get a row back and, thanks to FOR UPDATE, that row stays locked until the worker's transaction ends. Any other worker running the same query will skip over the locked row (SKIP LOCKED) rather than wait for it. As this query only asks for one specific row, those workers get nothing back and are free to find something else to do.

Using SKIP LOCKED is discussed in more detail in this article.
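To see the locking behaviour in isolation, here is a small sketch in which two database sessions compete for the same row. The function demo_skip_locked and the scratch table skip_locked_demo are made up for this illustration; the sketch assumes two separate connections to the same database and that you may create and drop a table there.

```r
# Hypothetical demo: two sessions try to claim the same row.
# `skip_locked_demo` is a scratch table used only for this sketch.
demo_skip_locked <- function(con_a, con_b) {
  DBI::dbExecute(con_a, "DROP TABLE IF EXISTS skip_locked_demo")
  DBI::dbExecute(con_a, "CREATE TABLE skip_locked_demo (in_val INTEGER PRIMARY KEY)")
  on.exit(DBI::dbExecute(con_a, "DROP TABLE IF EXISTS skip_locked_demo"))
  DBI::dbExecute(con_a, "INSERT INTO skip_locked_demo (in_val) VALUES (99)")

  claim <- "
      SELECT in_val
        FROM skip_locked_demo
       WHERE in_val = $1
         FOR UPDATE
        SKIP LOCKED
  "

  # Session A claims the row; the lock lasts until its transaction ends
  DBI::dbBegin(con_a)
  got_a <- DBI::dbGetQuery(con_a, claim, params = list(99))

  # Session B runs the same query, skips the locked row and gets nothing
  DBI::dbBegin(con_b)
  got_b <- DBI::dbGetQuery(con_b, claim, params = list(99))
  DBI::dbRollback(con_b)

  # Ending A's transaction releases the lock
  DBI::dbRollback(con_a)

  c(session_a = nrow(got_a), session_b = nrow(got_b))
}

# Usage:
# con_a <- DBI::dbConnect(RPostgres::Postgres())
# con_b <- DBI::dbConnect(RPostgres::Postgres())
# demo_skip_locked(con_a, con_b)
```

Session A should see one row and session B none. Note that both queries run inside an explicit transaction: without one, the lock would be released as soon as the SELECT finished.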

Implementing our worker

Now we can put the concepts together. The following implements our worker as a function (again, this would be running as a script on several servers):

worker <- function() {
  library(DBI)
  db_worker <- dbConnect(RPostgres::Postgres())
  on.exit(dbDisconnect(db_worker))
  dbExecute(db_worker, "LISTEN sqroot")
  dbExecute(db_worker, "LISTEN sqroot_shutdown")

  while (TRUE) {
    # Wait for new work to do
    n <- RPostgres::postgresWaitForNotify(db_worker, 60)
    if (is.null(n)) {
      # If nothing to do, send notifications of any not up-to-date work
      dbExecute(db_worker, "
                SELECT pg_notify('sqroot', in_val::TEXT)
                  FROM sqroot_vignette_example
                 WHERE out_val IS NULL
            ")
      next
    }

    # If we've been told to shutdown, stop right away
    if (n$channel == 'sqroot_shutdown') {
      writeLines("Shutting down.")
      break
    }

    in_val <- strtoi(n$payload)
    tryCatch(
      {
        dbWithTransaction(db_worker, {
          # Try and fetch the item we got notified about
          rs <- dbSendQuery(db_worker, "
                    SELECT in_val
                      FROM sqroot_vignette_example
                     WHERE out_val IS NULL -- if another worker already finished, don't reprocess
                       AND in_val = $1
                       FOR UPDATE SKIP LOCKED -- Don't let another worker work on this at the same time
                ", params = list(in_val))
          in_val <- dbFetch(rs)[1, 1]
          dbClearResult(rs)

          if (!is.na(in_val)) {
            # Actually do the sqrt
            writeLines(paste("Sqroot-ing", in_val, "... "))
            Sys.sleep(in_val * 0.1)
            out_val <- sqrt(in_val)

            # Update the database with the result
            dbExecute(db_worker, "
                      UPDATE sqroot_vignette_example
                         SET out_val = $1
                       WHERE in_val = $2
                  ", params = list(out_val, in_val))
          } else {
            writeLines("Not sqroot-ing as another worker got there first")
          }
        })
      },
      error = function(e) {
        # Something went wrong. Report error and carry on
        writeLines(paste("Failed to sqroot:", e$message))
      })
  }
}

The worker connects to the database, starts listening and loops indefinitely.
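One detail worth considering: any client connected to the database can send arbitrary text on the sqroot channel. With strtoi, a non-numeric payload becomes NA, and the worker would most likely report that another worker got there first, which is misleading. A minimal sketch of a stricter check follows; parse_payload is a hypothetical helper, not something the worker above uses.

```r
# Hypothetical helper: turn a notification payload into a job id,
# or NA_integer_ if the payload is not a plain non-negative integer.
parse_payload <- function(payload) {
  if (length(payload) != 1 || !grepl("^[0-9]+$", payload)) {
    return(NA_integer_)
  }
  # as.integer() yields NA (with a warning) for values too large for an R integer
  suppressWarnings(as.integer(payload))
}

parse_payload("9")     # 9L
parse_payload("psst")  # NA
```

A worker could call this in place of strtoi and skip to the next loop iteration, with its own log message, whenever the result is NA.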

Let’s use callr again to start 2 workers:

stdout_1 <- tempfile()
stdout_2 <- tempfile()
# Keep a separate handle for each worker so neither process object is discarded
rp_1 <- callr::r_bg(worker, stdout = stdout_1, stderr = stdout_1)
rp_2 <- callr::r_bg(worker, stdout = stdout_2, stderr = stdout_2)
Sys.sleep(1)  # Give workers a chance to set themselves up

Now our client can add some values to our table and notify the workers that there’s something to do:

con <- dbConnect(RPostgres::Postgres())

add_sqroot <- function(in_val) {
  dbExecute(con, "
        INSERT INTO sqroot_vignette_example (in_val) VALUES ($1)
    ", params = list(in_val))
  dbExecute(con, "
        SELECT pg_notify('sqroot', $1)
    ", params = list(in_val))
}

add_sqroot(7)
add_sqroot(8)
add_sqroot(9)
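In add_sqroot the INSERT and the notification are two independent statements, so a failure between them would leave a row that no worker hears about until the next timeout sweep. One possible variant, sketched here under the hypothetical name add_sqroot_atomic, wraps both in a transaction. It relies on PostgreSQL delivering notifications sent inside a transaction only when that transaction commits.

```r
# Hypothetical variant of add_sqroot(): the INSERT and the notification
# either both take effect or neither does.
add_sqroot_atomic <- function(con, in_val) {
  DBI::dbWithTransaction(con, {
    DBI::dbExecute(con, "
        INSERT INTO sqroot_vignette_example (in_val) VALUES ($1)
    ", params = list(in_val))
    # Notifications sent inside a transaction are delivered on commit
    DBI::dbExecute(con, "
        SELECT pg_notify('sqroot', $1)
    ", params = list(in_val))
  })
}

# Usage, with the connection from above:
# add_sqroot_atomic(con, 10)
```

Whether this is worth it depends on how much you rely on the workers' periodic sweep for unprocessed rows, which already covers missed notifications.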

After a short wait, the answers should have been populated by the workers:

Sys.sleep(3)
rs <- dbSendQuery(con, "SELECT * FROM sqroot_vignette_example ORDER BY in_val")
dbFetch(rs)
dbClearResult(rs) ; rs <- NULL
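A fixed Sys.sleep is fine for a demonstration, but a real client would probably rather wait only as long as necessary. The following sketch polls the table until a result appears; wait_for_sqroot and its timeout and interval arguments are hypothetical names chosen for this example.

```r
# Hypothetical client-side helper: poll until a worker has filled in out_val,
# or give up after `timeout` seconds and return NA.
wait_for_sqroot <- function(con, in_val, timeout = 10, interval = 0.2) {
  deadline <- Sys.time() + timeout
  while (Sys.time() < deadline) {
    res <- DBI::dbGetQuery(con, "
        SELECT out_val
          FROM sqroot_vignette_example
         WHERE in_val = $1
           AND out_val IS NOT NULL
    ", params = list(in_val))
    if (nrow(res) > 0) {
      return(res$out_val[[1]])
    }
    Sys.sleep(interval)
  }
  NA_real_
}

# Usage, with the connection and table from above:
# wait_for_sqroot(con, 9)
```

Polling keeps the sketch short. The workers could instead send a NOTIFY on a results channel when they finish, and the client could wait on that with postgresWaitForNotify.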

Finally, we can use NOTIFY to stop all the workers:

dbExecute(con, "NOTIFY sqroot_shutdown, ''")

And see what messages were printed as they ran:

# We can't control which worker will process the first entry,
# so we sort the results so the vignette output stays the same.
outputs <- sort(c(
  paste(readLines(con = stdout_1), collapse = "\n"),
  paste(readLines(con = stdout_2), collapse = "\n")))

writeLines(outputs[[1]])
writeLines(outputs[[2]])

Notice that the work has been shared between the 2 workers. If these 2 weren’t enough, we could happily add more to keep the system going.
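Adding more workers could look something like the sketch below. start_workers is a hypothetical helper; it takes the worker function as an argument, keeps every process handle in a list, and gives each worker its own log file. It assumes callr's documented "2>&1" value for stderr, which sends error output to the same place as standard output.

```r
# Hypothetical helper: start `n` background workers and keep every handle
# along with the path of the file that collects that worker's output.
start_workers <- function(worker_fn, n) {
  lapply(seq_len(n), function(i) {
    log_file <- tempfile(pattern = paste0("worker_", i, "_"), fileext = ".log")
    list(
      process = callr::r_bg(worker_fn, stdout = log_file, stderr = "2>&1"),
      log = log_file
    )
  })
}

# Usage, with the worker() function defined earlier:
# workers <- start_workers(worker, 4)
# vapply(workers, function(w) w$process$is_alive(), logical(1))
```

Because every worker listens on the same channels, the shutdown NOTIFY shown earlier stops all of them, however many were started.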