How to solve complex conditions with Spark SQL window functions

Problem 1: users log in to a website; for every login, compute the date the current activity session began (became_active), where a new session starts after a gap of more than 5 days since the user's previous login.

Input:

+---------+----------+
|user_name|login_date|
+---------+----------+
|smith    |2020-01-04|
|bob      |2020-01-04|
|bob      |2020-01-06|
|john     |2020-01-10|
|smith    |2020-01-11|
+---------+----------+
Expected output:

+---------+----------+-------------+
|user_name|login_date|became_active|
+---------+----------+-------------+
|smith    |2020-01-04|2020-01-04   |
|bob      |2020-01-04|2020-01-04   |
|bob      |2020-01-06|2020-01-04   |
|john     |2020-01-10|2020-01-10   |
|smith    |2020-01-11|2020-01-11   |
+---------+----------+-------------+
The trick is the classic gaps-and-islands pattern: flag every login that opens a new session (more than 5 days since the previous one), turn the flags into session ids with a running sum over the user's window, then take the earliest login date within each session:

package com.empgo

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}

object Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[4]").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("smith", "2020-01-04"), ("bob", "2020-01-04"),
      ("bob", "2020-01-06"), ("john", "2020-01-10"),
      ("smith", "2020-01-11")
    ).toDF("user_name", "login_date")

    // All logins of one user, ordered by date
    val userWindow = Window.partitionBy("user_name").orderBy("login_date")
    // All rows belonging to one session of one user
    val userSessionWindow = Window.partitionBy("user_name", "session")

    // Flag = 1 when more than 5 days passed since the previous login;
    // coalesce turns the null gap on the user's first row into 0
    val newSession = (coalesce(
      datediff($"login_date", lag($"login_date", 1).over(userWindow)),
      lit(0)) > 5).cast("bigint")

    // A running sum of the flags numbers the sessions per user
    val sessionized = df.withColumn("session", sum(newSession).over(userWindow))

    // The earliest login of each session is when the user became active
    val result = sessionized
      .withColumn("became_active", min($"login_date").over(userSessionWindow))
      .drop("session")

    result.show()
  }
}
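For readers who prefer plain Spark SQL, the same gaps-and-islands query can be written directly with window functions. This is a minimal sketch, assuming the spark session and the df from the listing above; the view name logins is my own choice:

// Sketch only: the same logic as the DataFrame version above.
df.createOrReplaceTempView("logins")

spark.sql("""
  WITH flagged AS (
    SELECT user_name, login_date,
           CASE WHEN datediff(login_date, lag(login_date, 1)
                      OVER (PARTITION BY user_name ORDER BY login_date)) > 5
                THEN 1 ELSE 0 END AS new_session
    FROM logins
  ), sessionized AS (
    SELECT *, sum(new_session)
              OVER (PARTITION BY user_name ORDER BY login_date) AS session
    FROM flagged
  )
  SELECT user_name, login_date,
         min(login_date) OVER (PARTITION BY user_name, session) AS became_active
  FROM sessionized
  ORDER BY user_name, login_date
""").show()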

Problem 2: traffic itinerary segmentation. Each terminal reports an epoch-millisecond timestamp and a distance value; split every terminal's records into road segments, starting a new segment when the distance exceeds 1000 or when more than 5 minutes separate consecutive records, and label each record with the first timestamp of its segment (road_id).

Input:

+----------+-------------+--------+
|terminalid|time         |distance|
+----------+-------------+--------+
|ecar001   |1597023302000|100     |
|ecar002   |1597022891000|200     |
|ecar002   |1597022892000|900     |
|ecar003   |1597022893000|100     |
|ecar001   |1597023303000|200     |
|ecar001   |1597023304000|1400    |
|ecar001   |1597022882000|100     |
+----------+-------------+--------+
Expected output:

+----------+-------------+--------+-------------+
|terminalid|time         |distance|road_id      |
+----------+-------------+--------+-------------+
|ecar001   |1597022882000|100     |1597022882000|
|ecar001   |1597023302000|100     |1597023302000|
|ecar001   |1597023303000|200     |1597023302000|
|ecar001   |1597023304000|1400    |1597023304000|
|ecar003   |1597022893000|100     |1597022893000|
|ecar002   |1597022891000|200     |1597022891000|
|ecar002   |1597022892000|900     |1597022891000|
+----------+-------------+--------+-------------+
The pattern is the same as in Problem 1, only the break condition is compound: a record opens a new segment when its distance exceeds 1000 or when more than 5 minutes passed since the terminal's previous record:

package com.empgo

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, isnull, lag, min, sum, when}

object Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[4]").getOrCreate()
    import spark.implicits._

    // Timestamps are kept as Long so ordering and arithmetic are numeric
    val df = Seq(
      ("ecar001", 1597023302000L, 100), ("ecar002", 1597022891000L, 200),
      ("ecar002", 1597022892000L, 900), ("ecar003", 1597022893000L, 100),
      ("ecar001", 1597023303000L, 200), ("ecar001", 1597023304000L, 1400),
      ("ecar001", 1597022882000L, 100)
    ).toDF("terminalid", "time", "distance")

    // All records of one terminal, ordered by timestamp
    val userWindow = Window.partitionBy("terminalid").orderBy("time")

    // Minutes elapsed since the previous record (0 on each terminal's first row)
    val df_time = df.withColumn("before_time", lag("time", 1).over(userWindow))
    val df_difftime = df_time.withColumn("diff_minutes",
      when(isnull(col("time") - col("before_time")), 0)
        .otherwise((col("time") - col("before_time")) / 60000))

    // Flag = 1 on a long hop (distance > 1000) or a long pause (> 5 minutes);
    // the running sum of the flags numbers the segments
    val userSessionWindow = Window.partitionBy("terminalid", "session")
    val newSession = ($"distance" > 1000 || $"diff_minutes" > 5).cast("bigint")
    val sessionized = df_difftime.withColumn("session", sum(newSession).over(userWindow))

    // The first timestamp of each segment serves as its road_id
    val result = sessionized
      .withColumn("road_id", min($"time").over(userSessionWindow))
      .drop("session", "before_time", "diff_minutes")

    result.show()
  }
}
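As a quick follow-up (my own sketch, not part of the original code): once every record carries a road_id, per-segment statistics reduce to a plain groupBy, for example the number of points and the total distance per segment:

// Sketch only: aggregate the result DataFrame from above per segment.
// Extra import needed: org.apache.spark.sql.functions.count
val segmentStats = result
  .groupBy("terminalid", "road_id")
  .agg(count($"time").as("points"), sum($"distance").as("total_distance"))
segmentStats.show()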
