From da37742312fadba93265ab8b5273ff7ff3fbe88f Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Fri, 19 Dec 2025 15:20:38 +0000 Subject: [PATCH 1/6] chore: update log adapter to support session-scoped api method logging Updates `log_adapter` to optionally store API method names in a `Session` instance instead of the global list. This improves label accuracy when running tests in parallel. - Updates `Session` to initialize `_api_methods` list and lock. - Updates `log_adapter.add_api_method` and `get_and_reset_api_methods` to handle session-scoped logging. - Updates `log_adapter.method_logger` and `property_logger` to identify the session from arguments. - Propagates `session` through `start_query_with_client` and its callers to ensure labels are correctly associated with the session. --- bigframes/core/log_adapter.py | 55 +++++++++++++++++----- bigframes/session/__init__.py | 5 ++ bigframes/session/_io/bigquery/__init__.py | 13 +++-- bigframes/session/bq_caching_executor.py | 5 ++ bigframes/session/direct_gbq_execution.py | 3 ++ bigframes/session/loader.py | 2 + 6 files changed, 69 insertions(+), 14 deletions(-) diff --git a/bigframes/core/log_adapter.py b/bigframes/core/log_adapter.py index 8179ffbeed..460b4a4b6e 100644 --- a/bigframes/core/log_adapter.py +++ b/bigframes/core/log_adapter.py @@ -174,7 +174,8 @@ def wrapper(*args, **kwargs): full_method_name = f"{base_name.lower()}-{api_method_name}" # Track directly called methods if len(_call_stack) == 0: - add_api_method(full_method_name) + session = _find_session(*args, **kwargs) + add_api_method(full_method_name, session=session) _call_stack.append(full_method_name) @@ -220,7 +221,8 @@ def wrapped(*args, **kwargs): full_property_name = f"{class_name.lower()}-{property_name.lower()}" if len(_call_stack) == 0: - add_api_method(full_property_name) + session = _find_session(*args, **kwargs) + add_api_method(full_property_name, session=session) 
_call_stack.append(full_property_name) try: @@ -250,25 +252,40 @@ def wrapper(func): return wrapper -def add_api_method(api_method_name): +def add_api_method(api_method_name, session=None): global _lock global _api_methods - with _lock: - # Push the method to the front of the _api_methods list - _api_methods.insert(0, api_method_name.replace("<", "").replace(">", "")) - # Keep the list length within the maximum limit (adjust MAX_LABELS_COUNT as needed) - _api_methods = _api_methods[:MAX_LABELS_COUNT] + + if session is not None: + with session._api_methods_lock: + session._api_methods.insert( + 0, api_method_name.replace("<", "").replace(">", "") + ) + session._api_methods = session._api_methods[:MAX_LABELS_COUNT] + else: + with _lock: + # Push the method to the front of the _api_methods list + _api_methods.insert(0, api_method_name.replace("<", "").replace(">", "")) + # Keep the list length within the maximum limit (adjust MAX_LABELS_COUNT as needed) + _api_methods = _api_methods[:MAX_LABELS_COUNT] -def get_and_reset_api_methods(dry_run: bool = False): +def get_and_reset_api_methods(dry_run: bool = False, session=None): global _lock + methods = [] + if session is not None: + with session._api_methods_lock: + methods.extend(session._api_methods) + if not dry_run: + session._api_methods.clear() + with _lock: - previous_api_methods = list(_api_methods) + methods.extend(_api_methods) # dry_run might not make a job resource, so only reset the log on real queries. if not dry_run: _api_methods.clear() - return previous_api_methods + return methods def _get_bq_client(*args, **kwargs): @@ -283,3 +300,19 @@ def _get_bq_client(*args, **kwargs): return kwargv._block.session.bqclient return None + + +def _find_session(*args, **kwargs): + # This function cannot import Session at the top level because Session + # imports log_adapter. + # We can't import bigframes.session in type checking block either. 
+ from bigframes.session import Session + + if args and isinstance(args[0], Session): + return args[0] + + session = kwargs.get("session") + if session is not None and isinstance(session, Session): + return session + + return None diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 3cb9d2bb68..1bfcbcdeae 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -208,6 +208,9 @@ def __init__( self._session_id: str = "session" + secrets.token_hex(3) # store table ids and delete them when the session is closed + self._api_methods: list[str] = [] + self._api_methods_lock = threading.Lock() + self._objects: list[ weakref.ReferenceType[ Union[ @@ -2160,6 +2163,7 @@ def _start_query_ml_ddl( query_with_job=True, job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY, publisher=self._publisher, + session=self, ) return iterator, query_job @@ -2188,6 +2192,7 @@ def _create_object_table(self, path: str, connection: str) -> str: timeout=None, query_with_job=True, publisher=self._publisher, + session=self, ) return table diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index aa56dc0040..7e323b74ad 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -228,12 +228,14 @@ def format_option(key: str, value: Union[bool, str]) -> str: return f"{key}={repr(value)}" -def add_and_trim_labels(job_config): +def add_and_trim_labels(job_config, session=None): """ Add additional labels to the job configuration and trim the total number of labels to ensure they do not exceed MAX_LABELS_COUNT labels per job. 
""" - api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run) + api_methods = log_adapter.get_and_reset_api_methods( + dry_run=job_config.dry_run, session=session + ) job_config.labels = create_job_configs_labels( job_configs_labels=job_config.labels, api_methods=api_methods, @@ -270,6 +272,7 @@ def start_query_with_client( metrics: Optional[bigframes.session.metrics.ExecutionMetrics], query_with_job: Literal[True], publisher: bigframes.core.events.Publisher, + session=None, ) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... @@ -286,6 +289,7 @@ def start_query_with_client( metrics: Optional[bigframes.session.metrics.ExecutionMetrics], query_with_job: Literal[False], publisher: bigframes.core.events.Publisher, + session=None, ) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... @@ -303,6 +307,7 @@ def start_query_with_client( query_with_job: Literal[True], job_retry: google.api_core.retry.Retry, publisher: bigframes.core.events.Publisher, + session=None, ) -> Tuple[google.cloud.bigquery.table.RowIterator, bigquery.QueryJob]: ... @@ -320,6 +325,7 @@ def start_query_with_client( query_with_job: Literal[False], job_retry: google.api_core.retry.Retry, publisher: bigframes.core.events.Publisher, + session=None, ) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... @@ -340,6 +346,7 @@ def start_query_with_client( # version 3.36.0 or later. job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY, publisher: bigframes.core.events.Publisher, + session=None, ) -> Tuple[google.cloud.bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: """ Starts query job and waits for results. @@ -347,7 +354,7 @@ def start_query_with_client( # Note: Ensure no additional labels are added to job_config after this # point, as `add_and_trim_labels` ensures the label count does not # exceed MAX_LABELS_COUNT. 
- add_and_trim_labels(job_config) + add_and_trim_labels(job_config, session=session) try: if not query_with_job: diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 736dbf7be1..ca19d1be86 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -323,6 +323,7 @@ def _export_gbq( iterator, job = self._run_execute_query( sql=sql, job_config=job_config, + session=array_value.session, ) has_timedelta_col = any( @@ -389,6 +390,7 @@ def _run_execute_query( sql: str, job_config: Optional[bq_job.QueryJobConfig] = None, query_with_job: bool = True, + session=None, ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]: """ Starts BigQuery query job and waits for results. @@ -415,6 +417,7 @@ def _run_execute_query( timeout=None, query_with_job=True, publisher=self._publisher, + session=session, ) else: return bq_io.start_query_with_client( @@ -427,6 +430,7 @@ def _run_execute_query( timeout=None, query_with_job=False, publisher=self._publisher, + session=session, ) except google.api_core.exceptions.BadRequest as e: @@ -661,6 +665,7 @@ def _execute_plan_gbq( sql=compiled.sql, job_config=job_config, query_with_job=(destination_table is not None), + session=plan.session, ) # we could actually cache even when caching is not explicitly requested, but being conservative for now diff --git a/bigframes/session/direct_gbq_execution.py b/bigframes/session/direct_gbq_execution.py index 748c43e66c..3ec10bf20f 100644 --- a/bigframes/session/direct_gbq_execution.py +++ b/bigframes/session/direct_gbq_execution.py @@ -60,6 +60,7 @@ def execute( iterator, query_job = self._run_execute_query( sql=compiled.sql, + session=plan.session, ) # just immediately downlaod everything for simplicity @@ -75,6 +76,7 @@ def _run_execute_query( self, sql: str, job_config: Optional[bq_job.QueryJobConfig] = None, + session=None, ) -> Tuple[bq_table.RowIterator, Optional[bigquery.QueryJob]]: """ Starts 
BigQuery query job and waits for results. @@ -89,4 +91,5 @@ def _run_execute_query( metrics=None, query_with_job=False, publisher=self._publisher, + session=session, ) diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 5e415999ff..3d6d576fdf 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -1284,6 +1284,7 @@ def _start_query_with_job_optional( metrics=None, query_with_job=False, publisher=self._publisher, + session=self._session, ) return rows @@ -1310,6 +1311,7 @@ def _start_query_with_job( metrics=None, query_with_job=True, publisher=self._publisher, + session=self._session, ) return query_job From 876f211f0269330780f20f534298a528ddbaae38 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Fri, 19 Dec 2025 15:35:14 +0000 Subject: [PATCH 2/6] chore: update log adapter to support session-scoped api method logging Updates `log_adapter` to optionally store API method names in a `Session` instance instead of the global list. This improves label accuracy when running tests in parallel. - Updates `Session` to initialize `_api_methods` list and lock. - Updates `log_adapter.add_api_method` and `get_and_reset_api_methods` to handle session-scoped logging. - Updates `log_adapter.method_logger` and `property_logger` to identify the session from arguments. - Propagates `session` through `start_query_with_client` and its callers to ensure labels are correctly associated with the session. 
--- bigframes/core/log_adapter.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bigframes/core/log_adapter.py b/bigframes/core/log_adapter.py index 460b4a4b6e..33e910f09b 100644 --- a/bigframes/core/log_adapter.py +++ b/bigframes/core/log_adapter.py @@ -256,16 +256,16 @@ def add_api_method(api_method_name, session=None): global _lock global _api_methods + clean_method_name = api_method_name.replace("<", "").replace(">", "") + if session is not None: with session._api_methods_lock: - session._api_methods.insert( - 0, api_method_name.replace("<", "").replace(">", "") - ) + session._api_methods.insert(0, clean_method_name) session._api_methods = session._api_methods[:MAX_LABELS_COUNT] else: with _lock: # Push the method to the front of the _api_methods list - _api_methods.insert(0, api_method_name.replace("<", "").replace(">", "")) + _api_methods.insert(0, clean_method_name) # Keep the list length within the maximum limit (adjust MAX_LABELS_COUNT as needed) _api_methods = _api_methods[:MAX_LABELS_COUNT] From cb8e5aecc1fd88bdf1157192a3dbb77774220b30 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Fri, 19 Dec 2025 16:06:57 +0000 Subject: [PATCH 3/6] chore: update log adapter to support session-scoped api method logging Updates `log_adapter` to optionally store API method names in a `Session` instance instead of the global list. This improves label accuracy when running tests in parallel. - Updates `Session` to initialize `_api_methods` list and lock. - Updates `log_adapter.add_api_method` and `get_and_reset_api_methods` to handle session-scoped logging. - Updates `log_adapter.method_logger` and `property_logger` to identify the session from arguments. - Propagates `session` through `start_query_with_client` and its callers to ensure labels are correctly associated with the session. - Refactored `add_api_method` to be more DRY. 
- Fixes method signatures in `create_temp_table` and `create_temp_view`. --- bigframes/core/log_adapter.py | 13 +++++++++++-- bigframes/session/__init__.py | 1 + bigframes/session/_io/bigquery/__init__.py | 2 ++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/bigframes/core/log_adapter.py b/bigframes/core/log_adapter.py index 33e910f09b..65a6a9e255 100644 --- a/bigframes/core/log_adapter.py +++ b/bigframes/core/log_adapter.py @@ -309,10 +309,19 @@ def _find_session(*args, **kwargs): from bigframes.session import Session if args and isinstance(args[0], Session): - return args[0] + # In unit tests, we might be working with a mock Session object that + # passes isinstance but doesn't have the instance attributes set in + # __init__. + session = args[0] + if hasattr(session, "_api_methods_lock") and hasattr(session, "_api_methods"): + return session session = kwargs.get("session") if session is not None and isinstance(session, Session): - return session + # In unit tests, we might be working with a mock Session object that + # passes isinstance but doesn't have the instance attributes set in + # __init__. 
+ if hasattr(session, "_api_methods_lock") and hasattr(session, "_api_methods"): + return session return None diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 1bfcbcdeae..4f32514652 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -23,6 +23,7 @@ import logging import os import secrets +import threading import typing from typing import ( Any, diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index 7e323b74ad..9114770224 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -126,6 +126,7 @@ def create_temp_table( schema: Optional[Iterable[bigquery.SchemaField]] = None, cluster_columns: Optional[list[str]] = None, kms_key: Optional[str] = None, + session=None, ) -> str: """Create an empty table with an expiration in the desired session. @@ -153,6 +154,7 @@ def create_temp_view( *, expiration: datetime.datetime, sql: str, + session=None, ) -> str: """Create an empty table with an expiration in the desired session. From 7d68d536fb6212a18560e57217242defc113dace Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Mon, 22 Dec 2025 19:37:35 +0000 Subject: [PATCH 4/6] chore: clarify that hasattr is needed due to __init__ not running yet --- bigframes/core/log_adapter.py | 36 +++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/bigframes/core/log_adapter.py b/bigframes/core/log_adapter.py index 65a6a9e255..e8186d2bda 100644 --- a/bigframes/core/log_adapter.py +++ b/bigframes/core/log_adapter.py @@ -302,26 +302,34 @@ def _get_bq_client(*args, **kwargs): return None +def _is_session_initialized(session): + """Return True if fully initialized. + + Because the method logger could get called before Session.__init__ has a + chance to run, we use the globals in that case. 
+ """ + return hasattr(session, "_api_methods_lock") and hasattr(session, "_api_methods") + + def _find_session(*args, **kwargs): # This function cannot import Session at the top level because Session # imports log_adapter. - # We can't import bigframes.session in type checking block either. from bigframes.session import Session - if args and isinstance(args[0], Session): - # In unit tests, we might be working with a mock Session object that - # passes isinstance but doesn't have the instance attributes set in - # __init__. - session = args[0] - if hasattr(session, "_api_methods_lock") and hasattr(session, "_api_methods"): - return session + session = args[0] if args else None + if ( + session is not None + and isinstance(session, Session) + and _is_session_initialized(session) + ): + return session session = kwargs.get("session") - if session is not None and isinstance(session, Session): - # In unit tests, we might be working with a mock Session object that - # passes isinstance but doesn't have the instance attributes set in - # __init__. 
- if hasattr(session, "_api_methods_lock") and hasattr(session, "_api_methods"): - return session + if ( + session is not None + and isinstance(session, Session) + and _is_session_initialized(session) + ): + return session return None From 6d49e47f10c76891728430de314b6b31bcc54985 Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 23 Dec 2025 21:30:38 +0000 Subject: [PATCH 5/6] chore: protect against calls where Session isn't fully initialized --- bigframes/core/log_adapter.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bigframes/core/log_adapter.py b/bigframes/core/log_adapter.py index e8186d2bda..0f8b58bf0a 100644 --- a/bigframes/core/log_adapter.py +++ b/bigframes/core/log_adapter.py @@ -258,7 +258,8 @@ def add_api_method(api_method_name, session=None): clean_method_name = api_method_name.replace("<", "").replace(">", "") - if session is not None: + # The log adapter can get called before the Session has fully initialized. + if session is not None and hasattr(session, "_api_methods_lock"): with session._api_methods_lock: session._api_methods.insert(0, clean_method_name) session._api_methods = session._api_methods[:MAX_LABELS_COUNT] @@ -273,7 +274,9 @@ def add_api_method(api_method_name, session=None): def get_and_reset_api_methods(dry_run: bool = False, session=None): global _lock methods = [] - if session is not None: + + # The log adapter can get called before the Session has fully initialized. 
+ if session is not None and hasattr(session, "_api_methods_lock"): with session._api_methods_lock: methods.extend(session._api_methods) if not dry_run: From 5db9a522631891b29a8a31f3aa08e7f11db35b3a Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Tue, 23 Dec 2025 21:37:13 +0000 Subject: [PATCH 6/6] chore: use shared initialized check --- bigframes/core/log_adapter.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/bigframes/core/log_adapter.py b/bigframes/core/log_adapter.py index 0f8b58bf0a..77c09437c0 100644 --- a/bigframes/core/log_adapter.py +++ b/bigframes/core/log_adapter.py @@ -258,8 +258,7 @@ def add_api_method(api_method_name, session=None): clean_method_name = api_method_name.replace("<", "").replace(">", "") - # The log adapter can get called before the Session has fully initialized. - if session is not None and hasattr(session, "_api_methods_lock"): + if session is not None and _is_session_initialized(session): with session._api_methods_lock: session._api_methods.insert(0, clean_method_name) session._api_methods = session._api_methods[:MAX_LABELS_COUNT] @@ -275,8 +274,7 @@ def get_and_reset_api_methods(dry_run: bool = False, session=None): global _lock methods = [] - # The log adapter can get called before the Session has fully initialized. - if session is not None and hasattr(session, "_api_methods_lock"): + if session is not None and _is_session_initialized(session): with session._api_methods_lock: methods.extend(session._api_methods) if not dry_run: