Context:
- The following function is used as a JVM wrapper for a Python function, which is executed in a SQL-like environment. If you're interested in example usage you can check "Cumulate arrays from earlier rows (PySpark dataframe)" on Stack Overflow, but it shouldn't be necessary.
- Objects passed between the two runtimes are converted using Pyrolite and / or Py4J, and exactly matching the types expected by the JVM counterpart is crucial.
- There are two contexts of execution, with the internal `flatten_distinct_` being executed in a remote interpreter which is not directly accessible.
- Docstrings have been omitted intentionally.
```python
from typing import List, Union, Hashable as Hble

from toolz import unique, concat, compose

import pyspark.sql.functions as f
from pyspark.sql import Column
from pyspark.sql.types import ArrayType, StringType, DataType as DT


def flatten_distinct(col: Union[Column, str], dt: DT = StringType()) -> Column:
    assert isinstance(col, (Column, str)), (
        "col should be Column or str got {}".format(type(col)))

    def flatten_distinct_(xss: Union[List[List[Hble]], None]) -> List[Hble]:
        return compose(list, unique, concat)(xss or [])

    return f.udf(flatten_distinct_, ArrayType(dt))(col)
```
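For reviewers without a Spark cluster at hand, here is a stdlib-only sketch of what the inner `flatten_distinct_` computes. It is an assumption-laden stand-in: `chain.from_iterable` plays the role of `toolz.concat`, `dict.fromkeys` plays the role of the order-preserving `toolz.unique` (for hashable items), and the `flatten_distinct_sketch` name is mine, not part of the original code:

```python
from itertools import chain
from typing import Hashable, List, Optional


def flatten_distinct_sketch(xss: Optional[List[List[Hashable]]]) -> List[Hashable]:
    # chain.from_iterable ~ toolz.concat; dict.fromkeys preserves
    # first-seen order like toolz.unique; list(...) materializes the
    # result, mirroring the outermost step of compose(list, unique, concat).
    return list(dict.fromkeys(chain.from_iterable(xss or [])))


print(flatten_distinct_sketch([[1, 2], [2, 3], [3, 1]]))  # [1, 2, 3]
print(flatten_distinct_sketch(None))                      # []
```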
My main concern is whether this code is self-explanatory and readable for a Python user, with focus on:
- Usage of the type annotations.
- Function composition (would `pipe(xss or [], concat, unique, list)` be a better choice?).
- Immediate call of the anonymous functions.
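On the compose-vs-pipe question: both spell the same pipeline, differing only in reading order, with `pipe` listing the steps in the order the data flows. A minimal stdlib sketch to illustrate the ordering difference (the `pipe` and `unique` helpers below are my own reduced stand-ins for toolz's, not toolz itself):

```python
from functools import reduce
from itertools import chain


def pipe(value, *funcs):
    # Thread value left-to-right through funcs, like toolz.pipe.
    return reduce(lambda acc, fn: fn(acc), funcs, value)


def unique(seq):
    # First-seen order for hashable items, like toolz.unique.
    return iter(dict.fromkeys(seq))


xss = [[1, 2], [2, 3], [3, 1]]
# compose(list, unique, concat)(xss) reads right-to-left;
# pipe lists the same steps in execution order:
print(pipe(xss, chain.from_iterable, unique, list))  # [1, 2, 3]
```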