5 changes: 5 additions & 0 deletions docs/js-udf.md
@@ -143,6 +143,11 @@ Let's take an example of a function to get the second maximum values from the gr
| 5 | deserialize(str) | No | Opposite to serialize(). Read the string and convert back to JS internal state. | function(str)\{<br />let s=JSON.parse(str);<br />this.max=s['max'];<br />this.sec_max=s['sec_max'];<br />} |
| 6 | merge(str) | No | Merges two states into one. Used for multiple shards processing. | function(str)\{<br />let s=JSON.parse(str);<br />if..else..} |

**Emit strategy and return shape**

- Default (no `has_customized_emit`): `finalize()` must return a single value matching the declared return type. Any value returned from `process()` is ignored.
- Custom emit (`has_customized_emit: true`): `process()` should return either an integer giving the number of results to emit now, or `true`/`false` to signal whether to emit. `finalize()` must then return an array whose length equals that emit count.



### Example: get second largest number {#udaf-example}
8 changes: 7 additions & 1 deletion docs/py-udf.md
@@ -99,6 +99,12 @@ The function list:
* `deserialize` load the state from checkpoint to the internal state, optional.
* `merge` for multi-shard processing, merge the states from each shard, optional.

**Emit strategy and return shape**

- By default, Proton calls `finalize()` once and expects a *single* value of the declared return type (not a list). Any value returned from `process()` is ignored in this mode.
- If you need to emit multiple results per group, set `self.has_customized_emit = True` in `__init__`. In this mode, `process()` returns either an integer count or `True`/`False` to indicate how many results to emit, and `finalize()` returns a list whose length matches that count.
- Only one UDA with `has_customized_emit = True` is supported per streaming query.
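
The two modes above can be sketched in plain Python. This is only an illustration of the return shapes, not Proton's actual invocation path: the class names `GetMax` and `EmitOnNewMax` are hypothetical, and the sketch assumes `process()` receives an iterable of values per call.

```python
class GetMax:
    """Default emit: finalize() returns one value, not a list."""

    def __init__(self):
        self.max = float("-inf")

    def process(self, values):
        # Assumption: `values` is an iterable of numbers for one batch.
        for item in values:
            if item > self.max:
                self.max = item
        # Nothing is returned; in default mode a return value would be ignored.

    def finalize(self):
        return self.max  # single value of the declared return type


class EmitOnNewMax:
    """Custom emit: process() reports a count, finalize() returns that many results."""

    def __init__(self):
        self.has_customized_emit = True
        self.max = float("-inf")
        self.pending = []  # results waiting to be emitted

    def process(self, values):
        for item in values:
            if item > self.max:
                self.max = item
                self.pending.append(item)
        return len(self.pending)  # how many results to emit now

    def finalize(self):
        # Hand back exactly the results counted by process(), then reset.
        out, self.pending = self.pending, []
        return out
```

Keeping the pending results in a list and clearing it in `finalize()` is one simple way to make the list length agree with the count returned from `process()`.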

## Examples

### A simple UDF without dependency
@@ -166,7 +172,7 @@ class getMax:
if item > self.max:
self.max = item
def finalize(self):
-return [self.max]
+return self.max
$$;
```
