How to solve complex conditions with Spark SQL window functions

Two recurring problems, sessionizing user logins and segmenting vehicle trips, reduce to the same window-function pattern: flag every row that starts a new group, then turn the flags into group ids with a running sum.

Problem 1: sessionizing user logins. Given a log of logins to a website, compute activity sessions per user: a login that comes more than 5 days after the user's previous login starts a new session, and became_active is the first login date of the session each row belongs to.

Input:

+---------+----------+
|user_name|login_date|
+---------+----------+
|smith    |2020-01-04|
|bob      |2020-01-04|
|bob      |2020-01-06|
|john     |2020-01-10|
|smith    |2020-01-11|
+---------+----------+

Expected output:

+---------+----------+-------------+
|user_name|login_date|became_active|
+---------+----------+-------------+
|smith    |2020-01-04|2020-01-04   |
|bob      |2020-01-04|2020-01-04   |
|bob      |2020-01-06|2020-01-04   |
|john     |2020-01-10|2020-01-10   |
|smith    |2020-01-11|2020-01-11   |
+---------+----------+-------------+
package com.empgo

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
import org.apache.spark.sql.SparkSession

object Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[4]").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("smith", "2020-01-04"), ("bob", "2020-01-04"),
      ("bob", "2020-01-06"), ("john", "2020-01-10"),
      ("smith", "2020-01-11"), ("smith", "2020-01-14"),
      ("smith", "2020-08-11")
    ).toDF("user_name", "login_date")

    // Each user's logins, ordered by date.
    val userWindow = Window.partitionBy("user_name").orderBy("login_date")
    // Rows that belong to the same activity session.
    val userSessionWindow = Window.partitionBy("user_name", "session")

    // 1 when more than 5 days passed since the previous login, else 0.
    // The first login has no predecessor, so coalesce treats the gap as 0.
    val newSession = (coalesce(
      datediff($"login_date", lag($"login_date", 1).over(userWindow)),
      lit(0)
    ) > 5).cast("bigint")

    // A running sum of the 0/1 flags assigns a session id per user.
    val sessionized = df.withColumn("session", sum(newSession).over(userWindow))

    // became_active is the earliest login date within each session.
    val result = sessionized
      .withColumn("became_active", min($"login_date").over(userSessionWindow))
      .drop("session")
    result.show()
  }
}
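The same result can be produced with a single Spark SQL query: the innermost query computes the 0/1 new-session flag with LAG, the middle one turns the flags into session ids with a running SUM, and the outer one takes MIN(login_date) per session. A minimal sketch, assuming the DataFrame above has been registered as a temp view named logins (a name chosen here for illustration):

df.createOrReplaceTempView("logins")
spark.sql("""
  SELECT user_name, login_date,
         MIN(login_date) OVER (PARTITION BY user_name, session) AS became_active
  FROM (
    SELECT user_name, login_date,
           SUM(new_session) OVER (PARTITION BY user_name ORDER BY login_date) AS session
    FROM (
      SELECT user_name, login_date,
             CASE WHEN DATEDIFF(login_date, LAG(login_date, 1)
                    OVER (PARTITION BY user_name ORDER BY login_date)) > 5
                  THEN 1 ELSE 0 END AS new_session
      FROM logins
    ) flagged
  ) sessions
""").show()

On the first login per user, LAG returns NULL, the CASE falls through to 0, and the row starts session 0, which matches the coalesce(..., lit(0)) default in the DataFrame version.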

Problem 2: vehicle trip segmentation. Given GPS records (terminal id, timestamp in epoch milliseconds, distance), split each terminal's records into trip segments: a record starts a new segment when its distance exceeds 1000 or more than 5 minutes have passed since the previous record. road_id labels each row with the first timestamp of its segment.

Input:

+----------+-------------+--------+
|terminalid|time         |distance|
+----------+-------------+--------+
|ecar001   |1597023302000|100     |
|ecar002   |1597022891000|200     |
|ecar002   |1597022892000|900     |
|ecar003   |1597022893000|100     |
|ecar001   |1597023303000|200     |
|ecar001   |1597023304000|1400    |
|ecar001   |1597022882000|100     |
+----------+-------------+--------+

Expected output:

+----------+-------------+--------+-------------+
|terminalid|time         |distance|road_id      |
+----------+-------------+--------+-------------+
|ecar001   |1597022882000|100     |1597022882000|
|ecar001   |1597023302000|100     |1597023302000|
|ecar001   |1597023303000|200     |1597023302000|
|ecar001   |1597023304000|1400    |1597023304000|
|ecar003   |1597022893000|100     |1597022893000|
|ecar002   |1597022891000|200     |1597022891000|
|ecar002   |1597022892000|900     |1597022891000|
+----------+-------------+--------+-------------+
package com.empgo

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, isnull, lag, min, sum, when}
import org.apache.spark.sql.SparkSession

object Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[4]").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("ecar001", "1597023302000", 100), ("ecar002", "1597022891000", 200),
      ("ecar002", "1597022892000", 900), ("ecar003", "1597022893000", 100),
      ("ecar001", "1597023303000", 200), ("ecar001", "1597023304000", 1400),
      ("ecar001", "1597022882000", 100)
    ).toDF("terminalid", "time", "distance")

    // Each terminal's records, ordered by timestamp. The timestamps are
    // epoch-millisecond strings of equal length, so string ordering works.
    val userWindow = Window.partitionBy("terminalid").orderBy("time")

    // Minutes elapsed since the previous record; 0 for the first record.
    val df_time = df.withColumn("before_time", lag("time", 1).over(userWindow))
    val df_difftime = df_time.withColumn("diff_minutes",
      when(isnull(col("time") - col("before_time")), 0)
        .otherwise((col("time") - col("before_time")) / 60000))

    // Rows that belong to the same trip segment.
    val userSessionWindow = Window.partitionBy("terminalid", "session")

    // 1 when the distance exceeds 1000 or more than 5 minutes passed
    // since the previous record: this row starts a new segment.
    val newSession = ($"distance" > 1000 || $"diff_minutes" > 5).cast("bigint")
    val sessionized = df_difftime.withColumn("session", sum(newSession).over(userWindow))

    // road_id is the earliest timestamp within each segment.
    val result = sessionized
      .withColumn("road_id", min($"time").over(userSessionWindow))
      .drop("session", "before_time", "diff_minutes")
    result.show()
  }
}
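Both solutions are instances of one pattern: lag over an ordered window flags the rows that start a new group, a running sum turns the flags into group ids, and an aggregate over (partition, group id) labels every row. A minimal generic sketch of that pattern; sessionize and its parameter names are invented here for illustration and are not part of any Spark API:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{min, sum}

// Assigns labelCol = MIN(orderCol) within each run of rows delimited by
// newGroupFlag (a 0/1 Column, built over the same partition/order columns).
def sessionize(df: DataFrame, partitionCol: String, orderCol: String,
               newGroupFlag: Column, labelCol: String): DataFrame = {
  val w = Window.partitionBy(partitionCol).orderBy(orderCol)
  df.withColumn("grp", sum(newGroupFlag).over(w))
    .withColumn(labelCol, min(orderCol).over(Window.partitionBy(partitionCol, "grp")))
    .drop("grp")
}

Problem 1 is then sessionize(df, "user_name", "login_date", newSession, "became_active"), and Problem 2 is the same call on df_difftime with its distance/time-gap flag and "road_id" as the label.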
