How to solve complex conditions with Spark SQL window functions

hivefans · Aug 13, 2020 · 3 min read

In our work, we often encounter problems that require various aggregation calculations based on time or other dimensions.

Problem 1: User logins to a website, determining when each user became active

Suppose there is a DataFrame of user logins to a website, for instance:

+---------+----------+
|user_name|login_date|
+---------+----------+
|smith    |2020-01-04|
|bob      |2020-01-04|
|bob      |2020-01-06|
|john     |2020-01-10|
|smith    |2020-01-11|
+---------+----------+

I would like to add a column indicating when each user became an active user on the site. There is one caveat: a user only stays active for a limited period after logging in, and if they log in again after that period has passed, their became_active date resets. Suppose this period is 5 days. Then the desired table derived from the table above would look like this:

+---------+----------+-------------+
|user_name|login_date|became_active|
+---------+----------+-------------+
|smith    |2020-01-04|2020-01-04   |
|bob      |2020-01-04|2020-01-04   |
|bob      |2020-01-06|2020-01-04   |
|john     |2020-01-10|2020-01-10   |
|smith    |2020-01-11|2020-01-11   |
+---------+----------+-------------+

The idea is a classic gap-and-islands pattern: use lag over a per-user window to compute the gap since the previous login, flag rows where the gap exceeds 5 days as the start of a new session, run a cumulative sum over the flag to get a session id, and take min(login_date) within each (user, session) as became_active. The implementation code is as follows:

package com.empgo

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}

object Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[4]").getOrCreate()
    import spark.implicits._

    // Sample data; the third column (distance) is not used in this example
    val df = Seq(
      ("smith", "2020-01-04", 100), ("bob", "2020-01-04", 200),
      ("bob", "2020-01-06", 1300), ("john", "2020-01-10", 100),
      ("smith", "2020-01-11", 200), ("smith", "2020-01-14", 1400),
      ("smith", "2020-08-11", 100)
    ).toDF("user_name", "login_date", "distance")

    // Per-user window ordered by login date, plus a window per (user, session)
    val userWindow = Window.partitionBy("user_name").orderBy("login_date")
    val userSessionWindow = Window.partitionBy("user_name", "session")

    // Flag a new session when the gap since the previous login exceeds 5 days
    val newSession = (coalesce(
      datediff($"login_date", lag($"login_date", 1).over(userWindow)),
      lit(0)) > 5).cast("bigint")

    // Cumulative sum of the flag gives each row its session id
    val sessionized = df.withColumn("session", sum(newSession).over(userWindow))

    // The earliest login date in each (user, session) is the became_active date
    val result = sessionized
      .withColumn("became_active", min($"login_date").over(userSessionWindow))
      .drop("session")

    result.show()
  }
}
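Since the point of the post is Spark SQL window functions, the same gap-and-islands logic can also be written as a plain SQL query. Below is a minimal sketch, assuming the DataFrame df above has been registered as a temporary view named logins (the view name is chosen only for illustration):

// A minimal SQL sketch of the same logic, assuming df is registered as "logins"
df.createOrReplaceTempView("logins")
val sqlResult = spark.sql("""
  WITH flagged AS (
    SELECT user_name, login_date,
           CASE WHEN datediff(login_date,
                  lag(login_date) OVER (PARTITION BY user_name ORDER BY login_date)) > 5
                THEN 1 ELSE 0 END AS new_session
    FROM logins
  ), sessionized AS (
    SELECT user_name, login_date,
           SUM(new_session) OVER (PARTITION BY user_name ORDER BY login_date) AS session
    FROM flagged
  )
  SELECT user_name, login_date,
         MIN(login_date) OVER (PARTITION BY user_name, session) AS became_active
  FROM sessionized
""")
sqlResult.show()

The CASE expression plays the role of the coalesce/datediff flag above, and the running SUM turns the flag into a session id.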

Problem 2: Segmenting vehicle track data into road sections

Suppose there is a DataFrame of car track data, for instance:

+----------+-------------+--------+
|terminalid|time         |distance|
+----------+-------------+--------+
|ecar001   |1597023302000|100     |
|ecar002   |1597022891000|200     |
|ecar002   |1597022892000|900     |
|ecar003   |1597022893000|100     |
|ecar001   |1597023303000|200     |
|ecar001   |1597023304000|1400    |
|ecar001   |1597022882000|100     |
+----------+-------------+--------+
I would like to add a road_id column indicating which road section each record belongs to. There is one caveat: consecutive points from the same car belong to the same road section only while both the time gap and the distance stay within their thresholds; once either threshold is exceeded, the road_id resets to the time of the new record. Suppose the time threshold is 5 minutes and the distance threshold is 1000 (in the same unit as the distance column). Then the desired table derived from the table above would look like this:

+----------+-------------+--------+-------------+
|terminalid|time         |distance|road_id      |
+----------+-------------+--------+-------------+
|ecar001   |1597022882000|100     |1597022882000|
|ecar001   |1597023302000|100     |1597023302000|
|ecar001   |1597023303000|200     |1597023302000|
|ecar001   |1597023304000|1400    |1597023304000|
|ecar003   |1597022893000|100     |1597022893000|
|ecar002   |1597022891000|200     |1597022891000|
|ecar002   |1597022892000|900     |1597022891000|
+----------+-------------+--------+-------------+

The approach is the same gap-and-islands pattern, except the new-section flag now combines two conditions: the time gap in minutes exceeds 5, or the distance exceeds 1000. The implementation code is as follows:

package com.empgo

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, isnull, lag, min, sum, when}

object Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[4]").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("ecar001", "1597023302000", 100), ("ecar002", "1597022891000", 200),
      ("ecar002", "1597022892000", 900), ("ecar003", "1597022893000", 100),
      ("ecar001", "1597023303000", 200), ("ecar001", "1597023304000", 1400),
      ("ecar001", "1597022882000", 100)
    ).toDF("terminalid", "time", "distance")

    // Per-terminal window ordered by time, plus a window per (terminal, session)
    val userWindow = Window.partitionBy("terminalid").orderBy("time")
    val userSessionWindow = Window.partitionBy("terminalid", "session")

    // Time of the previous point and the gap to it in minutes (0 for the first point)
    val df_time = df.withColumn("before_time", lag("time", 1).over(userWindow))
    val df_difftime = df_time.withColumn("diff_minutes",
      when(isnull(col("time") - col("before_time")), 0)
        .otherwise((col("time") - col("before_time")) / 60000))

    // Flag a new road section when either threshold is exceeded
    val newSession = ($"distance" > 1000 || $"diff_minutes" > 5).cast("bigint")

    // Cumulative sum of the flag gives each row its section id
    val sessionized = df_difftime.withColumn("session", sum(newSession).over(userWindow))

    // The earliest time in each (terminal, session) becomes the road_id
    val result = sessionized
      .withColumn("road_id", min($"time").over(userSessionWindow))
      .drop("session", "before_time", "diff_minutes")

    result.show()
  }
}
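For completeness, this segmentation can also be expressed directly in Spark SQL. The following is a minimal sketch, assuming the raw DataFrame df is registered as a temporary view named tracks (the view name is only for illustration) and that these lines are placed inside main after df is created:

// A minimal SQL sketch of the same segmentation, assuming df is registered as "tracks"
df.createOrReplaceTempView("tracks")
spark.sql("""
  WITH flagged AS (
    SELECT terminalid, time, distance,
           CASE WHEN distance > 1000
                  OR (time - lag(time) OVER (PARTITION BY terminalid ORDER BY time)) / 60000 > 5
                THEN 1 ELSE 0 END AS new_section
    FROM tracks
  ), sectioned AS (
    SELECT terminalid, time, distance,
           SUM(new_section) OVER (PARTITION BY terminalid ORDER BY time) AS section
    FROM flagged
  )
  SELECT terminalid, time, distance,
         MIN(time) OVER (PARTITION BY terminalid, section) AS road_id
  FROM sectioned
""").show()

Here the OR inside the CASE expression is the "complex condition": either threshold being exceeded starts a new road section.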
