Table$to_read_streams

Table$to_read_streams(target_count=parallelly::availableCores(), *, variables=NULL) → list(ReadStream)

Returns a list of ReadStreams that can be used to consume and process the table in parallel.

Also callable on Queries and Uploads →  query$to_read_streams(...) | upload$to_read_streams(...)

Parameters:

target_count : int, default parallelly::availableCores()
The target number of streams to return. The actual number of streams returned may differ from this value: smaller tables will generally return fewer streams, and unreleased tables that span multiple uploads may return more.

variables : list<str>, default NULL
A list of variable names to read. Restricting this list improves performance when not all variables are needed. If unspecified, all variables are included in the returned rows. Variable names are case-insensitive, though the names in the results reflect each variable's true casing. Columns are returned in the order of the names in this list.
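Because variable names are matched case-insensitively but returned with their true casing, a column lookup by the name you typed can miss. The sketch below shows one way to map requested names onto the names that actually appear in a result. `resolve_columns` is a hypothetical helper, not part of redivis, and the variable names are made up for illustration.

```r
# Hypothetical helper (not part of redivis): map the names you requested onto
# the true-cased names found in the results, ignoring case.
resolve_columns <- function(requested, actual) {
  idx <- match(tolower(requested), tolower(actual))
  if (anyNA(idx)) {
    stop("Not found in results: ", paste(requested[is.na(idx)], collapse = ", "))
  }
  # The output follows the order of `requested`
  actual[idx]
}

# Illustrative names only: requested in one casing, returned in another
requested <- c("CHARGE_AMT", "provider_id")
actual <- c("Provider_ID", "charge_amt")
resolved <- resolve_columns(requested, actual)
print(resolved)
```

With a record batch in hand, `actual` would typically come from `names(batch)`, and the resolved name can then be used to index the batch.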

Returns:

list(ReadStream)
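Since the length of the returned list may differ from target_count, it can help to size the worker pool from the streams actually returned rather than from the number requested. The following is a minimal sketch under that assumption. `pick_worker_count` is a hypothetical helper, not part of redivis.

```r
# Hypothetical helper (not part of redivis): choose a worker count from the
# number of streams actually returned, capped at a caller-supplied limit.
pick_worker_count <- function(n_streams, max_workers) {
  stopifnot(n_streams >= 0, max_workers >= 1)
  # At least one worker, and never more workers than streams or than the cap
  max(1L, min(as.integer(n_streams), as.integer(max_workers)))
}

# Fewer streams than requested (e.g. a small table)
few <- pick_worker_count(n_streams = 2, max_workers = 8)
# More streams than requested (e.g. an unreleased table spanning several uploads)
many <- pick_worker_count(n_streams = 12, max_workers = 8)
print(c(few = few, many = many))
```

In practice the cap might be `parallelly::availableCores()` and the result passed as the `workers` argument of `future::plan()`. Streams beyond the worker count are then processed as workers become free.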

Examples:

library(redivis)
library(arrow)
# The future and furrr packages must be installed; they are called via `::` below

table <- redivis$table(
  "demo.cms_2014_medicare_data:349j.physicians_and_other_supplier:kn00"
)

streams <- table$to_read_streams(
  variables = list("average_submitted_chrg_amt"),
  target_count = 4L
)

process_stream <- function(stream) {
  reader <- stream$to_arrow_batch_reader()
  on.exit(reader$close())
  total <- 0
  count <- 0L

  while (TRUE) {
    batch <- reader$read_next_batch()
    if (is.null(batch)) break

    column <- as.vector(batch[["average_submitted_chrg_amt"]])
    total <- total + as.numeric(sum(column, na.rm = TRUE))
    # Count only non-missing values so the denominator matches the NA-free sum
    count <- count + sum(!is.na(column))
  }

  list(total = total, count = count)
}

# Process streams in parallel
future::plan(future::multisession, workers = length(streams))
results <- furrr::future_map(streams, process_stream)

grand_total <- sum(vapply(results, `[[`, numeric(1), "total"))
grand_count <- sum(vapply(results, `[[`, integer(1), "count"))

average <- if (grand_count > 0) grand_total / grand_count else 0

cat(sprintf("Average submitted charge amount: $%.2f\n", average))
cat(sprintf(
  "Computed across %d rows using %d parallel streams\n",
  grand_count,
  length(streams)
))
