Table$to_read_streams

Table$to_read_streams(target_count=parallelly::availableCores(), *, variables=None) → list(ReadStream)

Returns a list of ReadStreams that can be used to consume and process the table in parallel.

Also callable on Queries and Uploads →  query$to_read_streams(...) | upload$to_read_streams(...)
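Because tables, queries, and uploads all expose `to_read_streams`, code that consumes the streams can be written once and reused for any of them. The sketch below is a hypothetical helper (`map_reduce_streams` is not part of the Redivis client). It applies a per-stream function in parallel and folds the partial results. `FakeSource` is a stand-in with plain Python lists as "streams", so the pattern can be tried without network access.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def map_reduce_streams(source, map_fn, reduce_fn, initial, *, target_count=4, variables=None):
    """Hypothetical helper: run map_fn on every read stream of `source` in
    parallel, then fold the partial results with reduce_fn.

    `source` can be any object with a to_read_streams() method (duck typing),
    e.g. a table, query, or upload.
    """
    streams = source.to_read_streams(target_count=target_count, variables=variables)
    if not streams:
        # Nothing to read; also avoids ThreadPoolExecutor(max_workers=0)
        return initial
    with ThreadPoolExecutor(max_workers=len(streams)) as executor:
        # executor.map returns results in input order, so the fold is deterministic
        partials = list(executor.map(map_fn, streams))
    return reduce(reduce_fn, partials, initial)


class FakeSource:
    """Hypothetical stand-in whose 'streams' are plain Python lists."""

    def __init__(self, chunks):
        self._chunks = chunks

    def to_read_streams(self, target_count=4, *, variables=None):
        return list(self._chunks)


source = FakeSource([[1, 2, 3], [4, 5], []])
total = map_reduce_streams(source, sum, lambda a, b: a + b, 0)
print(total)  # 15
```

With a real table, query, or upload, `map_fn` would read each stream (as the `process_stream` function in the example below does) instead of summing a list.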

Parameters:

target_count : int, default parallelly::availableCores()
The target number of streams to return. The actual number of streams returned may differ from this number: smaller tables will generally return fewer streams, and unreleased tables that span multiple uploads may return more streams.
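To make the "fewer or more streams" behavior concrete, here is a toy model. It is NOT Redivis's partitioning algorithm. It assumes each upload is split independently (so every non-empty upload yields at least one stream) and that a stream is never given fewer than a hypothetical `min_rows_per_stream` rows unless the upload itself is that small. Under those assumptions, small tables come back with fewer streams than requested, and tables spread across many uploads come back with more. The practical takeaway is to size any worker pool from `len(streams)`, never from `target_count`.

```python
def toy_stream_counts(upload_row_counts, target_count, min_rows_per_stream=100_000):
    """Toy model (hypothetical, not Redivis's algorithm) of how many streams
    each upload of a table might be split into."""
    total_rows = sum(upload_row_counts)
    counts = []
    for rows in upload_row_counts:
        if rows == 0:
            continue  # empty uploads contribute no streams
        # This upload's proportional share of the requested streams (at least 1)
        share = max(1, round(target_count * rows / total_rows))
        # Cap the split so no stream is smaller than the threshold
        cap = max(1, rows // min_rows_per_stream)
        counts.append(min(share, cap))
    return counts


print(sum(toy_stream_counts([50_000], target_count=8)))          # 1: small table, fewer streams
print(sum(toy_stream_counts([1_000_000] * 10, target_count=4)))  # 10: many uploads, more streams
print(toy_stream_counts([10_000_000], target_count=8))           # [8]: matches the target
```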

variables : list<str>, default None
A list of variable names to read. Reading only the variables you need improves performance. If unspecified, all variables are included in the returned rows. Variable names are case-insensitive, though the names in the results will reflect each variable's true casing. The order of the returned columns matches the order of names in this list.
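The `variables` rules above (case-insensitive matching, results in true casing, column order following the request, `None` meaning all variables) can be illustrated with a small pure-Python sketch. `resolve_variables` and the column names are hypothetical, and the behavior for an unknown name is an assumption: this sketch raises `KeyError`, which may not match what Redivis does.

```python
def resolve_variables(available, requested):
    """Hypothetical illustration of the documented `variables` semantics."""
    if requested is None:
        # Unspecified: every variable, in the table's own order
        return list(available)
    # Map lower-cased names back to each variable's true casing
    by_lower = {name.lower(): name for name in available}
    resolved = []
    for name in requested:  # output order follows the requested order
        try:
            resolved.append(by_lower[name.lower()])
        except KeyError:
            # Assumption: unknown names are an error in this sketch
            raise KeyError(f"Unknown variable: {name!r}") from None
    return resolved


columns = ["NPI", "Average_Submitted_Chrg_Amt", "Provider_Type"]  # hypothetical names
print(resolve_variables(columns, ["provider_type", "npi"]))  # ['Provider_Type', 'NPI']
```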

Returns:

list(ReadStream)

Examples:

import redivis
from concurrent.futures import ThreadPoolExecutor, as_completed

table = redivis.table(
    "demo.cms_2014_medicare_data:349j.physicians_and_other_supplier:kn00"
)

streams = table.to_read_streams(
    variables=["average_submitted_chrg_amt"], target_count=4
)

def process_stream(stream):
    """Process a single stream, returning the sum and count for the column."""
    total = 0
    count = 0
    # Iterate over this stream's rows as a sequence of Arrow record batches
    for batch in stream.to_arrow_batch_iterator():
        column = batch.column("average_submitted_chrg_amt")
        # Drop nulls so they affect neither the sum nor the count
        valid = column.drop_null()
        total += sum(valid.to_pylist())
        count += len(valid)
    return total, count

# Process all streams in parallel using a thread pool
grand_total = 0
grand_count = 0

# max(1, ...) avoids a ValueError from ThreadPoolExecutor if no streams are returned
with ThreadPoolExecutor(max_workers=max(1, len(streams))) as executor:
    futures = {
        executor.submit(process_stream, stream): stream for stream in streams
    }
    for future in as_completed(futures):
        total, count = future.result()
        grand_total += total
        grand_count += count

average = grand_total / grand_count if grand_count > 0 else 0
print(f"Average submitted charge amount: ${average:.2f}")
print(f"Computed across {grand_count} rows using {len(streams)} parallel streams")
