Advanced: Submitting and Polling Multiple Batches

1. Overview

This vignette demonstrates how to use pairwiseLLM for more complex batch workflows, including:

The examples are based on the A/B template testing dev scripts for:

At present, batch helpers are implemented for OpenAI, Anthropic, and Gemini. Together.ai and Ollama are supported only via the live APIs (submit_llm_pairs() / llm_compare_pair()).

Note: All heavy API calls in this vignette are set to eval = FALSE so that the vignette remains CRAN-safe. You can enable them in your own project.

For basic function usage, see the companion vignette:

For prompt evaluation and positional-bias diagnostics, see the companion vignette:

2. Setup and API Keys

library(pairwiseLLM)
library(dplyr)
library(tidyr)
library(purrr)
library(readr)
library(stringr)

Required environment variables:

Provider Environment Variable
OpenAI OPENAI_API_KEY
Anthropic ANTHROPIC_API_KEY
Gemini GEMINI_API_KEY

Check which are set:

check_llm_api_keys()
#> All known LLM API keys are set: OPENAI_API_KEY, ANTHROPIC_API_KEY, GEMINI_API_KEY, TOGETHER_API_KEY.
#> # A tibble: 4 × 4
#>   backend   service       env_var           has_key
#>   <chr>     <chr>         <chr>             <lgl>  
#> 1 openai    OpenAI        OPENAI_API_KEY    TRUE   
#> 2 anthropic Anthropic     ANTHROPIC_API_KEY TRUE   
#> 3 gemini    Google Gemini GEMINI_API_KEY    TRUE   
#> 4 together  Together.ai   TOGETHER_API_KEY  TRUE

3. Example Data and Prompt Template

We use the built-in writing samples and a single trait (overall_quality).

data("example_writing_samples", package = "pairwiseLLM")

td <- trait_description("overall_quality")
td
#> $name
#> [1] "Overall Quality"
#> 
#> $description
#> [1] "Overall quality of the writing, considering how well ideas are expressed,\n      how clearly the writing is organized, and how effective the language and\n      conventions are."

Default prompt template:

tmpl <- set_prompt_template()
cat(substr(tmpl, 1, 400), "...\n")
#> You are a debate adjudicator. Your task is to weigh the comparative strengths of two writing samples regarding a specific trait.
#> 
#> TRAIT: {TRAIT_NAME}
#> DEFINITION: {TRAIT_DESCRIPTION}
#> 
#> SAMPLES:
#> 
#> === SAMPLE_1 ===
#> {SAMPLE_1}
#> 
#> === SAMPLE_2 ===
#> {SAMPLE_2}
#> 
#> EVALUATION PROCESS (Mental Simulation):
#> 
#> 1.  **Advocate for SAMPLE_1**: Mentally list the single strongest point of evidence that makes SAMPLE_1 the  ...

Construct a modest number of pairs to keep the example light:

set.seed(123)

pairs_all <- example_writing_samples |>
  make_pairs()

n_pairs <- min(40L, nrow(pairs_all))

pairs_forward <- pairs_all |>
  sample_pairs(n_pairs = n_pairs, seed = 123) |>
  randomize_pair_order(seed = 456)

pairs_reverse <- sample_reverse_pairs(
  pairs_forward,
  reverse_pct = 1.0,
  seed        = 789
)

get_pairs_for_direction <- function(direction = c("forward", "reverse")) {
  direction <- match.arg(direction)
  if (identical(direction, "forward")) {
    pairs_forward
  } else {
    pairs_reverse
  }
}

4. Designing the Batch Grid

Suppose we want to test several prompt templates across:

Here we define a small grid:

anthropic_models <- c(
  "claude-sonnet-4-5",
  "claude-haiku-4-5",
  "claude-opus-4-5"
)

gemini_models <- c(
  "gemini-3-pro-preview"
)

openai_models <- c(
  "gpt-4.1",
  "gpt-4o",
  "gpt-5.1"
)

thinking_levels <- c("no_thinking", "with_thinking")
directions <- c("forward", "reverse")

anthropic_grid <- tidyr::expand_grid(
  provider  = "anthropic",
  model     = anthropic_models,
  thinking  = thinking_levels,
  direction = directions
)

gemini_grid <- tidyr::expand_grid(
  provider  = "gemini",
  model     = gemini_models,
  thinking  = "with_thinking",
  direction = directions
)

openai_grid <- tidyr::expand_grid(
  provider  = "openai",
  model     = openai_models,
  thinking  = thinking_levels,
  direction = directions
) |>
  # For example, only allow "with_thinking" for gpt-5.1
  dplyr::filter(model == "gpt-5.1" | thinking == "no_thinking")

batch_grid <- dplyr::bind_rows(
  anthropic_grid,
  gemini_grid,
  openai_grid
)

batch_grid
#> # A tibble: 22 × 4
#>    provider  model             thinking      direction
#>    <chr>     <chr>             <chr>         <chr>    
#>  1 anthropic claude-sonnet-4-5 no_thinking   forward  
#>  2 anthropic claude-sonnet-4-5 no_thinking   reverse  
#>  3 anthropic claude-sonnet-4-5 with_thinking forward  
#>  4 anthropic claude-sonnet-4-5 with_thinking reverse  
#>  5 anthropic claude-haiku-4-5  no_thinking   forward  
#>  6 anthropic claude-haiku-4-5  no_thinking   reverse  
#>  7 anthropic claude-haiku-4-5  with_thinking forward  
#>  8 anthropic claude-haiku-4-5  with_thinking reverse  
#>  9 anthropic claude-opus-4-5   no_thinking   forward  
#> 10 anthropic claude-opus-4-5   no_thinking   reverse  
#> # ℹ 12 more rows

We will also imagine multiple prompt templates have been registered. For simplicity, we use the same tmpl string, but in practice you would substitute different text:

templates_tbl <- tibble::tibble(
  template_id     = c("test1", "test2", "test3", "test4", "test5"),
  prompt_template = list(tmpl, tmpl, tmpl, tmpl, tmpl)
)

templates_tbl
#> # A tibble: 5 × 2
#>   template_id prompt_template
#>   <chr>       <list>         
#> 1 test1       <chr [1]>      
#> 2 test2       <chr [1]>      
#> 3 test3       <chr [1]>      
#> 4 test4       <chr [1]>      
#> 5 test5       <chr [1]>

5. Phase 1: Submitting Many Batches (No Polling Yet)

We will:

  1. Loop over all (template_id, provider, model, thinking, direction) combinations.
  2. Submit a batch for each combination.
  3. Record metadata (including batch_id) in an in-memory jobs list.
  4. Write a batch index CSV to disk.

Create an output directory:

out_dir <- "dev-output/advanced-multi-batch"
dir.create(out_dir, recursive = TRUE, showWarnings = FALSE)

Submit all batches:

jobs <- list()

for (t_row in seq_len(nrow(templates_tbl))) {
  template_id <- templates_tbl$template_id[t_row]
  tmpl_string <- templates_tbl$prompt_template[[t_row]]

  for (i in seq_len(nrow(batch_grid))) {
    row <- batch_grid[i, ]

    provider <- row$provider
    model <- row$model
    thinking <- row$thinking
    direction <- row$direction

    message(
      "Submitting batch: template=", template_id,
      " | ", provider, " / ", model,
      " / ", thinking, " / ", direction
    )

    pairs_use <- get_pairs_for_direction(direction)
    is_thinking <- identical(thinking, "with_thinking")

    prefix <- paste(provider, template_id, model, thinking, direction,
      sep = "_"
    )
    prefix <- gsub("[^A-Za-z0-9_.-]", "-", prefix)

    batch_input_path <- file.path(out_dir, paste0(prefix, "_input.jsonl"))
    batch_output_path <- file.path(out_dir, paste0(prefix, "_output.jsonl"))
    csv_path <- file.path(out_dir, paste0(prefix, ".csv"))

    if (identical(provider, "openai")) {
      # OpenAI: use the helpers from the dev scripts
      include_thoughts <- is_thinking && grepl("^gpt-5\\.1", model)

      pipeline <- run_openai_batch_pipeline(
        pairs             = pairs_use,
        model             = model,
        trait_name        = td$name,
        trait_description = td$description,
        prompt_template   = tmpl_string,
        include_thoughts  = include_thoughts,
        include_raw       = TRUE,
        batch_input_path  = batch_input_path,
        batch_output_path = batch_output_path,
        poll              = FALSE
      )

      jobs[[length(jobs) + 1L]] <- list(
        template_id       = template_id,
        provider          = provider,
        model             = model,
        thinking          = thinking,
        direction         = direction,
        prefix            = prefix,
        batch_type        = "openai",
        batch_id          = pipeline$batch$id,
        batch_input_path  = pipeline$batch_input_path,
        batch_output_path = batch_output_path,
        csv_path          = csv_path,
        done              = FALSE,
        results           = NULL
      )
    } else if (identical(provider, "anthropic")) {
      # Anthropic: use run_anthropic_batch_pipeline()
      reasoning <- if (is_thinking) "enabled" else "none"
      temperature_arg <- if (!is_thinking) 0 else NULL

      pipeline <- run_anthropic_batch_pipeline(
        pairs             = pairs_use,
        model             = model,
        trait_name        = td$name,
        trait_description = td$description,
        prompt_template   = tmpl_string,
        reasoning         = reasoning,
        include_thoughts  = is_thinking,
        batch_input_path  = batch_input_path,
        batch_output_path = batch_output_path,
        poll              = FALSE,
        temperature       = temperature_arg,
        include_raw       = TRUE
      )

      jobs[[length(jobs) + 1L]] <- list(
        template_id       = template_id,
        provider          = provider,
        model             = model,
        thinking          = thinking,
        direction         = direction,
        prefix            = prefix,
        batch_type        = "anthropic",
        batch_id          = pipeline$batch$id,
        batch_input_path  = pipeline$batch_input_path,
        batch_output_path = batch_output_path,
        csv_path          = csv_path,
        done              = FALSE,
        results           = NULL
      )
    } else if (identical(provider, "gemini")) {
      # Gemini: typically use low-level helpers, as in the dev scripts
      req_tbl <- build_gemini_batch_requests(
        pairs             = pairs_use,
        model             = model,
        trait_name        = td$name,
        trait_description = td$description,
        prompt_template   = tmpl_string,
        thinking_level    = "low", # example
        include_thoughts  = TRUE
      )

      batch <- gemini_create_batch(
        requests    = req_tbl$request,
        model       = model,
        api_key     = Sys.getenv("GEMINI_API_KEY"),
        api_version = "v1beta"
      )

      batch_name <- batch$name %||% stop(
        "Gemini batch did not return a `name` field.",
        call. = FALSE
      )

      jobs[[length(jobs) + 1L]] <- list(
        template_id       = template_id,
        provider          = provider,
        model             = model,
        thinking          = thinking,
        direction         = direction,
        prefix            = prefix,
        batch_type        = "gemini",
        batch_id          = batch_name,
        batch_input_path  = batch_input_path,
        batch_output_path = batch_output_path,
        csv_path          = csv_path,
        done              = FALSE,
        results           = NULL
      )
    }
  }
}

5.1 Writing a Batch Registry CSV (Important!)

To avoid losing batch IDs if your session dies, write a compact index of all jobs to disk:

jobs_tbl <- tibble::tibble(
  idx = seq_along(jobs),
  template_id = vapply(jobs, `[[`, character(1), "template_id"),
  provider = vapply(jobs, `[[`, character(1), "provider"),
  model = vapply(jobs, `[[`, character(1), "model"),
  thinking = vapply(jobs, `[[`, character(1), "thinking"),
  direction = vapply(jobs, `[[`, character(1), "direction"),
  prefix = vapply(jobs, `[[`, character(1), "prefix"),
  batch_type = vapply(jobs, `[[`, character(1), "batch_type"),
  batch_id = vapply(jobs, `[[`, character(1), "batch_id"),
  batch_input_path = vapply(jobs, `[[`, character(1), "batch_input_path"),
  batch_output_path = vapply(jobs, `[[`, character(1), "batch_output_path"),
  csv_path = vapply(jobs, `[[`, character(1), "csv_path")
)

jobs_index_path <- file.path(out_dir, "batch_jobs_index.csv")
readr::write_csv(jobs_tbl, jobs_index_path)

jobs_index_path

You can now stop R or close RStudio safely — all critical details are in batch_jobs_index.csv.

6. Phase 2: Polling, Downloading, and Parsing

In a new session, you can:

  1. Reload the batch index CSV
  2. Rebuild the jobs list
  3. Poll providers for batch status
  4. Download and parse results when complete
  5. Save per-job CSVs of parsed results

First, helper functions for terminal states:

is_terminal_openai <- function(status) {
  status %in% c("completed", "failed", "cancelled", "expired")
}

is_terminal_anthropic <- function(status) {
  status %in% c("ended", "errored", "canceled", "expired")
}

is_terminal_gemini <- function(state) {
  state %in% c("SUCCEEDED", "FAILED", "CANCELLED", "EXPIRED")
}

Now the polling loop, with a small delay between jobs to reduce 429 (rate limit) risks:

interval_seconds <- 60
per_job_delay <- 2 # seconds between polling calls

# Reload batch index
jobs_index_path <- file.path(out_dir, "batch_jobs_index.csv")
jobs_tbl <- readr::read_csv(jobs_index_path, show_col_types = FALSE)

# Rebuild jobs list skeleton
jobs <- purrr::pmap(
  jobs_tbl,
  function(idx, template_id, provider, model, thinking, direction,
           prefix, batch_type, batch_id,
           batch_input_path, batch_output_path, csv_path, ...) {
    list(
      template_id       = template_id,
      provider          = provider,
      model             = model,
      thinking          = thinking,
      direction         = direction,
      prefix            = prefix,
      batch_type        = batch_type,
      batch_id          = batch_id,
      batch_input_path  = batch_input_path,
      batch_output_path = batch_output_path,
      csv_path          = csv_path,
      done              = FALSE,
      results           = NULL
    )
  }
)

unfinished <- which(!vapply(jobs, `[[`, logical(1), "done"))

while (length(unfinished) > 0L) {
  message("Polling ", length(unfinished), " unfinished batch(es)...")

  for (j in unfinished) {
    job <- jobs[[j]]
    if (job$done) next

    batch_type <- job$batch_type

    if (identical(batch_type, "openai")) {
      batch <- openai_get_batch(job$batch_id)
      status <- batch$status %||% "unknown"
      message("  [OpenAI] ", job$prefix, " status: ", status)

      if (is_terminal_openai(status)) {
        if (identical(status, "completed")) {
          openai_download_batch_output(
            batch_id = job$batch_id,
            path     = job$batch_output_path
          )

          res <- parse_openai_batch_output(job$batch_output_path)
          jobs[[j]]$results <- res
          readr::write_csv(res, job$csv_path)
          message("    -> Results written to: ", job$csv_path)
        }
        jobs[[j]]$done <- TRUE
      }
    } else if (identical(batch_type, "anthropic")) {
      batch <- anthropic_get_batch(job$batch_id)
      status <- batch$processing_status %||% "unknown"
      message("  [Anthropic] ", job$prefix, " status: ", status)

      if (is_terminal_anthropic(status)) {
        if (identical(status, "ended")) {
          output_path <- anthropic_download_batch_results(
            batch_id    = job$batch_id,
            output_path = job$batch_output_path
          )

          res <- parse_anthropic_batch_output(
            jsonl_path  = output_path,
            tag_prefix  = "<BETTER_SAMPLE>",
            tag_suffix  = "</BETTER_SAMPLE>"
          )

          jobs[[j]]$results <- res
          readr::write_csv(res, job$csv_path)
          message("    -> Results written to: ", job$csv_path)
        }
        jobs[[j]]$done <- TRUE
      }
    } else if (identical(batch_type, "gemini")) {
      batch <- gemini_get_batch(job$batch_id)
      state <- batch$state %||% "STATE_UNSPECIFIED"
      message("  [Gemini] ", job$prefix, " state: ", state)

      if (is_terminal_gemini(state)) {
        if (identical(state, "SUCCEEDED")) {
          raw_res <- gemini_download_batch_results(job$batch_id)

          res <- parse_gemini_batch_output(
            raw_results = raw_res,
            tag_prefix  = "<BETTER_SAMPLE>",
            tag_suffix  = "</BETTER_SAMPLE>"
          )

          jobs[[j]]$results <- res
          readr::write_csv(res, job$csv_path)
          message("    -> Results written to: ", job$csv_path)
        }
        jobs[[j]]$done <- TRUE
      }
    }

    Sys.sleep(per_job_delay)
  }

  unfinished <- which(!vapply(jobs, `[[`, logical(1), "done"))

  if (length(unfinished) > 0L) {
    message("Sleeping ", interval_seconds, " seconds before next poll...")
    Sys.sleep(interval_seconds)
  }
}

message("All batches have reached a terminal state.")

At the end of this loop:

7. Resuming After Interruption

If the polling loop is interrupted:

  1. Restart R.
  2. Reload batch_jobs_index.csv.
  3. Rebuild jobs as above.
  4. Recompute unfinished and re-enter the polling loop.

Because all of the essential metadata (provider, model, template, direction, batch IDs, file paths) is stored in the registry CSV, you can safely recover and continue.

For example:

jobs_index_path <- file.path(out_dir, "batch_jobs_index.csv")
jobs_tbl <- readr::read_csv(jobs_index_path, show_col_types = FALSE)

# Rebuild jobs list as before...
# Then:
unfinished <- which(!vapply(jobs, `[[`, logical(1), "done"))

if (length(unfinished) > 0L) {
  message("Resuming polling for ", length(unfinished), " unfinished batch(es).")
  # ... re-enter the polling loop ...
} else {
  message("All jobs are already complete.")
}

8. Next Steps

Once you have per-job results CSVs (e.g., one per template × model × thinking × direction), you can:

9. Citation

Mercer, S. (2025). Advanced: Submitting and Polling Multiple Batches (Version 1.0.0) [R package vignette]. In pairwiseLLM: Pairwise Comparison Tools for Large Language Model-Based Writing Evaluation. https://shmercer.github.io/pairwiseLLM/