From 7a4989b4a5bd5719317716f28cc0c585a279ff50 Mon Sep 17 00:00:00 2001
From: yokofly
Date: Wed, 3 Dec 2025 23:50:00 -0800
Subject: [PATCH] Add emit strategy and return shape details for JS and Python UDFs

---
 docs/js-udf.md | 5 +++++
 docs/py-udf.md | 8 +++++++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/docs/js-udf.md b/docs/js-udf.md
index cff2a852..52861b08 100644
--- a/docs/js-udf.md
+++ b/docs/js-udf.md
@@ -143,6 +143,11 @@ Let's take an example of a function to get the second maximum values from the gr
 | 5 | deserialize(str) | No | Opposite to serialize(). Read the string and convert back to JS internal state. | function(str)\{
let s=JSON.parse(str);
this.max=s['max'];
this.sec_max=s['sec_max'];
} | | 6 | merge(str) | No | Merges two states into one. Used for multiple shards processing. | function(str)\{
let s=JSON.parse(str);
if..else..} |

+**Emit strategy and return shape**
+
+- Default (no `has_customized_emit`): `finalize()` must return a single value matching the declared return type. Any value returned from `process()` is ignored.
+- Custom emit (`has_customized_emit: true`): `process()` should return an integer (or `true`/`false`) indicating how many results to emit now, and `finalize()` must return an array whose length equals that emit count.
+
 ### Example: get second largest number {#udaf-example}
diff --git a/docs/py-udf.md b/docs/py-udf.md
index e0843580..4517af7a 100644
--- a/docs/py-udf.md
+++ b/docs/py-udf.md
@@ -99,6 +99,12 @@ The function list:
 * `deserialize` load the state from checkpoint to the internal state, optional.
 * `merge` for multi-shard processing, merge the states from each shard, optional.

+**Emit strategy and return shape**
+
+- By default, Proton calls `finalize()` once and expects a *single* value of the declared return type (not a list). Any value returned from `process()` is ignored in this mode.
+- If you need to emit multiple results per group, set `self.has_customized_emit = True` in `__init__`, return either an integer count or `True`/`False` from `process()` to indicate how many results to emit, and return a list from `finalize()` whose length matches that count.
+- Only one UDA with `has_customized_emit = True` is supported per streaming query.
+
 ## Examples

 ### A simple UDF without dependency
@@ -166,7 +172,7 @@ class getMax:
             if item > self.max:
                 self.max = item
     def finalize(self):
-        return [self.max]
+        return self.max
 $$;
 ```
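
Reviewer note: the two return shapes described in the added "Emit strategy and return shape" text can be sketched in plain Python, outside Proton. This is only an illustration under assumptions: `SecondMax`, `OverThreshold`, the threshold of 100, and the `check_shape` helper are hypothetical names invented here, and `check_shape` merely mimics the documented contract; it is not how Proton actually invokes a UDA.

```python
class SecondMax:
    """Default emit mode: finalize() returns one value, not a list."""

    def __init__(self):
        self.max = float("-inf")
        self.sec_max = float("-inf")

    def process(self, values):
        for item in values:
            if item > self.max:
                self.sec_max = self.max
                self.max = item
            elif item > self.sec_max:
                self.sec_max = item
        # No return value: per the docs it would be ignored in this mode.

    def finalize(self):
        return self.sec_max


class OverThreshold:
    """Custom emit mode: process() reports a count, finalize() returns a list."""

    def __init__(self):
        self.has_customized_emit = True
        self.pending = []

    def process(self, values):
        # Hypothetical rule: emit every value above 100.
        for item in values:
            if item > 100:
                self.pending.append(item)
        return len(self.pending)

    def finalize(self):
        out, self.pending = self.pending, []
        return out


def check_shape(uda, values):
    """Hypothetical stand-in for the engine: enforce the documented return shape."""
    emitted = uda.process(values)
    result = uda.finalize()
    if getattr(uda, "has_customized_emit", False):
        # Custom emit: list length must equal the count reported by process().
        assert isinstance(result, list) and len(result) == int(emitted)
    else:
        # Default emit: a single value of the declared type.
        assert not isinstance(result, list)
    return result
```

Calling `check_shape(SecondMax(), [3, 9, 5])` exercises the default path, and `check_shape(OverThreshold(), [50, 150, 120])` exercises the custom-emit path. The pre-patch `getMax` example, which returned `[self.max]` without setting `has_customized_emit`, would fail the default-mode check here, which is the mismatch this patch corrects in the docs.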