ReadStream

class ReadStream

ReadStreams are an advanced concept for processing large tabular data in parallel. For example, a table can be split into multiple ReadStreams, each of which can be read simultaneously in a separate thread or process.

Constructors

Upload.to_read_streams([target_count])

Create parallel read streams to read an upload.

Query.to_read_streams([target_count])

Create parallel read streams to read query results.

Table.to_read_streams([target_count])

Create parallel read streams to read a table.

Examples

```python
import redivis
from concurrent.futures import ThreadPoolExecutor, as_completed

table = redivis.table(
    "demo.cms_2014_medicare_data:349j.physicians_and_other_supplier:kn00"
)

streams = table.to_read_streams(
    variables=["average_submitted_chrg_amt"], target_count=4
)

def process_stream(stream):
    """Process a single stream, returning the sum and count for the column."""
    total = 0
    count = 0
    # Iterate over the stream as a sequence of Arrow record batches
    for batch in stream.to_arrow_batch_iterator():
        column = batch.column("average_submitted_chrg_amt")
        # Drop nulls so they affect neither the sum nor the count
        valid = column.drop_null()
        total += sum(valid.to_pylist())
        count += len(valid)
    return total, count

# Process all streams in parallel using a thread pool
grand_total = 0
grand_count = 0

# max(1, ...) guards against ThreadPoolExecutor rejecting max_workers=0
with ThreadPoolExecutor(max_workers=max(1, len(streams))) as executor:
    futures = [executor.submit(process_stream, stream) for stream in streams]
    for future in as_completed(futures):
        total, count = future.result()
        grand_total += total
        grand_count += count

average = grand_total / grand_count if grand_count > 0 else 0
print(f"Average submitted charge amount: ${average:.2f}")
print(f"Computed across {grand_count} rows using {len(streams)} parallel streams")
```
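The example returns a (total, count) pair from each stream rather than a per-stream average. The following small worked sketch shows why. It uses made-up illustrative numbers, not real Medicare data. When streams hold different numbers of rows, averaging the per-stream averages gives the wrong answer. Merging sums and counts first, then dividing once, gives the correct pooled result.

```python
# Hypothetical partial results from two streams of unequal size:
# each entry is (sum of non-null values, number of non-null values).
partials = [(300.0, 3), (50.0, 1)]

# Correct: merge the sums and counts first, then divide once.
grand_total = sum(total for total, _ in partials)
grand_count = sum(count for _, count in partials)
pooled_average = grand_total / grand_count

# Incorrect: averaging per-stream averages weights every stream equally,
# regardless of how many rows each one contained.
naive_average = sum(t / c for t, c in partials) / len(partials)

print(pooled_average)  # 87.5
print(naive_average)   # 75.0
```

Because addition is associative, (sum, count) pairs can be merged in whatever order the futures complete. This is why the example can safely use as_completed.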

Attributes

properties

A dict containing the attributes id and estimatedRows.
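One possible use of estimatedRows is sketched below. It orders streams largest first before submitting them to a pool with fewer workers than streams, so the biggest jobs start early. This is a common scheduling heuristic, not something the Redivis docs prescribe. The sketch relies only on the documented properties dict. Two points are assumptions: the value may arrive as an int or as a numeric string, hence the int() call, and it is only an estimate. The SimpleNamespace objects are hypothetical stand-ins for real ReadStreams.

```python
from types import SimpleNamespace
from typing import Any, List


def largest_first(streams: List[Any]) -> List[Any]:
    """Return streams sorted by estimated row count, largest first.

    Uses only the documented properties["estimatedRows"] key; int() guards
    against the value being a numeric string (an assumption, not documented).
    """
    return sorted(
        streams,
        key=lambda s: int(s.properties["estimatedRows"]),
        reverse=True,
    )


# Hypothetical stand-ins; real ReadStreams come from to_read_streams().
demo = [
    SimpleNamespace(properties={"id": "a", "estimatedRows": 10}),
    SimpleNamespace(properties={"id": "b", "estimatedRows": "250"}),
    SimpleNamespace(properties={"id": "c", "estimatedRows": 40}),
]

print([s.properties["id"] for s in largest_first(demo)])  # ['b', 'c', 'a']
print(sum(int(s.properties["estimatedRows"]) for s in demo))  # 300
```

With real streams, pass largest_first(streams) to the executor in place of streams. Since the counts are estimates, treat the ordering as a hint rather than a guarantee of balanced work.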

Methods

ReadStream.to_*([max_results, *, ...])

Various methods to read the data in this stream. These mirror the Table.to_* methods (e.g., ReadStream.to_pandas_dataframe()).
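When every stream is read with the same to_* method, the results can be collected in stream order. The minimal sketch below uses ThreadPoolExecutor.map for this, since it returns results in input order even when threads finish out of order. The read_all helper and the FakeStream class with its to_list method are hypothetical, for illustration only. to_list is not part of ReadStream.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Sequence


def read_all(
    streams: Sequence[Any],
    read: Callable[[Any], Any],
    max_workers: int = 4,
) -> List[Any]:
    """Apply `read` to every stream in parallel; return results in stream order."""
    if not streams:
        return []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the same order as `streams`,
        # regardless of which thread finishes first.
        return list(executor.map(read, streams))


class FakeStream:
    """Hypothetical stand-in exposing a single to_*-style reader."""

    def __init__(self, rows):
        self._rows = rows

    def to_list(self):  # hypothetical method, not part of ReadStream
        return list(self._rows)


fake = [FakeStream([1, 2]), FakeStream([3]), FakeStream([4, 5, 6])]
parts = read_all(fake, lambda s: s.to_list())
print(parts)  # [[1, 2], [3], [4, 5, 6]]
```

With real streams, read could be lambda s: s.to_pandas_dataframe(), and the resulting list could be combined with pandas.concat(parts, ignore_index=True). Note that each per-stream result is held in memory until concatenation.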
