# ReadStream

## *class* <mark style="color:purple;">ReadStream</mark>

ReadStreams are an advanced feature for processing large tabular data in parallel. For example, a table can be split into multiple ReadStreams, each of which can be read simultaneously in a separate thread or process.

## Constructors

<table data-header-hidden><thead><tr><th width="334.4296875">Method</th><th>Description</th></tr></thead><tbody><tr><td><a href="upload/upload.to_"><strong><code>Upload.to_read_streams</code></strong></a>([target_count])</td><td>Create parallel read streams to read an upload.</td></tr><tr><td><a href="query/query.to_"><strong><code>Query.to_read_streams</code></strong></a>([target_count])</td><td>Create parallel read streams to read query results.</td></tr><tr><td><a href="table/table.to_read_streams"><strong><code>Table.to_read_streams</code></strong></a>([target_count])</td><td>Create parallel read streams to read a table.</td></tr></tbody></table>

## Examples

```python
import redivis
from concurrent.futures import ThreadPoolExecutor, as_completed

table = redivis.table(
    "demo.cms_2014_medicare_data:349j.physicians_and_other_supplier:kn00"
)

streams = table.to_read_streams(
    variables=["average_submitted_chrg_amt"], target_count=4
)

def process_stream(stream):
    """Process a single stream, returning the sum and count for the column."""
    total = 0
    count = 0
    # Each stream can be read as an iterator of Arrow record batches
    for batch in stream.to_arrow_batch_iterator():
        column = batch.column("average_submitted_chrg_amt")
        # Drop nulls so they affect neither the sum nor the count
        valid = column.drop_null()
        # sum() of an empty list is 0, so empty batches need no special case
        total += sum(valid.to_pylist())
        count += len(valid)
    return total, count

# Process all streams in parallel using a thread pool
grand_total = 0
grand_count = 0

with ThreadPoolExecutor(max_workers=len(streams)) as executor:
    futures = {
        executor.submit(process_stream, stream): stream for stream in streams
    }
    for future in as_completed(futures):
        total, count = future.result()
        grand_total += total
        grand_count += count

average = grand_total / grand_count if grand_count > 0 else 0
print(f"Average submitted charge amount: ${average:.2f}")
print(f"Computed across {grand_count} rows using {len(streams)} parallel streams")
```

## Attributes

| **`properties`** | A dict containing the attributes `id` and `estimatedRows`. |
| ---------------- | ---------------------------------------------------------- |
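
As a rough illustration of how `properties` might be used, the sketch below ranks streams by their estimated size, for example to hand the largest stream to a worker first. The helper `summarize_streams` and the stand-in objects are hypothetical and not part of the Redivis API. The sketch assumes only that each stream exposes a `properties` dict with `id` and `estimatedRows`, and that `estimatedRows` can be converted with `int()`.

```python
from types import SimpleNamespace


def summarize_streams(streams):
    """Return (stream_id, estimated_rows) pairs, largest first, plus the total.

    Hypothetical helper: it only reads the `id` and `estimatedRows` keys
    of each stream's `properties` dict.
    """
    sizes = [
        # int() is a precaution in case the estimate arrives as a string
        (s.properties["id"], int(s.properties["estimatedRows"]))
        for s in streams
    ]
    sizes.sort(key=lambda pair: pair[1], reverse=True)
    total = sum(rows for _, rows in sizes)
    return sizes, total


# Stand-in objects so the sketch runs without Redivis credentials;
# real ReadStream objects would come from e.g. Table.to_read_streams().
fake_streams = [
    SimpleNamespace(properties={"id": "stream-a", "estimatedRows": 1200}),
    SimpleNamespace(properties={"id": "stream-b", "estimatedRows": 3400}),
    SimpleNamespace(properties={"id": "stream-c", "estimatedRows": 900}),
]

sizes, total = summarize_streams(fake_streams)
for stream_id, rows in sizes:
    print(f"{stream_id}: ~{rows} estimated rows")
print(f"Total: ~{total} estimated rows across {len(sizes)} streams")
```

Because `estimatedRows` is an estimate, it is better suited to scheduling and progress reporting than to exact row counts.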

## Methods

<table data-header-hidden><thead><tr><th width="441"></th><th></th></tr></thead><tbody><tr><td><a href="readstream/readstream.to_"><strong><code>ReadStream.to_*</code></strong></a>([max_results, *, ...])</td><td>Various methods to read the stream's data. Mirrors the various <a href="table">Table.to_*</a> methods (e.g., <code>ReadStream.to_pandas_dataframe()</code>).</td></tr></tbody></table>
