Post History
Answer
#1: Initial revision
- Use [`collect_list`](https://www.sparkreference.com/reference/collect_list/) over a running window to collect the `(acc, value)` structs from all preceding rows up to the current row
- Then [`filter`](https://spark.apache.org/docs/latest/api/sql/index.html#filter) that collected list based on each struct's `value` and the current row's `threshold`
- Use [`aggregate`](https://spark.apache.org/docs/latest/api/sql/index.html#aggregate) to calculate the result by summing the remaining structs' `acc` fields

Note that doing so may reorder the output, so I added an `order` column.

```python
import pyspark.sql.functions as F

df = spark.createDataFrame(
    [(1, 3, 1, 1), (2, 1, 2, 2), (3, 2, 3, 2)],
    ["order", "acc", "value", "threshold"],
)

display(
    df
    .withColumn("output", F.expr("""
        aggregate(
            filter(
                collect_list(struct(acc, value))
                    OVER (ORDER BY order ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
                s -> s.value >= threshold
            ),
            0L,
            (output, s) -> output + s.acc
        )
    """))
    .orderBy("order")
)
```

| order | acc | value | threshold | output |
|-------|-----|-------|-----------|--------|
| 1     | 3   | 1     | 1         | 3      |
| 2     | 1   | 2     | 2         | 1      |
| 3     | 2   | 3     | 2         | 3      |
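If it helps to see the intermediate steps, here is a minimal sketch that materializes the windowed array before and after the filter, so you can inspect what each row sums over. The column names `collected` and `kept` are just illustrative, and the explicit `SparkSession` is only needed if you are not in a notebook environment where `spark` and `display` already exist (the answer above assumes such an environment).

```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# Assumption: running standalone, so create a session; in a notebook `spark` already exists.
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, 3, 1, 1), (2, 1, 2, 2), (3, 2, 3, 2)],
    ["order", "acc", "value", "threshold"],
)

debug = (
    df
    # All (acc, value) structs from the first row up to and including the current row.
    .withColumn("collected", F.expr(
        "collect_list(struct(acc, value)) OVER "
        "(ORDER BY order ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"
    ))
    # Only the structs whose value meets the current row's threshold; their acc fields get summed.
    .withColumn("kept", F.expr("filter(collected, s -> s.value >= threshold)"))
    .orderBy("order")
)

debug.show(truncate=False)
```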