## ----setup, include=FALSE-----------------------------------------------------
knitr::opts_chunk$set(collapse = TRUE, comment = "#>")

# Stop knitting early when bigmemory is not installed.
if (!requireNamespace("bigmemory", quietly = TRUE)) {
  cat("This vignette requires the 'bigmemory' package.\n")
  knitr::knit_exit()
}

library(bigKNN)
library(bigmemory)

## ----helpers, include=FALSE---------------------------------------------------
# Build a data frame from selected fields of a job object: status, metric,
# n_query, n_edge and the file name (not the full path) of its checkpoint.
job_summary <- function(job) {
  data.frame(
    status = job$status,
    metric = job$metric,
    queries = job$n_query,
    edges = job$n_edge,
    checkpoint = basename(job$checkpoint_path),
    row.names = NULL
  )
}

# Tabulate selected entries of a checkpoint list as field/value text pairs.
#   spec   : list read back from a checkpoint file.
#   fields : character vector of entry names to report.
# Entries that are NULL are shown as the string "NULL"; entries with several
# elements are collapsed into one comma-separated string.
checkpoint_summary <- function(spec, fields) {
  data.frame(
    field = fields,
    value = vapply(fields, function(field) {
      value <- spec[[field]]
      if (is.null(value)) {
        "NULL"
      } else {
        paste(value, collapse = ", ")
      }
    }, character(1)),
    row.names = NULL
  )
}

# Read the edge stores back into an ordinary data frame.
#   xp_from, xp_to : stores converted to integer `from` / `to` columns.
#   xp_value       : optional store converted to a numeric `value` column.
read_graph_store <- function(xp_from, xp_to, xp_value = NULL) {
  out <- data.frame(
    from = as.integer(as.vector(bigmemory::as.matrix(xp_from))),
    to = as.integer(as.vector(bigmemory::as.matrix(xp_to))),
    row.names = NULL
  )
  if (!is.null(xp_value)) {
    out$value <- as.numeric(as.vector(bigmemory::as.matrix(xp_value)))
  }
  out
}

# Read the radius stores back into a data frame with one row per match.
#   xp_index, xp_distance : stores holding neighbour indices and distances.
#   xp_offset             : store whose successive differences give the number
#                           of matches belonging to each query.
#   query_ids             : one label per query, repeated once per match.
# Distances are rounded to 5 significant digits for display.
read_radius_store <- function(xp_index, xp_distance, xp_offset, query_ids) {
  index <- as.integer(as.vector(bigmemory::as.matrix(xp_index)))
  distance <- as.numeric(as.vector(bigmemory::as.matrix(xp_distance)))
  offset <- as.integer(as.vector(bigmemory::as.matrix(xp_offset)))
  counts <- diff(offset)

  # rep() and data.frame() both recycle silently when lengths happen to divide
  # evenly, so check the shapes explicitly instead of risking a misaligned
  # table.
  stopifnot(
    length(counts) == length(query_ids),
    sum(counts) == length(index),
    length(distance) == length(index)
  )

  data.frame(
    query = rep(query_ids, times = counts),
    neighbor = index,
    distance = signif(distance, 5),
    row.names = NULL
  )
}

## ----create-workspace---------------------------------------------------------
# All backing files, descriptors and checkpoints go under the session tempdir.
scratch_dir <- file.path(tempdir(), "bigknn-resumable-jobs")
dir.create(scratch_dir, recursive = TRUE, showWarnings = FALSE)

## ----graph-reference----------------------------------------------------------
graph_points <- data.frame(
  id = paste0("g", 1:4),
  x1 = c(0, 1, 5, 6),
  x2 = c(0, 0, 0, 0)
)

graph_ref <- filebacked.big.matrix(
  nrow = nrow(graph_points),
  ncol = 2,
  type = "double",
  backingfile = "graph-ref.bin",
  descriptorfile = "graph-ref.desc",
  backingpath = scratch_dir
)
graph_ref[, ] <- as.matrix(graph_points[c("x1", "x2")])

graph_points

## ----graph-job----------------------------------------------------------------
k <- 1L
# The edge stores are sized for k edges per reference row.
n_edge <- nrow(graph_ref) * k

graph_from <- filebacked.big.matrix(
  nrow = n_edge,
  ncol = 1,
  type = "integer",
  backingfile = "graph-from.bin",
  descriptorfile = "graph-from.desc",
  backingpath = scratch_dir
)
graph_to <- filebacked.big.matrix(
  nrow = n_edge,
  ncol = 1,
  type = "integer",
  backingfile = "graph-to.bin",
  descriptorfile = "graph-to.desc",
  backingpath = scratch_dir
)
graph_value <- filebacked.big.matrix(
  nrow = n_edge,
  ncol = 1,
  type = "double",
  backingfile = "graph-value.bin",
  descriptorfile = "graph-value.desc",
  backingpath = scratch_dir
)
graph_checkpoint <- file.path(scratch_dir, "graph-job.rds")

graph_job <- knn_graph_stream_bigmatrix(
  graph_ref,
  k = k,
  xpFrom = graph_from,
  xpTo = graph_to,
  xpValue = graph_value,
  checkpoint_path = graph_checkpoint
)

job_summary(graph_job)
read_graph_store(graph_from, graph_to, graph_value)

## ----radius-reference---------------------------------------------------------
radius_points <- data.frame(
  id = paste0("r", 1:4),
  x1 = c(1, 0, 1, 2),
  x2 = c(0, 1, 1, 1)
)

radius_ref <- filebacked.big.matrix(
  nrow = nrow(radius_points),
  ncol = 2,
  type = "double",
  backingfile = "radius-ref.bin",
  descriptorfile = "radius-ref.desc",
  backingpath = scratch_dir
)
radius_ref[, ] <- as.matrix(radius_points[c("x1", "x2")])

radius_points

## ----radius-job---------------------------------------------------------------
# One radius shared by the counting pass, the streamed job and the in-memory
# reference result below, so the three cannot drift apart.
search_radius <- 1.1

# The per-query counts determine how large the output stores have to be.
radius_counts <- count_within_radius_bigmatrix(
  radius_ref,
  radius = search_radius
)
total_matches <- sum(radius_counts)

radius_index <- filebacked.big.matrix(
  nrow = total_matches,
  ncol = 1,
  type = "integer",
  backingfile = "radius-index.bin",
  descriptorfile = "radius-index.desc",
  backingpath = scratch_dir
)
radius_distance <- filebacked.big.matrix(
  nrow = total_matches,
  ncol = 1,
  type = "double",
  backingfile = "radius-distance.bin",
  descriptorfile = "radius-distance.desc",
  backingpath = scratch_dir
)
# The offset store has one more entry than there are queries.
radius_offset <- filebacked.big.matrix(
  nrow = length(radius_counts) + 1L,
  ncol = 1,
  type = "double",
  backingfile = "radius-offset.bin",
  descriptorfile = "radius-offset.desc",
  backingpath = scratch_dir
)
radius_checkpoint <- file.path(scratch_dir, "radius-job.rds")

radius_job <- radius_stream_job_bigmatrix(
  radius_ref,
  xpIndex = radius_index,
  xpDistance = radius_distance,
  xpOffset = radius_offset,
  radius = search_radius,
  checkpoint_path = radius_checkpoint
)

radius_counts
job_summary(radius_job)
as.integer(as.vector(bigmemory::as.matrix(radius_offset)))
read_radius_store(
  radius_index,
  radius_distance,
  radius_offset,
  radius_points$id
)

## ----resume-graph-------------------------------------------------------------
# In-memory reference result, computed with the same k as the streamed job.
graph_expected <- as.data.frame(
  knn_graph_bigmatrix(
    graph_ref,
    k = k,
    format = "edge_list",
    symmetrize = "none"
  )
)
attr(graph_expected, "bigknn_graph") <- NULL

# Rewrite the saved checkpoint: mark it as running and set both next_row and
# next_edge to 3.
graph_spec <- readRDS(graph_checkpoint)
graph_spec$status <- "running"
graph_spec$next_row <- 3L
graph_spec$next_edge <- 3L
saveRDS(graph_spec, graph_checkpoint)

# Zero the edge stores, then put back only the first two edges from the
# reference result before resuming.
graph_from[, 1] <- 0L
graph_to[, 1] <- 0L
graph_value[, 1] <- 0
graph_from[1:2, 1] <- as.integer(graph_expected$from[1:2])
graph_to[1:2, 1] <- as.integer(graph_expected$to[1:2])
graph_value[1:2, 1] <- graph_expected$distance[1:2]

resumed_graph_job <- resume_knn_job(graph_checkpoint)

job_summary(resumed_graph_job)
read_graph_store(graph_from, graph_to, graph_value)

## ----resume-radius------------------------------------------------------------
# In-memory reference result for the same radius as the streamed job.
radius_expected <- radius_bigmatrix(radius_ref, radius = search_radius)

# Rewrite the saved checkpoint: mark it as running, set the phase to
# "collect" and set next_row to 3.
radius_spec <- readRDS(radius_checkpoint)
radius_spec$status <- "running"
radius_spec$phase <- "collect"
radius_spec$next_row <- 3L
saveRDS(radius_spec, radius_checkpoint)

# Zero all three output stores.
radius_index[, 1] <- 0L
radius_distance[, 1] <- 0
radius_offset[, 1] <- 0

# Last output position before the third offset entry of the reference result.
prefix_end <- radius_expected$offset[3L] - 1L
# Put back only the first `prefix_end` entries of the index and distance
# stores, taken from the in-memory result `radius_expected`.
prefix_rows <- seq_len(prefix_end)
radius_index[prefix_rows, 1] <- as.integer(radius_expected$index[prefix_rows])
radius_distance[prefix_rows, 1] <- radius_expected$distance[prefix_rows]

# Resume from the edited checkpoint, then show the job summary, the offset
# store and the collected matches.
resumed_radius_job <- resume_knn_job(radius_checkpoint)

job_summary(resumed_radius_job)
as.integer(as.vector(bigmemory::as.matrix(radius_offset)))
read_radius_store(
  radius_index,
  radius_distance,
  radius_offset,
  radius_points$id
)

## ----checkpoint-fields--------------------------------------------------------
# Re-read both checkpoint files and tabulate a few of their entries.
graph_spec <- readRDS(graph_checkpoint)
radius_spec <- readRDS(radius_checkpoint)

graph_fields <- c("type", "status", "k", "next_row", "next_edge")
radius_fields <- c("type", "status", "phase", "next_row", "total_matches")

checkpoint_summary(graph_spec, graph_fields)
checkpoint_summary(radius_spec, radius_fields)