ReadStream

class ReadStream

ReadStreams are an advanced feature for processing large tabular data in parallel. For example, a table can be split into multiple ReadStreams, each of which can be read simultaneously in a separate thread or process.

Constructors

Upload$to_read_streams(target_count)

Create parallel read streams to read an upload.

Query$to_read_streams(target_count)

Create parallel read streams to read query results.

Table$to_read_streams(target_count)

Create parallel read streams to read a table.

Examples

library(redivis)
library(arrow)
library(future)
library(furrr)

table <- redivis$table(
  "demo.cms_2014_medicare_data:349j.physicians_and_other_supplier:kn00"
)

streams <- table$to_read_streams(
  variables = list("average_submitted_chrg_amt"),
  target_count = 4L
)

process_stream <- function(stream) {
  reader <- stream$to_arrow_batch_reader()
  on.exit(reader$close())
  total <- 0
  count <- 0L

  while (TRUE) {
    batch <- reader$read_next_batch()
    if (is.null(batch)) break

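    # Accumulate the sum and the number of non-missing values, so that
    # NAs dropped from the sum are also left out of the denominator
    column <- as.vector(batch[["average_submitted_chrg_amt"]])
    total <- total + as.numeric(sum(column, na.rm = TRUE))
    count <- count + sum(!is.na(column))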
  }

  list(total = total, count = count)
}

# Process streams in parallel
future::plan(future::multisession, workers = length(streams))
results <- furrr::future_map(streams, process_stream)

grand_total <- sum(vapply(results, `[[`, numeric(1), "total"))
grand_count <- sum(vapply(results, `[[`, integer(1), "count"))

average <- if (grand_count > 0) grand_total / grand_count else 0

cat(sprintf("Average submitted charge amount: $%.2f\n", average))
cat(sprintf(
  "Computed across %d rows using %d parallel streams\n",
  grand_count,
  length(streams)
))
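
The example returns a running total and a count from each stream rather than a per-stream mean, since streams may hold different numbers of rows. A minimal, self-contained sketch of that combining step follows. It uses a plain numeric vector split into uneven chunks as a stand-in for streams, and the helper names summarise_chunk and combine_partials are hypothetical, not part of redivis.

```r
# Stand-in data: in practice each chunk would come from one ReadStream.
values <- c(10, 20, 30, 40, 50, 60, 70)
chunks <- list(values[1:2], values[3:7])  # deliberately uneven sizes

# Hypothetical helper: reduce one chunk to a partial (total, count) pair.
summarise_chunk <- function(x) {
  list(total = sum(x, na.rm = TRUE), count = sum(!is.na(x)))
}

# Hypothetical helper: merge the partial pairs into one overall mean.
combine_partials <- function(partials) {
  total <- sum(vapply(partials, `[[`, numeric(1), "total"))
  count <- sum(vapply(partials, `[[`, integer(1), "count"))
  if (count > 0) total / count else NA_real_
}

partials <- lapply(chunks, summarise_chunk)
overall_mean <- combine_partials(partials)

# Averaging the per-chunk means instead would over-weight the small chunk.
mean_of_means <- mean(vapply(chunks, mean, numeric(1)))
```

Here overall_mean matches mean(values), while mean_of_means does not, which is why the partial totals and counts are carried separately until the final step.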

Attributes

properties

A named list containing the attributes id and estimatedRows.
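
As a hedged illustration, the estimatedRows value could be used to check how evenly rows are spread across streams before starting workers. The sketch below assumes that properties can be indexed with `$`. The helper estimated_rows is hypothetical, and the mock streams (with made-up ids and counts) stand in for real ReadStream objects so the snippet runs without a Redivis connection.

```r
# Hypothetical helper: collect the estimated row count of each stream.
# Assumes `stream$properties` can be indexed with `$`.
estimated_rows <- function(streams) {
  rows <- vapply(
    streams,
    function(stream) as.numeric(stream$properties$estimatedRows),
    numeric(1)
  )
  names(rows) <- vapply(
    streams,
    function(stream) as.character(stream$properties$id),
    character(1)
  )
  rows
}

# Mock streams with made-up ids and counts, used only for illustration.
mock_streams <- list(
  list(properties = list(id = "stream-a", estimatedRows = 1200)),
  list(properties = list(id = "stream-b", estimatedRows = 800))
)

rows <- estimated_rows(mock_streams)
share <- rows / sum(rows)  # fraction of all rows each stream is expected to hold
```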

Methods

ReadStream$to_*([max_results, *, ...])

Methods for reading the contents of the stream. These mirror the corresponding Table$to_* methods (e.g., ReadStream$to_tibble()).
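
For instance, a single stream might be previewed with a to_* call before committing to a full parallel run. The following is a sketch, assuming that max_results (listed in the signature above) caps the number of rows returned. The helper preview_stream is hypothetical, and the mock stream returns a plain data frame in place of a tibble so the snippet has no dependencies.

```r
# Hypothetical helper: read at most `n` rows from one stream.
preview_stream <- function(stream, n = 5L) {
  stream$to_tibble(max_results = n)
}

# Mock stream standing in for a real ReadStream; the data are made up.
mock_stream <- list(
  to_tibble = function(max_results = NULL) {
    data <- data.frame(id = 1:10, amount = seq(100, 1000, by = 100))
    if (is.null(max_results)) data else head(data, max_results)
  }
)

preview <- preview_stream(mock_stream, n = 3L)
```

With a real stream, the same helper would be called on one element of the list returned by to_read_streams.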
