Spark SQL Window Functions
Window functions SQL dünyasında yaygın olarak bilinen fonksiyonlar. SQL’de kullandığımız bir çok fonksiyonu Spark ile de kullanabiliyoruz. Bu yazımızda Spark Scala ve SQL söz dizimleriyle window functions örnekleri yapacağız.
Window functions nedir, ne işe yarar?
Window fonksiyonlar, window olarak adlandırılan bir satır grubu üzerinde çalışır ve satır grubuna dayalı olarak her satır için bir dönüş değeri hesaplar. Window fonksiyonlar, hareketli bir ortalamayı hesaplamak, kümülatif bir istatistik hesaplamak veya geçerli satırın göreli konumu verilen satırların değerine erişmek gibi görevleri işlemek için kullanışlıdır.
Aggregation sorgularında ( Örneğin MAX) bir çok satırdan hesaplama yapıp tek bir satır elde ederiz. Eğer GROUP BY ile kullanmışsak gruplanan kategori sayısı kadar satır elde ederiz. Window fonksiyonlar da benzer şekilde aggregation yapar ancak satır sayısını korur, sonucu her bir satır için yeni bir sütunda tutar. Window fonksiyonlar ile bir sürü iç içe sorgu yazarak elde edeceğimiz sonuca daha sade ve pratik bir şekilde ulaşabiliriz.
Window functions ne tarz sorulara cevap verir?
Her bir departman için kıdemi en büyük 2 kişiyi bulmak. Her bir market şubesinde en çok satılan 5 ürün kategorisini bulmak vb.
Spark dokümanları Window fonksiyonları üç kategoriye ayırmış:
1. Ranking Functions
Syntax: RANK | DENSE_RANK | PERCENT_RANK | NTILE | ROW_NUMBER
2. Analytic Functions
Syntax: CUME_DIST | LAG | LEAD | NTH_VALUE | FIRST_VALUE | LAST_VALUE
3. Aggregate Functions
Syntax: MAX | MIN | COUNT | SUM | AVG | ...
Temel söz dizimi
MAX(salary) over(partition by dept) as dept_max_salary
İlk kısım MAX(salary), standart aggregation fonksiyonlara çok benziyor. OVER eklemek, onu bir pencere işlevi olarak belirler. OVER içine bir partition belirtilmez ise tüm set bir pencere olarak kabul edilir. PARTITION pencere belirlemek içindir. Yukarıdaki örnekte departman kullanılmıştır. Yani veri içinde kaç tane benzersiz departman var ise o kadar pencere oluşacaktır.
Örnekler
Örnekleri yaptığımız ortam bilgileri şu şekilde:
- İşletim sistemi: CentOS7
- IDE: IntelliJ 2022
- Spark: 3.2.1
- Scala: 2.12.12
- Java: Java 11
Spark Session ve Dataframe
val spark = SparkSession.builder() .master("local[4]") .appName("Spark Window Functions") .config("spark.driver.memory","2g") .getOrCreate() import spark.implicits._ val df = Seq( (1001, "Satılmış", "İdari", 4000), (1002, "Özge", "Personel", 3000), (1003, "Hüsnü", "Bilgi Sistemleri", 4000), (1004, "Menşure", "Muhasebe", 6500), (1005, "Doruk", "Personel", 3000), (1006, "Şilan", "Muhasebe", 5000), (1007, "Baran", "Personel", 7000), (1008, "Ülkü", "İdari", 4000), (1009, "Cüneyt", "Bilgi Sistemleri", 6500), (1010, "Gülşen", "Bilgi Sistemleri", 7000), (1011, "Melih", "Bilgi Sistemleri", 8000), (1012, "Gülbahar", "Bilgi Sistemleri", 10000), (1013, "Tuna", "İdari", 2000), (1014, "Raşel", "Personel", 3000), (1015, "Şahabettin", "Bilgi Sistemleri", 4500), (1016, "Elmas", "Muhasebe", 6500), (1017, "Ahmet Hamdi", "Personel", 3500), (1018, "Leyla", "Muhasebe", 5500), (1019, "Cuma", "Personel", 8000), (1020, "Yelda", "İdari", 5000), (1021, "Rojda", "Bilgi Sistemleri", 6000), (1022, "İbrahim", "Bilgi Sistemleri", 8000), (1023, "Davut", "Bilgi Sistemleri", 8000), (1024, "Arzu", "Bilgi Sistemleri", 11000) ).toDF("id", "name", "dept", "salary") df.createOrReplaceTempView("employee")
1. Ranking Functions RANK | DENSE_RANK | PERCENT_RANK | NTILE | ROW_NUMBER
Spark Scala
df.withColumn("rn", F.row_number() .over(Window.partitionBy("dept").orderBy("salary"))) .withColumn("rank", F.rank() .over(Window.partitionBy("dept").orderBy("salary"))) .withColumn("dense_rank", F.dense_rank() .over(Window.partitionBy("dept").orderBy("salary"))) .withColumn("percent_rank", F.percent_rank() .over(Window.partitionBy("dept").orderBy("salary"))) .withColumn("ntile", F.ntile(3) .over(Window.partitionBy("dept").orderBy("salary"))) .show()
Spark SQL
spark.sql( """ |SELECT a.*, |ROW_NUMBER() OVER(PARTITION BY dept ORDER BY salary) as rn, |RANK() OVER(PARTITION BY dept ORDER BY salary) as rank, |DENSE_RANK() OVER(PARTITION BY dept ORDER BY salary) as dense_rank, |PERCENT_RANK() OVER(PARTITION BY dept ORDER BY salary) as percent_rank, |NTILE(3) OVER(PARTITION BY dept ORDER BY salary) as ntile |FROM employee a |""".stripMargin).show(false)
- Çıktı
+----+-----------+----------------+------+---+----+----------+------------------+-----+ | id| name| dept|salary| rn|rank|dense_rank| percent_rank|ntile| +----+-----------+----------------+------+---+----+----------+------------------+-----+ |1003| Hüsnü|Bilgi Sistemleri| 4000| 1| 1| 1| 0.0| 1| |1015| Şahabettin|Bilgi Sistemleri| 4500| 2| 2| 2|0.1111111111111111| 1| |1021| Rojda|Bilgi Sistemleri| 6000| 3| 3| 3|0.2222222222222222| 1| |1009| Cüneyt|Bilgi Sistemleri| 6500| 4| 4| 4|0.3333333333333333| 1| |1010| Gülşen|Bilgi Sistemleri| 7000| 5| 5| 5|0.4444444444444444| 2| |1011| Melih|Bilgi Sistemleri| 8000| 6| 6| 6|0.5555555555555556| 2| |1022| İbrahim|Bilgi Sistemleri| 8000| 7| 6| 6|0.5555555555555556| 2| |1023| Davut|Bilgi Sistemleri| 8000| 8| 6| 6|0.5555555555555556| 3| |1012| Gülbahar|Bilgi Sistemleri| 10000| 9| 9| 7|0.8888888888888888| 3| |1024| Arzu|Bilgi Sistemleri| 11000| 10| 10| 8| 1.0| 3| |1006| Şilan| Muhasebe| 5000| 1| 1| 1| 0.0| 1| |1018| Leyla| Muhasebe| 5500| 2| 2| 2|0.3333333333333333| 1| |1004| Menşure| Muhasebe| 6500| 3| 3| 3|0.6666666666666666| 2| |1016| Elmas| Muhasebe| 6500| 4| 3| 3|0.6666666666666666| 3| |1002| Özge| Personel| 3000| 1| 1| 1| 0.0| 1| |1005| Doruk| Personel| 3000| 2| 1| 1| 0.0| 1| |1014| Raşel| Personel| 3000| 3| 1| 1| 0.0| 2| |1017|Ahmet Hamdi| Personel| 3500| 4| 4| 2| 0.6| 2| |1007| Baran| Personel| 7000| 5| 5| 3| 0.8| 3| |1019| Cuma| Personel| 8000| 6| 6| 4| 1.0| 3| +----+-----------+----------------+------+---+----+----------+------------------+-----+ only showing top 20 rows
Row number her bir window içinde tüm satırları ardışık olarak sıralıyor. Rank ise aynı maaşa sahip çalışanlara aynı sıra numarasını veriyor ancak müteakip çalışan (bir düşük maaş) için row number’da kaldığı yerden devam ediyor. Dense rank, rank’ten farklı olarak aynı maaşta olanları takip edene row number değil ardışık rakamı veriyor. percent_rank dense_rank’in bir benzeri. Sadece yüzdelik dilimleri gösteriyor. Ntile ise içine tam sayı argüman alan bir fonksiyon. Kullanıcı tarafından belirlenen bu n tam sayıya göre pencereyi n adede bölüyor ve her bir gruba ardışık artacak şekilde rakam atıyor.
2. Analytic Functions CUME_DIST | LAG | LEAD | NTH_VALUE | FIRST_VALUE | LAST_VALUE
2.1. CUME_DIST
Spark Scala
df.withColumn("cume_dist", F.cume_dist() .over(Window.partitionBy("dept").orderBy(F.col("salary")))) .show(false)
- Çıktı
+----+-----------+----------------+------+------------------+ |id |name |dept |salary|cume_dist | +----+-----------+----------------+------+------------------+ |1003|Hüsnü |Bilgi Sistemleri|4000 |0.1 | |1015|Şahabettin |Bilgi Sistemleri|4500 |0.2 | |1021|Rojda |Bilgi Sistemleri|6000 |0.3 | |1009|Cüneyt |Bilgi Sistemleri|6500 |0.4 | |1010|Gülşen |Bilgi Sistemleri|7000 |0.5 | |1011|Melih |Bilgi Sistemleri|8000 |0.8 | |1022|İbrahim |Bilgi Sistemleri|8000 |0.8 | |1023|Davut |Bilgi Sistemleri|8000 |0.8 | |1012|Gülbahar |Bilgi Sistemleri|10000 |0.9 | |1024|Arzu |Bilgi Sistemleri|11000 |1.0 | |1006|Şilan |Muhasebe |5000 |0.25 | |1018|Leyla |Muhasebe |5500 |0.5 | |1004|Menşure |Muhasebe |6500 |1.0 | |1016|Elmas |Muhasebe |6500 |1.0 | |1002|Özge |Personel |3000 |0.5 | |1005|Doruk |Personel |3000 |0.5 | |1014|Raşel |Personel |3000 |0.5 | |1017|Ahmet Hamdi|Personel |3500 |0.6666666666666666| |1007|Baran |Personel |7000 |0.8333333333333334| |1019|Cuma |Personel |8000 |1.0 | +----+-----------+----------------+------+------------------+
2.2. LAG ve LEAD
Özellikle verileri anlamlı bir sırayla aldıysanız, satırları önceki veya sonraki satırlarla karşılaştırmak yararlı olabilir. Diğer satırlardan değer çeken sütunlar oluşturmak için LAG veya LEAD’i kullanabilirsiniz; tek yapmanız gereken hangi sütundan çekeceğinizi ve çekmeyi kaç satır uzaktakinden yapmak istediğinizi belirtmektir. LAG üstteki, LEAD alttaki satırlardan çeker.
Şimdi id sırasına göre (muhtemelen işe giriş sırası) bir önceki ve bir sonraki çalışandan ne kadar farklı maaşı var onu görelim.
Spark Scala
df.withColumn("lag", F.lag("salary", 1, 0) .over(Window.partitionBy("dept").orderBy(F.col("id")))) .withColumn("lead", F.lead("salary", 1, 0) .over(Window.partitionBy("dept").orderBy(F.col("id")))) .withColumn("lag_diff", $"salary" - $"lag") .withColumn("lead_diff", $"salary" - $"lead") .show(false)
Spark SQL
spark.sql( """ |SELECT b.*, |(b.salary - b.lag) as lag_diff, |(b.salary - b.lead) as lead_diff | FROM ( |SELECT a.*, |LAG(salary, 1, 0) OVER(PARTITION BY dept ORDER BY id) as lag, |LEAD(salary, 1, 0) OVER(PARTITION BY dept ORDER BY id) as lead |FROM employee a) b |""".stripMargin).show(false)
- Çıktı
+----+-----------+----------------+------+-----+-----+--------+---------+ |id |name |dept |salary|lag |lead |lag_diff|lead_diff| +----+-----------+----------------+------+-----+-----+--------+---------+ |1003|Hüsnü |Bilgi Sistemleri|4000 |0 |6500 |4000 |-2500 | |1009|Cüneyt |Bilgi Sistemleri|6500 |4000 |7000 |2500 |-500 | |1010|Gülşen |Bilgi Sistemleri|7000 |6500 |8000 |500 |-1000 | |1011|Melih |Bilgi Sistemleri|8000 |7000 |10000|1000 |-2000 | |1012|Gülbahar |Bilgi Sistemleri|10000 |8000 |4500 |2000 |5500 | |1015|Şahabettin |Bilgi Sistemleri|4500 |10000|6000 |-5500 |-1500 | |1021|Rojda |Bilgi Sistemleri|6000 |4500 |8000 |1500 |-2000 | |1022|İbrahim |Bilgi Sistemleri|8000 |6000 |8000 |2000 |0 | |1023|Davut |Bilgi Sistemleri|8000 |8000 |11000|0 |-3000 | |1024|Arzu |Bilgi Sistemleri|11000 |8000 |0 |3000 |11000 | |1004|Menşure |Muhasebe |6500 |0 |5000 |6500 |1500 | |1006|Şilan |Muhasebe |5000 |6500 |6500 |-1500 |-1500 | |1016|Elmas |Muhasebe |6500 |5000 |5500 |1500 |1000 | |1018|Leyla |Muhasebe |5500 |6500 |0 |-1000 |5500 | |1002|Özge |Personel |3000 |0 |3000 |3000 |0 | |1005|Doruk |Personel |3000 |3000 |7000 |0 |-4000 | |1007|Baran |Personel |7000 |3000 |3000 |4000 |4000 | |1014|Raşel |Personel |3000 |7000 |3500 |-4000 |-500 | |1017|Ahmet Hamdi|Personel |3500 |3000 |8000 |500 |-4500 | |1019|Cuma |Personel |8000 |3500 |0 |4500 |8000 | +----+-----------+----------------+------+-----+-----+--------+---------+
F.lag("salary", 1, 0) ve LAG(salary, 1, 0)
ifadelerinde 1 kaç satır üst veya alta bakacağı, 0 ise varsayılan değeri gösterir. Eğer varsayılan değer kullanmazsak üst veya alt satır olmadığında null değer gelir.
3. Aggregate Functions MAX | MIN | COUNT | SUM | AVG | ...
Spark Scala
df.withColumn("dept_max_salary", F.max("salary") .over(Window.partitionBy("dept"))) .withColumn("dept_min_salary", F.min("salary") .over(Window.partitionBy("dept"))) .withColumn("dept_avg_salary", F.avg("salary") .over(Window.partitionBy("dept"))) .show()
Spark SQL
spark.sql( """ |SELECT a.*, |MAX(salary) over(partition by dept) as dept_max_salary, |MIN(salary) over(partition by dept) as dept_min_salary, |AVG(salary) over(partition by dept) as dept_avg_salary |FROM employee a |""".stripMargin).show(false)
- Çıktı
+----+-----------+----------------+------+---------------+---------------+-----------------+ | id| name| dept|salary|dept_max_salary|dept_min_salary| dept_avg_salary| +----+-----------+----------------+------+---------------+---------------+-----------------+ |1003| Hüsnü|Bilgi Sistemleri| 4000| 11000| 4000| 7300.0| |1009| Cüneyt|Bilgi Sistemleri| 6500| 11000| 4000| 7300.0| |1010| Gülşen|Bilgi Sistemleri| 7000| 11000| 4000| 7300.0| |1011| Melih|Bilgi Sistemleri| 8000| 11000| 4000| 7300.0| |1012| Gülbahar|Bilgi Sistemleri| 10000| 11000| 4000| 7300.0| |1015| Şahabettin|Bilgi Sistemleri| 4500| 11000| 4000| 7300.0| |1021| Rojda|Bilgi Sistemleri| 6000| 11000| 4000| 7300.0| |1022| İbrahim|Bilgi Sistemleri| 8000| 11000| 4000| 7300.0| |1023| Davut|Bilgi Sistemleri| 8000| 11000| 4000| 7300.0| |1024| Arzu|Bilgi Sistemleri| 11000| 11000| 4000| 7300.0| |1004| Menşure| Muhasebe| 6500| 6500| 5000| 5875.0| |1006| Şilan| Muhasebe| 5000| 6500| 5000| 5875.0| |1016| Elmas| Muhasebe| 6500| 6500| 5000| 5875.0| |1018| Leyla| Muhasebe| 5500| 6500| 5000| 5875.0| |1002| Özge| Personel| 3000| 8000| 3000|4583.333333333333| |1005| Doruk| Personel| 3000| 8000| 3000|4583.333333333333| |1007| Baran| Personel| 7000| 8000| 3000|4583.333333333333| |1014| Raşel| Personel| 3000| 8000| 3000|4583.333333333333| |1017|Ahmet Hamdi| Personel| 3500| 8000| 3000|4583.333333333333| |1019| Cuma| Personel| 8000| 8000| 3000|4583.333333333333| +----+-----------+----------------+------+---------------+---------------+-----------------+ only showing top 20 rows
3.1. Her bir departmandan en yüksek maaşa sahip 2 çalışan
Spark Scala
df.withColumn("rank", F.rank() .over(Window.partitionBy("dept").orderBy(F.col("salary").desc))) .where($"rank".lt(3)) .show(false)
Spark SQL
spark.sql( """ |SELECT * FROM( |SELECT a.*, |RANK() OVER(PARTITION BY dept ORDER BY salary DESC) as rank |FROM employee a) b |WHERE b.rank < 3 |""".stripMargin).show(false)
- Çıktı
+----+--------+----------------+------+----+ |id |name |dept |salary|rank| +----+--------+----------------+------+----+ |1024|Arzu |Bilgi Sistemleri|11000 |1 | |1012|Gülbahar|Bilgi Sistemleri|10000 |2 | |1004|Menşure |Muhasebe |6500 |1 | |1016|Elmas |Muhasebe |6500 |1 | |1019|Cuma |Personel |8000 |1 | |1007|Baran |Personel |7000 |2 | |1020|Yelda |İdari |5000 |1 | |1001|Satılmış|İdari |4000 |2 | |1008|Ülkü |İdari |4000 |2 | +----+--------+----------------+------+----+
İdari’den 3 kişi geldi. Çünkü 2’nci ve 3’üncünün maaşı aynı. Başka bir kritere bakmaksızın hangisinin gerçekten 2. olduğuna karar veremeyiz.
Başka bir yazıda daha görüşmek dileğiyle çav!
Kaynaklar
- Kapak görseli: Photo by Waldemar Brandt on Unsplash
- https://www.youtube.com/watch?v=Ww71knvhQ-s
- https://sparkbyexamples.com/spark/spark-sql-window-functions/
- https://spark.apache.org/docs/3.2.1/sql-ref-syntax-qry-select-window.html