From e4baeb8eb7ad2e275cce318796efc6786155e28b Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 23 Dec 2025 18:43:59 +0000 Subject: [PATCH 1/6] add windowing json support --- sdks/python/apache_beam/yaml/yaml_transform.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index bd1fc8da9018..42f4ee30a30c 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -1033,6 +1033,20 @@ def preprocess_windowing(spec): if 'windowing' in spec: spec['config'] = spec.get('config', {}) spec['config']['windowing'] = spec.pop('windowing') + + if spec.get('config', {}).get('windowing'): + windowing_config = spec['config']['windowing'] + if isinstance(windowing_config, str): + try: + # PyYAML can load a JSON string. + parsed_config = yaml.safe_load(windowing_config) + if not isinstance(parsed_config, dict): + raise TypeError('Windowing config string must be a YAML/JSON map.') + spec['config']['windowing'] = parsed_config + except Exception as e: + raise ValueError( + f'Error parsing windowing config string at \ + {identify_object(spec)}: {e}') from e return spec elif 'windowing' not in spec: # Nothing to do. From 54a54d340413a229261c8ace530d145dad47d7cf Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Tue, 23 Dec 2025 19:29:21 +0000 Subject: [PATCH 2/6] add a few tests --- .../apache_beam/yaml/yaml_transform_test.py | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index d5950fb9efaf..06064138afdb 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -993,6 +993,43 @@ def test_explicit_window_into(self): providers=TEST_PROVIDERS) assert_that(result, equal_to([6, 9])) + def test_explicit_window_into_with_json_string_config(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + result = p | YamlTransform( + ''' + type: chain + transforms: + - type: CreateTimestamped + config: + elements: [0, 1, 2, 3, 4, 5] + - type: WindowInto + config: + windowing: | + {"type": "fixed", "size": "4s"} + - type: SumGlobally + ''', + providers=TEST_PROVIDERS) + assert_that(result, equal_to([6, 9])) + + def test_explicit_window_into_with_string_config_fails(self): + with self.assertRaisesRegex(ValueError, 'Error parsing windowing config'): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + _ = p | YamlTransform( + ''' + type: chain + transforms: + - type: CreateTimestamped + config: + elements: [0, 1, 2, 3, 4, 5] + - type: WindowInto + config: + windowing: | + 'not a valid yaml' + ''', + providers=TEST_PROVIDERS) + def test_windowing_on_input(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: From 36e531f5837830652cbe239efae49cc9f97f00a9 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 29 Dec 2025 16:57:55 +0000 Subject: [PATCH 3/6] remove code change and just have tests --- sdks/python/apache_beam/yaml/yaml_transform.py | 13 ------------- sdks/python/apache_beam/yaml/yaml_transform_test.py | 3 +-- 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 42f4ee30a30c..2754fc38e507 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -1034,19 +1034,6 @@ def preprocess_windowing(spec): spec['config'] = spec.get('config', {}) spec['config']['windowing'] = spec.pop('windowing') - if spec.get('config', {}).get('windowing'): - windowing_config = spec['config']['windowing'] - if isinstance(windowing_config, str): - try: - # PyYAML can load a JSON string. - parsed_config = yaml.safe_load(windowing_config) - if not isinstance(parsed_config, dict): - raise TypeError('Windowing config string must be a YAML/JSON map.') - spec['config']['windowing'] = parsed_config - except Exception as e: - raise ValueError( - f'Error parsing windowing config string at \ - {identify_object(spec)}: {e}') from e return spec elif 'windowing' not in spec: # Nothing to do. diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index 06064138afdb..0fad67afb934 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -1005,8 +1005,7 @@ def test_explicit_window_into_with_json_string_config(self): elements: [0, 1, 2, 3, 4, 5] - type: WindowInto config: - windowing: | - {"type": "fixed", "size": "4s"} + windowing: {"type": "fixed", "size": "4s"} - type: SumGlobally ''', providers=TEST_PROVIDERS) From b4f17635fef8e20e01662dff9b9a58b3c1047c8c Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 29 Dec 2025 16:58:41 +0000 Subject: [PATCH 4/6] remove extra line --- sdks/python/apache_beam/yaml/yaml_transform.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 2754fc38e507..bd1fc8da9018 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -1033,7 +1033,6 @@ def preprocess_windowing(spec): if 'windowing' in spec: spec['config'] = spec.get('config', {}) spec['config']['windowing'] = spec.pop('windowing') - return spec elif 'windowing' not in spec: # Nothing to do. From 402f353a1a0914f089c959e175ec79f96e8dc552 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 29 Dec 2025 17:05:20 +0000 Subject: [PATCH 5/6] add multiline windowing config support back --- .../python/apache_beam/yaml/yaml_transform.py | 16 ++++++++++++++ .../apache_beam/yaml/yaml_transform_test.py | 21 ++++++++++++++++++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index bd1fc8da9018..6e3793897b92 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -1030,9 +1030,25 @@ def push_windowing_to_roots(spec): def preprocess_windowing(spec): if spec['type'] == 'WindowInto': # This is the transform where it is actually applied. + if 'windowing' in spec: spec['config'] = spec.get('config', {}) spec['config']['windowing'] = spec.pop('windowing') + + if spec.get('config', {}).get('windowing'): + windowing_config = spec['config']['windowing'] + if isinstance(windowing_config, str): + try: + # PyYAML can load a JSON string - one-line and multi-line. + # Without this code, multi-line is not supported. + parsed_config = yaml.safe_load(windowing_config) + if not isinstance(parsed_config, dict): + raise TypeError('Windowing config string must be a YAML/JSON map.') + spec['config']['windowing'] = parsed_config + except Exception as e: + raise ValueError( + f'Error parsing windowing config string at \ + {identify_object(spec)}: {e}') from e return spec elif 'windowing' not in spec: # Nothing to do. diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index 0fad67afb934..89e4dc8b951c 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -993,7 +993,7 @@ def test_explicit_window_into(self): providers=TEST_PROVIDERS) assert_that(result, equal_to([6, 9])) - def test_explicit_window_into_with_json_string_config(self): + def test_explicit_window_into_with_json_string_config_one_line(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: result = p | YamlTransform( @@ -1011,6 +1011,25 @@ def test_explicit_window_into_with_json_string_config(self): providers=TEST_PROVIDERS) assert_that(result, equal_to([6, 9])) + def test_explicit_window_into_with_json_string_config_multi_line(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + result = p | YamlTransform( + ''' + type: chain + transforms: + - type: CreateTimestamped + config: + elements: [0, 1, 2, 3, 4, 5] + - type: WindowInto + config: + windowing: | + {"type": "fixed", "size": "4s"} + - type: SumGlobally + ''', + providers=TEST_PROVIDERS) + assert_that(result, equal_to([6, 9])) + def test_explicit_window_into_with_string_config_fails(self): with self.assertRaisesRegex(ValueError, 'Error parsing windowing config'): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( From a9eeefc8db8dbff20cab275f9aadc23f43444ea4 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Mon, 29 Dec 2025 17:06:20 +0000 Subject: [PATCH 6/6] remove extra line --- sdks/python/apache_beam/yaml/yaml_transform.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index 6e3793897b92..ef065d8a3c42 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -1030,7 +1030,6 @@ def push_windowing_to_roots(spec): def preprocess_windowing(spec): if spec['type'] == 'WindowInto': # This is the transform where it is actually applied. - if 'windowing' in spec: spec['config'] = spec.get('config', {}) spec['config']['windowing'] = spec.pop('windowing')