Veri Bilimi Okulu

PySpark Dataframe İşlemleri
pyspark_dataframe_kapak

Loading

Bölüm 1

Bu yazımızda Spark’ın Dataframe’inden bahsedeceğim. Spark Dataframe (nedense Spark Tablosu diyesim var 🙂 )  yapısını ilişkisel veri tabanlarındaki tablolara benzetebiliriz; satırlar, sütunlar ve şema. Spark Dataframe; Python, R, Pandas vb. dillerdeki dataframe benzese de en büyük farkı dağıtık işlemeye uygun olmasıdır. Spark Tablosunu; ilişkisel veri tabanı tabloları, Hive tabloları, Spark RDD ler gibi birçok farklı kaynaktan oluşturabiliriz.

Spark Tablosunun bazı avantajları şunlardır: Python, R, Scala, Java gibi API desteği bulunmaktadır. Bir sütun altında tek bir veri tipi bulunduğundan Spark bu tablodan şemayı daha rahat anlar ve işlemlerini daha iyi optimize eder. Çok büyük hacimlerdeki (petabayt) yapısal ve yarı-yapısal verileri işlemeye müsaittir.

RDD’den Spark Tablosu oluşturma:

from pyspark.sql import Row 
liste = [('Cemal',35,'İşçi','Ankara'),('Ceyda',42,'Memur','Kayseri'),('Timur',30,'İşsiz','İstanbul'),'Burcu',29,'Pazarlamacı','Ankara')] 
rdd = sc.parallelize(liste) 
insanlar = rdd.map(lambda x: Row(isim=x[0], yas=int(x[1]), meslek=x[2], ikamet=x[3])) 
insanlarDF = sqlContext.createDataFrame(insanlar)

Hive Tablosundan:

from pyspark.sql import Row
veriSetimSparkDF = sqlContext.sql("select * from veritabanim.tablom")

Yukarıdaki sorgu Hive’de veritabanim isimli veritabanında tablom ismindeki tüm kayıtları alarak veriSetimSparkDF isimli dataframe değişkenine atar. type(veriSetimSparkDF) komutuyla pyspark.sql.dataframe.DataFrame sonucunu alırız.

Dataframe ile biraz oynayalım:

Şema yazdır:

insanlarDF.printSchema()
root
 |-- ikamet: string (nullable = true)
 |-- isim: string (nullable = true)
 |-- meslek: string (nullable = true)
 |-- yas: long (nullable = true)

İlk satırı yazdıralım:

insanlarDF.first()
Row(ikamet=u'Ankara', isim=u'Cemal', meslek=u'isci', yas=35)

İlk iki satırı gösterelim:

insanlarDF.show(2)
+-------+-----+------+---+
| ikamet| isim|meslek|yas|
+-------+-----+------+---+
| Ankara|Cemal|  isci| 35|
|Kayseri|Ceyda| Memur| 42|
+-------+-----+------+---+
only showing top 2 rows

Satırları sayalım:

insanlarDF.count()
4

İstatistiksel özet alalım:

insanlarDF.describe().show()
+-------+----------------+
|summary|             yas|
+-------+----------------+
|  count|               4|
|   mean|            34.0|
| stddev|5.94418483337567|
|    min|              29|
|    max|              42|
+-------+----------------+

Yukarıda görüldüğü üzere sadece yaş değişkeni ile ilgili bilgi alabildik. Bunun sebebi sadece yaş değişkeninin nümerik olmasıdır. Çünkü aritmetik ortalama, standart sapma vb.  işlemler sadece rakamlarla yapılır. Şimdi aynı işlemi kategorik özelliklerden birine yapalım bakalım sonuç ne olacak:

insanlarDF.describe('meslek').show()
+-------+-------+
|summary| meslek|
+-------+-------+
|  count|      4|
|   mean|   null|
| stddev|   null|
|    min|  Memur|
|    max|  İşsiz|
+-------+-------+

Görüldüğü gibi ortalama ve standart sapma null oldu. min ve max ise muhtemelen rastgele kategorik bir değer aldı. Yani sonuçlar tamamen anlamsız.

Şimdi sütunları nasıl seçeriz ona bir bakalım:

insanlarDF.select('isim','yas','meslek').show()
+-----+---+-----------+
| isim|yas|     meslek|
+-----+---+-----------+
|Cemal| 35|       isci|
|Ceyda| 42|      Memur|
|Timur| 30|      İşsiz|
|Burcu| 29|Pazarlamaci|
+-----+---+-----------+

Şimdi kaç tane farklı şehir var onu saydıralım. Burada SQL bilgimiz bize hemen distinct ve count sözcüklerini hatırlatacaktır.

insanlarDF.select('ikamet').distinct().count()
3

Şimdi kesiyorum. Müteakip yazılarımda dataframe ile biraz daha oynayacağız. Veriyle kalın…

Bölüm 2

PySpark ile dataframe işlemleri ikinci bölüm ile devam ediyoruz.

from pyspark.sql import Row
liste = [
 (1,'Cemal',35,'Isci','Ankara'),
 (2,'Ceyda',42,'Memur','Kayseri'),
 (3,'Timur',30,'Issiz','Istanbul'),
 (4,'Burcu',29,'Pazarlamaci','Ankara'),
 (5,'Yasemin',23,'Pazarlamaci','Bursa'),
 (6,'Ali',33,None,None),
 (7,'Burcu',29,'Pazarlamaci','Ankara'),
 (8,None,31,None,None),
 (9,'Ahmet',33,'Doktor','Ankara')
]
rdd = sc.parallelize(liste)
insanlar = rdd.map(lambda x: Row(record_id=int(x[0]),isim=x[1], yas=int(x[2]), meslek=x[3], ikamet=x[4]))
insanlarDF = sqlContext.createDataFrame(insanlar)
insanlarDF = insanlarDF.selectExpr("record_id as RecordId","isim as Adi","yas as Yas","meslek as Meslek","ikamet as Sehir")

PySpark Dataframe Sütun İsimlerini Değiştirmek Yöntem-1

Sütun isimlerini değiştirmek için selectExpr() metodunu kullanabiliriz. Örneğimiz aşağıdadır.

insanlarDF = insanlarDF.selectExpr("record_id as RecordId","isim as Adi","yas as Yas","meslek as Meslek","ikamet as Sehir")
insanlarDF.columns
['RecordId', 'Adi', 'Yas', 'Meslek', 'Sehir']
insanlarDF.show()
+--------+-------+---+-----------+--------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|
+--------+-------+---+-----------+--------+
|       1|  Cemal| 35|       Isci|  Ankara|
|       2|  Ceyda| 42|      Memur| Kayseri|
|       3|  Timur| 30|      Issiz|Istanbul|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|
|       6|    Ali| 33|       null|    null|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|
|       8|   null| 31|       null|    null|
|       9|  Ahmet| 33|     Doktor|  Ankara|
+--------+-------+---+-----------+--------+

Gördüğümüz gibi yeni sütun isimlerini listeledik.

PySpark Dataframe Sütun İsimlerini Değiştirmek Yöntem-2

new_column_names = ['RecordId', 'Adi', 'Yas', 'Meslek', 'Sehir']

insanlarDF = insanlarDF .toDF(*new_column_names )

PySpark Dataframe Tekil Listeleme (Distinct)

Tekil meslekleri listeleyelim.

insanlarDF.select('Meslek').distinct().show()
+-----------+
|     Meslek|
+-----------+
|      Issiz|
|      Memur|
|     Doktor|
|       Isci|
|       null|
|Pazarlamaci|
+-----------+

PySpark Dataframe Tekil Sayma (Distinct Count)

Tekil sayma için yukarıdaki distinct() metoduna sadece count() eklememiz yeterli olacaktır.

insanlarDF.select('Meslek').distinct().count()
6

PySpark Dataframe Kategorik Nitelikleri Çapraz Tablo Halinde Gösterme

insanlarDF.crosstab('Meslek', 'Sehir').show()
+------------+------+----+-----+--------+-------+
|Meslek_Sehir|Ankara|null|Bursa|Istanbul|Kayseri|
+------------+------+----+-----+--------+-------+
|        null|     0|   2|    0|       0|      0|
|       Memur|     0|   0|    0|       0|      1|
|        Isci|     1|   0|    0|       0|      0|
| Pazarlamaci|     2|   0|    1|       0|      0|
|       Issiz|     0|   0|    0|       1|      0|
|      Doktor|     1|   0|    0|       0|      0|
+------------+------+----+-----+--------+-------+

PySpark Dataframe Mükerrer Satırları Düşürme (dropDuplicates)

Şimdi tekrar eden satırları seçmemeyi nasıl yapacağız onu göreceğiz. Bunun için dropDuplicates() metodu işimizi görecek.

insanlarDF.select('Meslek', 'Sehir').dropDuplicates().show()
+-----------+--------+
|     Meslek|   Sehir|
+-----------+--------+
|Pazarlamaci|  Ankara|
|     Doktor|  Ankara|
|       null|    null|
|       Isci|  Ankara|
|Pazarlamaci|   Bursa|
|      Issiz|Istanbul|
|      Memur| Kayseri|
+-----------+--------+

Veri setimizde iki adat Pazarlamacı Ankara vardı. Yukarıda gördüğümüz üzere mükerrer satır yapmadan sadece bir Pazarlamacı Ankara aldı. Eğer dropDuplicates() metodunu kullanmadan seçim yapsaydık sonuç şöyle olacaktı:

insanlarDF.select('Meslek', 'Sehir').show()
+-----------+--------+
|     Meslek|   Sehir|
+-----------+--------+
|       Isci|  Ankara|
|      Memur| Kayseri|
|      Issiz|Istanbul|
|Pazarlamaci|  Ankara|
|Pazarlamaci|   Bursa|
|       null|    null|
|Pazarlamaci|  Ankara|
|       null|    null|
|     Doktor|  Ankara|
+-----------+--------+

Gördüğümüz gibi Pazarlamacı Ankara olarak iki adet mükerrer kayıt bulunuyor.

PySpark Dataframe Null Değerleri Düşürme (dropna)

Veri setimizde bazı değerler null. Eğer bunları istemiyorsak null bulunan kayıtları düşürebiliriz. Burada iki seçenek mevcut. Ya bütün satır null ise satırı düşürmek veya satırda herhangi bir sütunda null var ise satırı düşürmek. Kullanacağımız dropna() metodu varsayılan olarak herhangi bir null değer var ise tüm satırı düşürmek üzere ayarlanmış.

insanlarDF.dropna().show()
+--------+-------+---+-----------+--------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|
+--------+-------+---+-----------+--------+
|       1|  Cemal| 35|       Isci|  Ankara|
|       2|  Ceyda| 42|      Memur| Kayseri|
|       3|  Timur| 30|      Issiz|Istanbul|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|
|       9|  Ahmet| 33|     Doktor|  Ankara|
+--------+-------+---+-----------+--------+

Yukarıdaki tabloda gördüğümüz üzere bir tane bile null değere rastlamıyoruz. Null değer bulunan 6 ve 8 inci satırları düşürmüş durumda.

PySpark Dataframe Null Değerleri Doldurma (fillna)

Az önce null değerleri düşürmeyi görmüştük. Şimdi ise null değerlerin yerine belirleyeceğimiz herhangi bir değeri atamayı göreceğiz. Orijinal tablomuz:

insanlarDF.show()
+--------+-------+---+-----------+--------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|
+--------+-------+---+-----------+--------+
|       1|  Cemal| 35|       Isci|  Ankara|
|       2|  Ceyda| 42|      Memur| Kayseri|
|       3|  Timur| 30|      Issiz|Istanbul|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|
|       6|    Ali| 33|       null|    null|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|
|       8|   null| 31|       null|    null|
|       9|  Ahmet| 33|     Doktor|  Ankara|
+--------+-------+---+-----------+--------+

Boşluklar Bosluk kelimesiyle doldurulmuş tablo:

insanlarDF.fillna('Bosluk').show()
+--------+-------+---+-----------+--------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|
+--------+-------+---+-----------+--------+
|       1|  Cemal| 35|       Isci|  Ankara|
|       2|  Ceyda| 42|      Memur| Kayseri|
|       3|  Timur| 30|      Issiz|Istanbul|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|
|       6|    Ali| 33|     Bosluk|  Bosluk|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|
|       8| Bosluk| 31|     Bosluk|  Bosluk|
|       9|  Ahmet| 33|     Doktor|  Ankara|
+--------+-------+---+-----------+--------+

PySpark Dataframe Filtreleme (filter)

Şimdi yaşı 30’dan büyük olanları filtreleyelim. Bunun için filter() metodunu kullanıyoruz.

insanlarDF.filter(insanlarDF.Yas > 30).show()
+--------+-----+---+------+-------+
|RecordId|  Adi|Yas|Meslek|  Sehir|
+--------+-----+---+------+-------+
|       1|Cemal| 35|  Isci| Ankara|
|       2|Ceyda| 42| Memur|Kayseri|
|       6|  Ali| 33|  null|   null|
|       8| null| 31|  null|   null|
|       9|Ahmet| 33|Doktor| Ankara|
+--------+-----+---+------+-------+

PySpark Dataframe Gruplama (groupby & agg)

Şimdi Mesleklere göre yaş ortalamalarını gruplayalım. Bunun için groupby() ve agg() metodlarını kullanacağız.

insanlarDF.groupby('Meslek').agg({'Yas': 'mean'}).show()
+-----------+--------+
|     Meslek|avg(Yas)|
+-----------+--------+
|      Issiz|    30.0|
|      Memur|    42.0|
|     Doktor|    33.0|
|       Isci|    35.0|
|       null|    32.0|
|Pazarlamaci|    27.0|
+-----------+--------+

Burada aggregation yapacağımız niteliğin numerik olmasına dikkat etmemiz gerekiyor. Şayet şöyle birşey yapmak isteseydik ki saçma bir şey olurdu, Mesleklere göre Şehir ortalamalarını almak gibi:

insanlarDF.groupby('Meslek').agg({'Sehir': 'mean'}).show()
+-----------+----------+
|     Meslek|avg(Sehir)|
+-----------+----------+
|      Issiz|      null|
|      Memur|      null|
|     Doktor|      null|
|       Isci|      null|
|       null|      null|
|Pazarlamaci|      null|
+-----------+----------+

Allah’tan hata vermedi. Ancak kibarca istediğin mantıksız kardeş dedi 🙂

PySpark Dataframe Birden Çok Nitelikle Gruplama (groupby & agg)

Bir önceki örneğimizde mesleklere göre yaş ortalamalarını bulmuştuk. Şehir ortalamasında ise null değeri almıştık. Bunun sebebi de Sehir niteliğinin numerik olmayışı (dört işleme uygun değil) idi. Ancak kategorik nitelikler üzerinde de bazı aggregation fonksiyonlarını çalıştırabiliriz. Burada count örneğini vereceğim.

insanlarDF.groupby('Meslek').agg({'Yas':'mean','Sehir': 'count'}).show()
+-----------+------------+--------+
|     Meslek|count(Sehir)|avg(Yas)|
+-----------+------------+--------+
|      Issiz|           1|    30.0|
|      Memur|           1|    42.0|
|     Doktor|           1|    33.0|
|       Isci|           1|    35.0|
|       null|           0|    32.0|
|Pazarlamaci|           3|    27.0|
+-----------+------------+--------+

Meslek gruplarında kaç tane şehir var ayrı bir sütunda gördük.

Şimdilik burada kesiyorum. PySpark ile Spark Dataframe İşlemleri yazı dizimize devam edeceğiz. Veriyle kalınız…

Bölüm 3

Pyspark ile Spark Dataframe işlemleri yazı dizimize devam ediyoruz.

from pyspark.sql import Row
liste = [
 (1,'Cemal',35,'Isci','Ankara'),
 (2,'Ceyda',42,'Memur','Kayseri'),
 (3,'Timur',30,'Issiz','Istanbul'),
 (4,'Burcu',29,'Pazarlamaci','Ankara'),
 (5,'Yasemin',23,'Pazarlamaci','Bursa'),
 (6,'Ali',33,None,None),
 (7,'Burcu',29,'Pazarlamaci','Ankara'),
 (8,None,31,None,None),
 (9,'Ahmet',33,'Doktor','Ankara')
]
rdd = sc.parallelize(liste)
insanlar = rdd.map(lambda x: Row(record_id=int(x[0]),isim=x[1], yas=int(x[2]), meslek=x[3], ikamet=x[4]))
insanlarDF = sqlContext.createDataFrame(insanlar)

insanlarDF = insanlarDF.selectExpr("record_id as RecordId","isim as Adi","yas as Yas","meslek as Meslek","ikamet as Sehir")

PySpark Dataframe Örneklem Seçmek (Sampling)

Dataframe’den örneklem alabiliriz. Kullanılacak metod sample(). Bu metod üç parametre alır: 1. withReplacement (True ya da False), 2. fraction örneğin .3 demek %30’unu al demek, 3. seed değeri rastgele seçim değeri.

insanlarDF.sample(False, 0.2, 3).show()
+--------+-----+---+------+--------+
|RecordId|  Adi|Yas|Meslek|   Sehir|
+--------+-----+---+------+--------+
|       1|Cemal| 35|  Isci|  Ankara|
|       3|Timur| 30| Issiz|Istanbul|
|       9|Ahmet| 33|Doktor|  Ankara|
+--------+-----+---+------+--------+

PySpark Dataframe Map İşlemi

Yukarıda ve daha önce gördüğümüz birçok işlem sonuç olarak yine dataframe döndürürken map() metodu sonuç olarak rdd döndürmektedir.

yasRDD = insanlarDF.map(lambda x: Row(Yas= (x[2] + 10)))
yasRDD.take(9)
[Row(Yas=45),
 Row(Yas=52),
 Row(Yas=40),
 Row(Yas=39),
 Row(Yas=33),
 Row(Yas=43),
 Row(Yas=39),
 Row(Yas=41),
 Row(Yas=43)]

Yukarıda Row(Yas = x[2]…) komutunda 2 sütun sıralamasında Yas sütununun index değerini verir. RecordId 0, Adi 1 ve Yas 2. Özetle yukarıda yaptığımız iş şu: Yaş sütunundaki her bir değere 10 ekledik ve  yasRDD adında yeni bir RDD’ye atadık ve bu rdd’yi yazdırdık.

PySpark Dataframe Sıralama (Sorting, OrderBy)

Şimdi dataframe’i büyükten küçüğe doğru yaş sıralamasına koyalım:

insanlarDF.orderBy(insanlarDF.Yas.desc()).show()
+--------+-------+---+-----------+--------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|
+--------+-------+---+-----------+--------+
|       2|  Ceyda| 42|      Memur| Kayseri|
|       1|  Cemal| 35|       Isci|  Ankara|
|       6|    Ali| 33|       null|    null|
|       9|  Ahmet| 33|     Doktor|  Ankara|
|       8|   null| 31|       null|    null|
|       3|  Timur| 30|      Issiz|Istanbul|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|
+--------+-------+---+-----------+--------+

İsterseniz bir de isimlere göre sıralayalım:

insanlarDF.orderBy(insanlarDF.Adi).show()
+--------+-------+---+-----------+--------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|
+--------+-------+---+-----------+--------+
|       8|   null| 31|       null|    null|
|       9|  Ahmet| 33|     Doktor|  Ankara|
|       6|    Ali| 33|       null|    null|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|
|       1|  Cemal| 35|       Isci|  Ankara|
|       2|  Ceyda| 42|      Memur| Kayseri|
|       3|  Timur| 30|      Issiz|Istanbul|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|
+--------+-------+---+-----------+--------+

Yukarıda parantez içindeki insanlarDF.Adi sonuna küçükten büyüğe sıralama yapan asc() fonksiyonu eklesek de olurdu. Eklemedik, demek ki varsayılan sıralama küçükten büyüğe.

PySpark Dataframe Yeni Sütun Eklemek

Bu işlemi yapmak için withColumn() metodunu kullanıyoruz. Yeni bir sütun ekleyebileceğimiz gibi mevcut olanın yerine de koyabiliriz. Şimdi yaşımızı 2 ile çarpalım ve Yas_carpi_iki isminde yeni sütun olarak ekleyelim:

insanlarDF.withColumn('Yas_carpi_iki', insanlarDF.Yas * 2).show()
+--------+-------+---+-----------+--------+-------------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|Yas_carpi_iki|
+--------+-------+---+-----------+--------+-------------+
|       1|  Cemal| 35|       Isci|  Ankara|           70|
|       2|  Ceyda| 42|      Memur| Kayseri|           84|
|       3|  Timur| 30|      Issiz|Istanbul|           60|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|           58|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|           46|
|       6|    Ali| 33|       null|    null|           66|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|           58|
|       8|   null| 31|       null|    null|           62|
|       9|  Ahmet| 33|     Doktor|  Ankara|           66|
+--------+-------+---+-----------+--------+-------------+
0

4 Responses

    1. Merhaba sc, SparkContext’i temsil ediyor. Uygulama başlangıcında sc’nin oluşturulmuş olduğunu varsayıyoruz. Çok yaygın bir kullanım olduğu için detaylıca açıklamadık.

Bir yanıt yazın

Password Requirements:

  • At least 8 characters
  • At least 1 lowercase letter
  • At least 1 uppercase letter
  • At least 1 numerical number
  • At least 1 special character