scdataset.strategy.Streaming
- class scdataset.strategy.Streaming(indices: ArrayLike | None = None, shuffle: bool = False)
Bases: SamplingStrategy
Sequential streaming sampling strategy with optional buffer-level shuffling.
This strategy provides indices in sequential order, with optional shuffling at the buffer level (defined by fetch_factor in scDataset). When shuffle=True, batches within each fetch buffer are shuffled, similar to Ray Dataset or WebDataset behavior, while maintaining overall sequential order across buffers.
- Parameters:
indices (array-like, optional) – Subset of indices to use for sampling. If None, uses all indices from 0 to len(data_collection)-1.
shuffle (bool, default=False) – Whether to shuffle batches within each fetch buffer. When True, the order of batches inside each buffer (of size fetch_factor * batch_size) is randomized, while the buffers themselves are still visited in sequential order.
- Variables:
_shuffle_before_yield (bool) – Controlled by the shuffle parameter. True if buffer-level shuffling is enabled, False otherwise.
_indices (numpy.ndarray or None) – Stored subset of indices if provided.
shuffle (bool) – Whether buffer-level shuffling is enabled.
Examples
>>> # Stream through entire dataset without shuffling
>>> strategy = Streaming()
>>> indices = strategy.get_indices(range(100))
>>> len(indices)
100

>>> # Stream through subset of indices
>>> subset_strategy = Streaming(indices=[10, 20, 30])
>>> indices = subset_strategy.get_indices(range(100))
>>> list(indices)
[10, 20, 30]

>>> # Stream with buffer-level shuffling (like Ray Dataset/WebDataset)
>>> shuffle_strategy = Streaming(shuffle=True)
>>> # Batches within each fetch buffer will be shuffled,
>>> # but buffers themselves maintain sequential order
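The buffer-level shuffling described above can be illustrated with a small, library-independent sketch. The helper stream_with_buffer_shuffle below is hypothetical and not part of scdataset. It assumes that a fetch buffer holds fetch_factor * batch_size consecutive indices and that only the order of whole batches inside a buffer is permuted. The real scDataset iteration logic may differ in detail.

```python
import random


def stream_with_buffer_shuffle(n_samples, batch_size, fetch_factor, seed=None):
    """Yield batches of indices (hypothetical helper, not the scdataset API)."""
    rng = random.Random(seed)
    buffer_size = batch_size * fetch_factor
    for start in range(0, n_samples, buffer_size):
        # One fetch buffer: a contiguous, sequential run of indices.
        buffer = list(range(start, min(start + buffer_size, n_samples)))
        # Split the buffer into batches of batch_size indices.
        batches = [buffer[i:i + batch_size] for i in range(0, len(buffer), batch_size)]
        # Randomize only the order of batches inside this buffer.
        rng.shuffle(batches)
        yield from batches


batches = list(stream_with_buffer_shuffle(n_samples=12, batch_size=2, fetch_factor=3, seed=0))
for batch in batches:
    print(batch)
```

With these settings there are two buffers of six indices each: the first three batches always cover indices 0 to 5 and the last three cover 6 to 11, whatever order the batches take inside each buffer.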
See also
BlockShuffling – For shuffled block-based sampling
BlockWeightedSampling – For weighted sampling with shuffling
Notes
When shuffle=True, this strategy provides behavior similar to:
Ray Dataset’s local shuffling within windows
WebDataset’s shuffle buffer functionality
The key difference from BlockShuffling is that Streaming maintains the overall sequential order of fetch buffers, only shuffling within each buffer, while BlockShuffling shuffles the order of blocks themselves.
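The contrast drawn in these notes can be sketched without the library. Both functions below are hypothetical illustrations, not the scdataset implementations. For brevity the streaming variant shuffles individual indices within a buffer (equivalent to a batch size of 1), and the block variant only permutes the order of intact blocks.

```python
import random


def split_blocks(n, size):
    """Split range(n) into consecutive chunks of at most `size` indices."""
    return [list(range(i, min(i + size, n))) for i in range(0, n, size)]


def streaming_like(n, buffer_size, seed=0):
    """Buffers stay in sequential order; contents of each buffer are shuffled."""
    rng = random.Random(seed)
    out = []
    for buf in split_blocks(n, buffer_size):
        rng.shuffle(buf)
        out.extend(buf)
    return out


def block_shuffling_like(n, block_size, seed=0):
    """Blocks are visited in random order; each block stays internally ordered."""
    rng = random.Random(seed)
    blocks = split_blocks(n, block_size)
    rng.shuffle(blocks)
    return [i for block in blocks for i in block]


streamed = streaming_like(12, buffer_size=4)
blocked = block_shuffling_like(12, block_size=4)
print(streamed)
print(blocked)
```

In the streaming output, positions 0 to 3 always hold some permutation of indices 0 to 3, and so on for later buffers. In the block-shuffled output, every group of four is a contiguous ascending run, but the groups can appear in any order.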
Methods
__init__([indices, shuffle]) – Initialize streaming strategy.
get_indices(data_collection[, seed, rng]) – Get indices for streaming sampling.
get_len(data_collection) – Get the effective length of the data collection for this strategy.
- __init__(indices: ArrayLike | None = None, shuffle: bool = False)
Initialize streaming strategy.
- Parameters:
indices (array-like, optional) – Subset of indices to stream through. If None, streams through all available indices.
shuffle (bool, default=False) – Whether to enable buffer-level shuffling. When True, batches within each fetch buffer are shuffled while maintaining sequential order between buffers.
- get_len(data_collection) → int
Get the effective length of the data collection for this strategy.
- Parameters:
data_collection (object) – The data collection to get length from. Must support len().
- Returns:
Number of samples that will be yielded by this strategy.
- Return type:
int
Examples
>>> strategy = Streaming()
>>> strategy.get_len(range(100))
100

>>> subset_strategy = Streaming(indices=[1, 3, 5])
>>> subset_strategy.get_len(range(100))
3
- get_indices(data_collection, seed: int | None = None, rng: Generator | None = None) → ndarray[tuple[int, ...], dtype[int64]]
Get indices for streaming sampling.
Returns indices in sequential order. If shuffle=True was set during initialization, the _shuffle_before_yield attribute will cause buffer-level shuffling during iteration.
- Parameters:
data_collection (object) – The data collection to sample from. Must support len().
seed (int, optional) – Random seed. Only used if shuffle=True, for buffer-level shuffling during iteration; index generation itself remains sequential.
rng (numpy.random.Generator, optional) – Random number generator. Only used if shuffle=True, for buffer-level shuffling during iteration.
- Returns:
Array of indices in sequential order.
- Return type:
numpy.ndarray
Examples
>>> strategy = Streaming()
>>> indices = strategy.get_indices(range(5))
>>> list(indices)
[0, 1, 2, 3, 4]

>>> subset_strategy = Streaming(indices=[2, 4, 6])
>>> indices = subset_strategy.get_indices(range(10))
>>> list(indices)
[2, 4, 6]

>>> # With shuffle=True, indices are still sequential
>>> shuffle_strategy = Streaming(shuffle=True)
>>> indices = shuffle_strategy.get_indices(range(5))
>>> list(indices)  # Still sequential - shuffling happens at buffer level
[0, 1, 2, 3, 4]
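The contract documented for get_len and get_indices can be summarized in a minimal stand-in class. StreamingSketch below is hypothetical: it mirrors only the behaviour described on this page, returns plain lists instead of NumPy arrays so that it runs without dependencies, and should not be read as the library's actual implementation.

```python
class StreamingSketch:
    """Hypothetical stand-in mirroring the documented behaviour only."""

    def __init__(self, indices=None, shuffle=False):
        self._indices = None if indices is None else list(indices)
        self.shuffle = shuffle
        # Attribute name taken from the Variables list above; how the
        # dataset consumes it during iteration is not modelled here.
        self._shuffle_before_yield = shuffle

    def get_len(self, data_collection):
        # A provided subset determines the length; otherwise use the collection.
        if self._indices is not None:
            return len(self._indices)
        return len(data_collection)

    def get_indices(self, data_collection, seed=None, rng=None):
        # seed and rng are accepted but unused here: index generation is
        # sequential regardless of the shuffle setting.
        if self._indices is not None:
            return list(self._indices)
        return list(range(len(data_collection)))


sketch = StreamingSketch(shuffle=True)
print(sketch.get_indices(range(5)))   # [0, 1, 2, 3, 4]
print(sketch._shuffle_before_yield)   # True
```

The point of the sketch is that the shuffle flag changes only a stored attribute, not the indices returned; any reordering is deferred to whatever consumes the flag during iteration.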