diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 4d594ddfbc..7a34b152fd 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1969,7 +1969,7 @@ def to_pandas_batches( max_results: Optional[int] = None, *, allow_large_results: Optional[bool] = None, - ) -> Iterable[pandas.DataFrame]: + ) -> blocks.PandasBatches: """Stream DataFrame results to an iterable of pandas DataFrame. page_size and max_results determine the size and number of batches, diff --git a/bigframes/display/anywidget.py b/bigframes/display/anywidget.py index 5c1db93dce..a81aff9080 100644 --- a/bigframes/display/anywidget.py +++ b/bigframes/display/anywidget.py @@ -20,7 +20,8 @@ from importlib import resources import functools import math -from typing import Any, Dict, Iterator, List, Optional, Type +import threading +from typing import Any, Iterator, Optional import uuid import pandas as pd @@ -39,15 +40,15 @@ import anywidget import traitlets - ANYWIDGET_INSTALLED = True + _ANYWIDGET_INSTALLED = True except Exception: - ANYWIDGET_INSTALLED = False + _ANYWIDGET_INSTALLED = False -WIDGET_BASE: Type[Any] -if ANYWIDGET_INSTALLED: - WIDGET_BASE = anywidget.AnyWidget +_WIDGET_BASE: type[Any] +if _ANYWIDGET_INSTALLED: + _WIDGET_BASE = anywidget.AnyWidget else: - WIDGET_BASE = object + _WIDGET_BASE = object @dataclasses.dataclass(frozen=True) @@ -56,7 +57,7 @@ class _SortState: ascending: bool -class TableWidget(WIDGET_BASE): +class TableWidget(_WIDGET_BASE): """An interactive, paginated table widget for BigFrames DataFrames. This widget provides a user-friendly way to display and navigate through @@ -65,12 +66,8 @@ class TableWidget(WIDGET_BASE): page = traitlets.Int(0).tag(sync=True) page_size = traitlets.Int(0).tag(sync=True) - row_count = traitlets.Union( - [traitlets.Int(), traitlets.Instance(type(None))], - default_value=None, - allow_none=True, - ).tag(sync=True) - table_html = traitlets.Unicode().tag(sync=True) + row_count = traitlets.Int(allow_none=True, default_value=None).tag(sync=True) + table_html = traitlets.Unicode("").tag(sync=True) sort_column = traitlets.Unicode("").tag(sync=True) sort_ascending = traitlets.Bool(True).tag(sync=True) orderable_columns = traitlets.List(traitlets.Unicode(), []).tag(sync=True) @@ -86,9 +83,10 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame): Args: dataframe: The Bigframes Dataframe to display in the widget. """ - if not ANYWIDGET_INSTALLED: + if not _ANYWIDGET_INSTALLED: raise ImportError( - "Please `pip install anywidget traitlets` or `pip install 'bigframes[anywidget]'` to use TableWidget." + "Please `pip install anywidget traitlets` or " + "`pip install 'bigframes[anywidget]'` to use TableWidget." ) self._dataframe = dataframe @@ -99,8 +97,10 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame): self._table_id = str(uuid.uuid4()) self._all_data_loaded = False self._batch_iter: Optional[Iterator[pd.DataFrame]] = None - self._cached_batches: List[pd.DataFrame] = [] + self._cached_batches: list[pd.DataFrame] = [] self._last_sort_state: Optional[_SortState] = None + # Lock to ensure only one thread at a time is updating the table HTML. + self._setting_html_lock = threading.Lock() # respect display options for initial page size initial_page_size = bigframes.options.display.max_rows @@ -108,6 +108,7 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame): # set traitlets properties that trigger observers # TODO(b/462525985): Investigate and improve TableWidget UX for DataFrames with a large number of columns. self.page_size = initial_page_size + # TODO(b/469861913): Nested columns from structs (e.g., 'struct_col.name') are not currently sortable. # TODO(b/463754889): Support non-string column labels for sorting. if all(isinstance(col, str) for col in dataframe.columns): self.orderable_columns = [ @@ -118,13 +119,24 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame): else: self.orderable_columns = [] + self._initial_load() + + # Signals to the frontend that the initial data load is complete. + # Also used as a guard to prevent observers from firing during initialization. + self._initial_load_complete = True + + def _initial_load(self) -> None: + """Get initial data and row count.""" # obtain the row counts # TODO(b/428238610): Start iterating over the result of `to_pandas_batches()` # before we get here so that the count might already be cached. self._reset_batches_for_new_page_size() if self._batches is None: - self._error_message = "Could not retrieve data batches. Data might be unavailable or an error occurred." + self._error_message = ( + "Could not retrieve data batches. Data might be unavailable or " + "an error occurred." + ) self.row_count = None elif self._batches.total_rows is None: # Total rows is unknown, this is an expected state. @@ -138,12 +150,8 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame): # get the initial page self._set_table_html() - # Signals to the frontend that the initial data load is complete. - # Also used as a guard to prevent observers from firing during initialization. - self._initial_load_complete = True - @traitlets.observe("_initial_load_complete") - def _on_initial_load_complete(self, change: Dict[str, Any]): + def _on_initial_load_complete(self, change: dict[str, Any]): if change["new"]: self._set_table_html() @@ -158,7 +166,7 @@ def _css(self): return resources.read_text(bigframes.display, "table_widget.css") @traitlets.validate("page") - def _validate_page(self, proposal: Dict[str, Any]) -> int: + def _validate_page(self, proposal: dict[str, Any]) -> int: """Validate and clamp the page number to a valid range. Args: @@ -191,7 +199,7 @@ def _validate_page(self, proposal: Dict[str, Any]) -> int: return max(0, min(value, max_page)) @traitlets.validate("page_size") - def _validate_page_size(self, proposal: Dict[str, Any]) -> int: + def _validate_page_size(self, proposal: dict[str, Any]) -> int: """Validate page size to ensure it's positive and reasonable. Args: @@ -255,95 +263,112 @@ def _reset_batch_cache(self) -> None: def _reset_batches_for_new_page_size(self) -> None: """Reset the batch iterator when page size changes.""" - self._batches = self._dataframe._to_pandas_batches(page_size=self.page_size) + self._batches = self._dataframe.to_pandas_batches(page_size=self.page_size) self._reset_batch_cache() def _set_table_html(self) -> None: """Sets the current html data based on the current page and page size.""" - if self._error_message: - self.table_html = ( - f"
{self._error_message}
" - ) - return - - # Apply sorting if a column is selected - df_to_display = self._dataframe - if self.sort_column: - # TODO(b/463715504): Support sorting by index columns. - df_to_display = df_to_display.sort_values( - by=self.sort_column, ascending=self.sort_ascending - ) - - # Reset batches when sorting changes - if self._last_sort_state != _SortState(self.sort_column, self.sort_ascending): - self._batches = df_to_display._to_pandas_batches(page_size=self.page_size) - self._reset_batch_cache() - self._last_sort_state = _SortState(self.sort_column, self.sort_ascending) - self.page = 0 # Reset to first page - - start = self.page * self.page_size - end = start + self.page_size - - # fetch more data if the requested page is outside our cache - cached_data = self._cached_data - while len(cached_data) < end and not self._all_data_loaded: - if self._get_next_batch(): + new_page = None + with self._setting_html_lock: + if self._error_message: + self.table_html = ( + f"
" + f"{self._error_message}
" + ) + return + + # Apply sorting if a column is selected + df_to_display = self._dataframe + if self.sort_column: + # TODO(b/463715504): Support sorting by index columns. + df_to_display = df_to_display.sort_values( + by=self.sort_column, ascending=self.sort_ascending + ) + + # Reset batches when sorting changes + if self._last_sort_state != _SortState( + self.sort_column, self.sort_ascending + ): + self._batches = df_to_display.to_pandas_batches( + page_size=self.page_size + ) + self._reset_batch_cache() + self._last_sort_state = _SortState( + self.sort_column, self.sort_ascending + ) + if self.page != 0: + new_page = 0 # Reset to first page + + if new_page is None: + start = self.page * self.page_size + end = start + self.page_size + + # fetch more data if the requested page is outside our cache cached_data = self._cached_data - else: - break - - # Get the data for the current page - page_data = cached_data.iloc[start:end].copy() - - # Handle index display - # TODO(b/438181139): Add tests for custom multiindex - if self._dataframe._block.has_index: - index_name = page_data.index.name - page_data.insert( - 0, index_name if index_name is not None else "", page_data.index - ) - else: - # Default index - include as "Row" column - page_data.insert(0, "Row", range(start + 1, start + len(page_data) + 1)) - # Handle case where user navigated beyond available data with unknown row count - is_unknown_count = self.row_count is None - is_beyond_data = self._all_data_loaded and len(page_data) == 0 and self.page > 0 - if is_unknown_count and is_beyond_data: - # Calculate the last valid page (zero-indexed) - total_rows = len(cached_data) - if total_rows > 0: - last_valid_page = max(0, math.ceil(total_rows / self.page_size) - 1) - # Navigate back to the last valid page - self.page = last_valid_page - # Recursively call to display the correct page - return self._set_table_html() - else: - # If no data at all, stay on page 0 with empty display - self.page = 0 - return self._set_table_html() - - # Generate HTML table - self.table_html = bigframes.display.html.render_html( - dataframe=page_data, - table_id=f"table-{self._table_id}", - orderable_columns=self.orderable_columns, - ) + while len(cached_data) < end and not self._all_data_loaded: + if self._get_next_batch(): + cached_data = self._cached_data + else: + break + + # Get the data for the current page + page_data = cached_data.iloc[start:end].copy() + + # Handle case where user navigated beyond available data with unknown row count + is_unknown_count = self.row_count is None + is_beyond_data = ( + self._all_data_loaded and len(page_data) == 0 and self.page > 0 + ) + if is_unknown_count and is_beyond_data: + # Calculate the last valid page (zero-indexed) + total_rows = len(cached_data) + last_valid_page = max(0, math.ceil(total_rows / self.page_size) - 1) + if self.page != last_valid_page: + new_page = last_valid_page + + if new_page is None: + # Handle index display + if self._dataframe._block.has_index: + is_unnamed_single_index = ( + page_data.index.name is None + and not isinstance(page_data.index, pd.MultiIndex) + ) + page_data = page_data.reset_index() + if is_unnamed_single_index and "index" in page_data.columns: + page_data.rename(columns={"index": ""}, inplace=True) + + # Default index - include as "Row" column if no index was present originally + if not self._dataframe._block.has_index: + page_data.insert( + 0, "Row", range(start + 1, start + len(page_data) + 1) + ) + + # Generate HTML table + self.table_html = bigframes.display.html.render_html( + dataframe=page_data, + table_id=f"table-{self._table_id}", + ) + + if new_page is not None: + # Navigate to the new page. This triggers the observer, which will + # re-enter _set_table_html. Since we've released the lock, this is safe. + self.page = new_page @traitlets.observe("sort_column", "sort_ascending") - def _sort_changed(self, _change: Dict[str, Any]): + def _sort_changed(self, _change: dict[str, Any]): """Handler for when sorting parameters change from the frontend.""" self._set_table_html() @traitlets.observe("page") - def _page_changed(self, _change: Dict[str, Any]) -> None: + def _page_changed(self, _change: dict[str, Any]) -> None: """Handler for when the page number is changed from the frontend.""" if not self._initial_load_complete: return self._set_table_html() @traitlets.observe("page_size") - def _page_size_changed(self, _change: Dict[str, Any]) -> None: + def _page_size_changed(self, _change: dict[str, Any]) -> None: """Handler for when the page size is changed from the frontend.""" if not self._initial_load_complete: return diff --git a/tests/system/small/test_anywidget.py b/tests/system/small/test_anywidget.py index b0eeb4a3c2..dca8b9e1bb 100644 --- a/tests/system/small/test_anywidget.py +++ b/tests/system/small/test_anywidget.py @@ -918,7 +918,7 @@ def test_repr_mimebundle_should_fallback_to_html_if_anywidget_is_unavailable( "display.repr_mode", "anywidget", "display.max_rows", 2 ): # Mock the ANYWIDGET_INSTALLED flag to simulate absence of anywidget - with mock.patch("bigframes.display.anywidget.ANYWIDGET_INSTALLED", False): + with mock.patch("bigframes.display.anywidget._ANYWIDGET_INSTALLED", False): bundle = paginated_bf_df._repr_mimebundle_() assert "application/vnd.jupyter.widget-view+json" not in bundle assert "text/html" in bundle diff --git a/tests/unit/display/test_anywidget.py b/tests/unit/display/test_anywidget.py new file mode 100644 index 0000000000..2ca8c0da2f --- /dev/null +++ b/tests/unit/display/test_anywidget.py @@ -0,0 +1,80 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import signal +import unittest.mock as mock + +import pandas as pd +import pytest + +import bigframes + +# Skip if anywidget/traitlets not installed, though they should be in the dev env +pytest.importorskip("anywidget") +pytest.importorskip("traitlets") + + +def test_navigation_to_invalid_page_resets_to_valid_page_without_deadlock(): + """ + Given a widget on a page beyond available data, when navigating, + then it should reset to the last valid page without deadlock. + """ + from bigframes.display.anywidget import TableWidget + + mock_df = mock.create_autospec(bigframes.dataframe.DataFrame, instance=True) + mock_df.columns = ["col1"] + mock_df.dtypes = {"col1": "object"} + + mock_block = mock.Mock() + mock_block.has_index = False + mock_df._block = mock_block + + # We mock _initial_load to avoid complex setup + with mock.patch.object(TableWidget, "_initial_load"): + with bigframes.option_context( + "display.repr_mode", "anywidget", "display.max_rows", 10 + ): + widget = TableWidget(mock_df) + + # Simulate "loaded data but unknown total rows" state + widget.page_size = 10 + widget.row_count = None + widget._all_data_loaded = True + + # Populate cache with 1 page of data (10 rows). Page 0 is valid, page 1+ are invalid. + widget._cached_batches = [pd.DataFrame({"col1": range(10)})] + + # Mark initial load as complete so observers fire + widget._initial_load_complete = True + + # Setup timeout to fail fast if deadlock occurs + # signal.SIGALRM is not available on Windows + has_sigalrm = hasattr(signal, "SIGALRM") + if has_sigalrm: + + def handler(signum, frame): + raise TimeoutError("Deadlock detected!") + + signal.signal(signal.SIGALRM, handler) + signal.alarm(2) # 2 seconds timeout + + try: + # Trigger navigation to page 5 (invalid), which should reset to page 0 + widget.page = 5 + + assert widget.page == 0 + + finally: + if has_sigalrm: + signal.alarm(0)