From 79eb9ac44324013a5388029d82a6b874a789a953 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Fri, 31 Oct 2025 19:52:53 +0000
Subject: [PATCH 1/3] perf: Avoid requery for some result downsample methods

---
 bigframes/core/blocks.py             | 71 ++++++++++++----------
 bigframes/core/bq_data.py            | 13 ++++-
 bigframes/core/local_data.py         | 11 ++++-
 bigframes/session/executor.py        | 15 +++---
 tests/system/small/test_dataframe.py |  6 +--
 5 files changed, 62 insertions(+), 54 deletions(-)

diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 41986ce5df..e7d7e3bfaa 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -814,49 +814,36 @@ def _materialize_local(
             total_rows = result_batches.approx_total_rows
             # Remove downsampling config from subsequent invocations, as it could
             # otherwise result in many iterations if downsampling undershoots
-            return self._downsample(
-                total_rows=total_rows,
-                sampling_method=sample_config.sampling_method,
-                fraction=fraction,
-                random_state=sample_config.random_state,
-            )._materialize_local(
-                MaterializationOptions(ordered=materialize_options.ordered)
-            )
-        else:
-            df = result_batches.to_pandas()
-            df = self._copy_index_to_pandas(df)
-            df.set_axis(self.column_labels, axis=1, copy=False)
-            return df, execute_result.query_job
-
-    def _downsample(
-        self, total_rows: int, sampling_method: str, fraction: float, random_state
-    ) -> Block:
-        # either selecting fraction or number of rows
-        if sampling_method == _HEAD:
-            filtered_block = self.slice(stop=int(total_rows * fraction))
-            return filtered_block
-        elif (sampling_method == _UNIFORM) and (random_state is None):
-            filtered_expr = self.expr._uniform_sampling(fraction)
-            block = Block(
-                filtered_expr,
-                index_columns=self.index_columns,
-                column_labels=self.column_labels,
-                index_labels=self.index.names,
-            )
-            return block
-        elif sampling_method == _UNIFORM:
-            block = self.split(
-                fracs=(fraction,),
-                random_state=random_state,
-                sort=False,
-            )[0]
-            return block
+            if sample_config.sampling_method == "head":
+                # Just truncates the result iterator without a follow-up query
+                raw_df = result_batches.to_pandas(limit=int(total_rows * fraction))
+            elif (
+                sample_config.sampling_method == "uniform"
+                and sample_config.random_state is None
+            ):
+                # Pushes the sample into the result without a new query
+                sampled_batches = execute_result.batches(sample_rate=fraction)
+                raw_df = sampled_batches.to_pandas()
+            else:  # uniform sample with random state requires a full follow-up query
+                return self._downsample(
+                    fraction=fraction,
+                    random_state=sample_config.random_state,
+                )._materialize_local(
+                    MaterializationOptions(ordered=materialize_options.ordered)
+                )
         else:
-            # This part should never be called, just in case.
-            raise NotImplementedError(
-                f"The downsampling method {sampling_method} is not implemented, "
-                f"please choose from {','.join(_SAMPLING_METHODS)}."
- ) + raw_df = result_batches.to_pandas() + df = self._copy_index_to_pandas(raw_df) + df.set_axis(self.column_labels, axis=1, copy=False) + return df, execute_result.query_job + + def _downsample(self, fraction: float, random_state) -> Block: + block = self.split( + fracs=(fraction,), + random_state=random_state, + sort=False, + )[0] + return block def split( self, diff --git a/bigframes/core/bq_data.py b/bigframes/core/bq_data.py index c72de6ead6..0125b7d07d 100644 --- a/bigframes/core/bq_data.py +++ b/bigframes/core/bq_data.py @@ -171,11 +171,22 @@ def get_arrow_batches( columns: Sequence[str], storage_read_client: bigquery_storage_v1.BigQueryReadClient, project_id: str, + sample_rate: Optional[float] = None, ) -> ReadResult: table_mod_options = {} read_options_dict: dict[str, Any] = {"selected_fields": list(columns)} + + predicates = [] if data.sql_predicate: - read_options_dict["row_restriction"] = data.sql_predicate + predicates.append(data.sql_predicate) + if sample_rate is not None: + assert isinstance(sample_rate, float) + predicates.append(f"RAND() < {sample_rate}") + + if predicates: + full_predicates = " AND ".join(f"( {pred} )" for pred in predicates) + read_options_dict["row_restriction"] = full_predicates + read_options = bq_storage_types.ReadSession.TableReadOptions(**read_options_dict) if data.at_time: diff --git a/bigframes/core/local_data.py b/bigframes/core/local_data.py index fa18f00483..086fbce95c 100644 --- a/bigframes/core/local_data.py +++ b/bigframes/core/local_data.py @@ -25,6 +25,7 @@ import uuid import geopandas # type: ignore +import numpy import numpy as np import pandas as pd import pyarrow as pa @@ -124,12 +125,20 @@ def to_arrow( geo_format: Literal["wkb", "wkt"] = "wkt", duration_type: Literal["int", "duration"] = "duration", json_type: Literal["string"] = "string", + sample_rate: Optional[float] = None, ) -> tuple[pa.Schema, Iterable[pa.RecordBatch]]: if geo_format != "wkt": raise NotImplementedError(f"geo format {geo_format} not yet implemented") assert json_type == "string" - batches = self.data.to_batches() + data = self.data + + # This exists for symmetry with remote sources, but sampling local data like this shouldn't really happen + if sample_rate is not None: + to_take = numpy.random.rand(data.num_rows) < sample_rate + data = data.filter(to_take) + + batches = data.to_batches() schema = self.data.schema if duration_type == "int": schema = _schema_durations_to_ints(schema) diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index bca98bfb2f..49617540db 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -107,8 +107,8 @@ def to_arrow_table(self) -> pyarrow.Table: # Bug with some pyarrow versions, empty_table only supports base storage types, not extension types. return self._schema.to_pyarrow(use_storage_types=True).empty_table() - def to_pandas(self) -> pd.DataFrame: - return io_pandas.arrow_to_pandas(self.to_arrow_table(), self._schema) + def to_pandas(self, limit: Optional[int] = None) -> pd.DataFrame: + return pd.concat(self.to_pandas_batches(max_results=limit)) def to_pandas_batches( self, page_size: Optional[int] = None, max_results: Optional[int] = None @@ -158,7 +158,7 @@ def schema(self) -> bigframes.core.schema.ArraySchema: ... @abc.abstractmethod - def batches(self) -> ResultsIterator: + def batches(self, sample_rate: Optional[float] = None) -> ResultsIterator: ... 
     @property
@@ -200,9 +200,9 @@ def execution_metadata(self) -> ExecutionMetadata:
     def schema(self) -> bigframes.core.schema.ArraySchema:
         return self._data.schema
 
-    def batches(self) -> ResultsIterator:
+    def batches(self, sample_rate: Optional[float] = None) -> ResultsIterator:
         return ResultsIterator(
-            iter(self._data.to_arrow()[1]),
+            iter(self._data.to_arrow(sample_rate=sample_rate)[1]),
             self.schema,
             self._data.metadata.row_count,
             self._data.metadata.total_bytes,
@@ -226,7 +226,7 @@ def execution_metadata(self) -> ExecutionMetadata:
     def schema(self) -> bigframes.core.schema.ArraySchema:
         return self._schema
 
-    def batches(self) -> ResultsIterator:
+    def batches(self, sample_rate: Optional[float] = None) -> ResultsIterator:
         return ResultsIterator(iter([]), self.schema, 0, 0)
 
 
@@ -260,12 +260,13 @@ def schema(self) -> bigframes.core.schema.ArraySchema:
         source_ids = [selection[0] for selection in self._selected_fields]
         return self._data.schema.select(source_ids).rename(dict(self._selected_fields))
 
-    def batches(self) -> ResultsIterator:
+    def batches(self, sample_rate: Optional[float] = None) -> ResultsIterator:
         read_batches = bq_data.get_arrow_batches(
             self._data,
             [x[0] for x in self._selected_fields],
             self._storage_client,
             self._project_id,
+            sample_rate=sample_rate,
         )
         arrow_batches: Iterator[pa.RecordBatch] = map(
             functools.partial(
diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py
index 79f8efd00f..17b6ad511c 100644
--- a/tests/system/small/test_dataframe.py
+++ b/tests/system/small/test_dataframe.py
@@ -4473,7 +4473,7 @@ def test_df_kurt(scalars_dfs):
         "n_default",
     ],
 )
-def test_sample(scalars_dfs, frac, n, random_state):
+def test_df_to_pandas_sample(scalars_dfs, frac, n, random_state):
     scalars_df, _ = scalars_dfs
     df = scalars_df.sample(frac=frac, n=n, random_state=random_state)
     bf_result = df.to_pandas()
@@ -4484,7 +4484,7 @@ def test_sample(scalars_dfs, frac, n, random_state):
 
     assert bf_result.shape[1] == scalars_df.shape[1]
 
 
-def test_sample_determinism(penguins_df_default_index):
+def test_df_to_pandas_sample_determinism(penguins_df_default_index):
     df = penguins_df_default_index.sample(n=100, random_state=12345).head(15)
     bf_result = df.to_pandas()
     bf_result2 = df.to_pandas()
 
     pandas.testing.assert_frame_equal(bf_result, bf_result2)
 
 
-def test_sample_raises_value_error(scalars_dfs):
+def test_df_to_pandas_sample_raises_value_error(scalars_dfs):
     scalars_df, _ = scalars_dfs
     with pytest.raises(
         ValueError, match="Only one of 'n' or 'frac' parameter can be specified."

From 227ad25a8271846c623d852b8519bb4ec15ff926 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Tue, 11 Nov 2025 21:37:20 +0000
Subject: [PATCH 2/3] fix empty table handling

---
 bigframes/session/executor.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index 49617540db..2cbf6d8705 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -88,7 +88,7 @@ def arrow_batches(self) -> Iterator[pyarrow.RecordBatch]:
 
             yield batch
 
-    def to_arrow_table(self) -> pyarrow.Table:
+    def to_arrow_table(self, limit: Optional[int] = None) -> pyarrow.Table:
         # Need to provide schema if no result rows, as arrow can't infer
         # If there are rows, it is safest to infer schema from batches.
         # Any discrepancies between predicted schema and actual schema will produce errors.
@@ -97,9 +97,12 @@ def to_arrow_table(self) -> pyarrow.Table: peek_value = list(peek_it) # TODO: Enforce our internal schema on the table for consistency if len(peek_value) > 0: - return pyarrow.Table.from_batches( - itertools.chain(peek_value, batches), # reconstruct - ) + batches = itertools.chain(peek_value, batches) # reconstruct + if limit: + batches = pyarrow_utils.truncate_pyarrow_iterable( + batches, max_results=limit + ) + return pyarrow.Table.from_batches(batches) else: try: return self._schema.to_pyarrow().empty_table() @@ -108,7 +111,7 @@ def to_arrow_table(self) -> pyarrow.Table: return self._schema.to_pyarrow(use_storage_types=True).empty_table() def to_pandas(self, limit: Optional[int] = None) -> pd.DataFrame: - return pd.concat(self.to_pandas_batches(max_results=limit)) + return io_pandas.arrow_to_pandas(self.to_arrow_table(limit=limit), self._schema) def to_pandas_batches( self, page_size: Optional[int] = None, max_results: Optional[int] = None From 1bc6a6cf28f6aabb025cc740d4e42882176761e6 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 13 Nov 2025 21:16:39 +0000 Subject: [PATCH 3/3] cleanup --- bigframes/core/blocks.py | 16 +++++----------- tests/system/small/test_anywidget.py | 2 +- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 180b72b25f..10fb8e989d 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -829,10 +829,12 @@ def _materialize_local( sampled_batches = execute_result.batches(sample_rate=fraction) raw_df = sampled_batches.to_pandas() else: # uniform sample with random state requires a full follow-up query - return self._downsample( - fraction=fraction, + down_sampled_block = self.split( + fracs=(fraction,), random_state=sample_config.random_state, - )._materialize_local( + sort=False, + )[0] + return down_sampled_block._materialize_local( MaterializationOptions(ordered=materialize_options.ordered) ) else: @@ -841,14 +843,6 @@ def _materialize_local( df.set_axis(self.column_labels, axis=1, copy=False) return df, execute_result.query_job - def _downsample(self, fraction: float, random_state) -> Block: - block = self.split( - fracs=(fraction,), - random_state=random_state, - sort=False, - )[0] - return block - def split( self, ns: Iterable[int] = (), diff --git a/tests/system/small/test_anywidget.py b/tests/system/small/test_anywidget.py index 51c39c2aec..d8a8e64edd 100644 --- a/tests/system/small/test_anywidget.py +++ b/tests/system/small/test_anywidget.py @@ -132,7 +132,7 @@ def execution_metadata(self) -> ExecutionMetadata: def schema(self): return schema - def batches(self) -> ResultsIterator: + def batches(self, sample_rate=None) -> ResultsIterator: return ResultsIterator( arrow_batches_val, self.schema,
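
Note on the sampling pushdown in [PATCH 1/3]: both execution paths implement
Bernoulli sampling. The remote path appends a "RAND() < rate" predicate to the
read session's row_restriction, and the local path filters the Arrow table
with a random boolean mask, so neither needs a follow-up query. A minimal
standalone sketch of the shared idea (bernoulli_sample and the toy table are
illustrative names, not the bigframes API):

    import numpy as np
    import pyarrow as pa

    def bernoulli_sample(table: pa.Table, sample_rate: float) -> pa.Table:
        # Keep each row independently with probability sample_rate, mirroring
        # the "RAND() < {sample_rate}" row_restriction used on the remote path.
        mask = np.random.rand(table.num_rows) < sample_rate
        return table.filter(pa.array(mask))

    table = pa.table({"x": list(range(100_000))})
    sampled = bernoulli_sample(table, 0.01)
    print(sampled.num_rows)  # ~1,000 rows in expectation

As with the SQL predicate, the output size only equals fraction * total_rows
in expectation, which is why the "head" method instead truncates the result
iterator for an exact row count.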
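Note on [PATCH 2/3]: to_pandas(limit=...) now routes through to_arrow_table,
which truncates the stream of record batches rather than materializing the
full result and slicing afterward. A sketch of that truncation idea
(truncate_batches is an illustrative stand-in for the pyarrow_utils helper
the patch calls, whose real signature may differ):

    from typing import Iterator

    import pyarrow as pa

    def truncate_batches(
        batches: Iterator[pa.RecordBatch], max_results: int
    ) -> Iterator[pa.RecordBatch]:
        # Pass whole batches through until the row budget runs out, then
        # slice the final batch so exactly max_results rows are yielded.
        remaining = max_results
        for batch in batches:
            if batch.num_rows < remaining:
                yield batch
                remaining -= batch.num_rows
            else:
                yield batch.slice(0, remaining)
                return

    table = pa.table({"x": list(range(10))})
    truncated = pa.Table.from_batches(
        truncate_batches(iter(table.to_batches(max_chunksize=3)), 7),
        schema=table.schema,
    )
    print(truncated.num_rows)  # 7

Because the generator stops consuming its source once the budget is spent,
batches past the limit are never fetched, keeping peak memory proportional to
the limit rather than to the full result.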