PySpark Dataframe İşlemleri
Bölüm 2
PySpark ile dataframe işlemleri ikinci bölüm ile devam ediyoruz.
from pyspark.sql import Row liste = [ (1,'Cemal',35,'Isci','Ankara'), (2,'Ceyda',42,'Memur','Kayseri'), (3,'Timur',30,'Issiz','Istanbul'), (4,'Burcu',29,'Pazarlamaci','Ankara'), (5,'Yasemin',23,'Pazarlamaci','Bursa'), (6,'Ali',33,None,None), (7,'Burcu',29,'Pazarlamaci','Ankara'), (8,None,31,None,None), (9,'Ahmet',33,'Doktor','Ankara') ] rdd = sc.parallelize(liste) insanlar = rdd.map(lambda x: Row(record_id=int(x[0]),isim=x[1], yas=int(x[2]), meslek=x[3], ikamet=x[4])) insanlarDF = sqlContext.createDataFrame(insanlar) insanlarDF = insanlarDF.selectExpr("record_id as RecordId","isim as Adi","yas as Yas","meslek as Meslek","ikamet as Sehir")
PySpark Dataframe Sütun İsimlerini Değiştirmek Yöntem-1
Sütun isimlerini değiştirmek için selectExpr() metodunu kullanabiliriz. Örneğimiz aşağıdadır.
insanlarDF = insanlarDF.selectExpr("record_id as RecordId","isim as Adi","yas as Yas","meslek as Meslek","ikamet as Sehir") insanlarDF.columns ['RecordId', 'Adi', 'Yas', 'Meslek', 'Sehir']
insanlarDF.show() +--------+-------+---+-----------+--------+ |RecordId| Adi|Yas| Meslek| Sehir| +--------+-------+---+-----------+--------+ | 1| Cemal| 35| Isci| Ankara| | 2| Ceyda| 42| Memur| Kayseri| | 3| Timur| 30| Issiz|Istanbul| | 4| Burcu| 29|Pazarlamaci| Ankara| | 5|Yasemin| 23|Pazarlamaci| Bursa| | 6| Ali| 33| null| null| | 7| Burcu| 29|Pazarlamaci| Ankara| | 8| null| 31| null| null| | 9| Ahmet| 33| Doktor| Ankara| +--------+-------+---+-----------+--------+
Gördüğümüz gibi yeni sütun isimlerini listeledik.
PySpark Dataframe Sütun İsimlerini Değiştirmek Yöntem-2
new_column_names = ['RecordId', 'Adi', 'Yas', 'Meslek', 'Sehir'] insanlarDF = insanlarDF .toDF(*new_column_names )
PySpark Dataframe Tekil Listeleme (Distinct)
Tekil meslekleri listeleyelim.
insanlarDF.select('Meslek').distinct().show() +-----------+ | Meslek| +-----------+ | Issiz| | Memur| | Doktor| | Isci| | null| |Pazarlamaci| +-----------+
PySpark Dataframe Tekil Sayma (Distinct Count)
Tekil sayma için yukarıdaki distinct() metoduna sadece count() eklememiz yeterli olacaktır.
insanlarDF.select('Meslek').distinct().count() 6
PySpark Dataframe Kategorik Nitelikleri Çapraz Tablo Halinde Gösterme
insanlarDF.crosstab('Meslek', 'Sehir').show() +------------+------+----+-----+--------+-------+ |Meslek_Sehir|Ankara|null|Bursa|Istanbul|Kayseri| +------------+------+----+-----+--------+-------+ | null| 0| 2| 0| 0| 0| | Memur| 0| 0| 0| 0| 1| | Isci| 1| 0| 0| 0| 0| | Pazarlamaci| 2| 0| 1| 0| 0| | Issiz| 0| 0| 0| 1| 0| | Doktor| 1| 0| 0| 0| 0| +------------+------+----+-----+--------+-------+
PySpark Dataframe Mükerrer Satırları Düşürme (dropDuplicates)
Şimdi tekrar eden satırları seçmemeyi nasıl yapacağız onu göreceğiz. Bunun için dropDuplicates() metodu işimizi görecek.
insanlarDF.select('Meslek', 'Sehir').dropDuplicates().show() +-----------+--------+ | Meslek| Sehir| +-----------+--------+ |Pazarlamaci| Ankara| | Doktor| Ankara| | null| null| | Isci| Ankara| |Pazarlamaci| Bursa| | Issiz|Istanbul| | Memur| Kayseri| +-----------+--------+
Veri setimizde iki adat Pazarlamacı Ankara vardı. Yukarıda gördüğümüz üzere mükerrer satır yapmadan sadece bir Pazarlamacı Ankara aldı. Eğer dropDuplicates() metodunu kullanmadan seçim yapsaydık sonuç şöyle olacaktı:
insanlarDF.select('Meslek', 'Sehir').show() +-----------+--------+ | Meslek| Sehir| +-----------+--------+ | Isci| Ankara| | Memur| Kayseri| | Issiz|Istanbul| |Pazarlamaci| Ankara| |Pazarlamaci| Bursa| | null| null| |Pazarlamaci| Ankara| | null| null| | Doktor| Ankara| +-----------+--------+
Gördüğümüz gibi Pazarlamacı Ankara olarak iki adet mükerrer kayıt bulunuyor.
PySpark Dataframe Null Değerleri Düşürme (dropna)
Veri setimizde bazı değerler null. Eğer bunları istemiyorsak null bulunan kayıtları düşürebiliriz. Burada iki seçenek mevcut. Ya bütün satır null ise satırı düşürmek veya satırda herhangi bir sütunda null var ise satırı düşürmek. Kullanacağımız dropna() metodu varsayılan olarak herhangi bir null değer var ise tüm satırı düşürmek üzere ayarlanmış.
insanlarDF.dropna().show() +--------+-------+---+-----------+--------+ |RecordId| Adi|Yas| Meslek| Sehir| +--------+-------+---+-----------+--------+ | 1| Cemal| 35| Isci| Ankara| | 2| Ceyda| 42| Memur| Kayseri| | 3| Timur| 30| Issiz|Istanbul| | 4| Burcu| 29|Pazarlamaci| Ankara| | 5|Yasemin| 23|Pazarlamaci| Bursa| | 7| Burcu| 29|Pazarlamaci| Ankara| | 9| Ahmet| 33| Doktor| Ankara| +--------+-------+---+-----------+--------+
Yukarıdaki tabloda gördüğümüz üzere bir tane bile null değere rastlamıyoruz. Null değer bulunan 6 ve 8 inci satırları düşürmüş durumda.
PySpark Dataframe Null Değerleri Doldurma (fillna)
Az önce null değerleri düşürmeyi görmüştük. Şimdi ise null değerlerin yerine belirleyeceğimiz herhangi bir değeri atamayı göreceğiz. Orijinal tablomuz:
insanlarDF.show() +--------+-------+---+-----------+--------+ |RecordId| Adi|Yas| Meslek| Sehir| +--------+-------+---+-----------+--------+ | 1| Cemal| 35| Isci| Ankara| | 2| Ceyda| 42| Memur| Kayseri| | 3| Timur| 30| Issiz|Istanbul| | 4| Burcu| 29|Pazarlamaci| Ankara| | 5|Yasemin| 23|Pazarlamaci| Bursa| | 6| Ali| 33| null| null| | 7| Burcu| 29|Pazarlamaci| Ankara| | 8| null| 31| null| null| | 9| Ahmet| 33| Doktor| Ankara| +--------+-------+---+-----------+--------+
Boşluklar Bosluk kelimesiyle doldurulmuş tablo:
insanlarDF.fillna('Bosluk').show() +--------+-------+---+-----------+--------+ |RecordId| Adi|Yas| Meslek| Sehir| +--------+-------+---+-----------+--------+ | 1| Cemal| 35| Isci| Ankara| | 2| Ceyda| 42| Memur| Kayseri| | 3| Timur| 30| Issiz|Istanbul| | 4| Burcu| 29|Pazarlamaci| Ankara| | 5|Yasemin| 23|Pazarlamaci| Bursa| | 6| Ali| 33| Bosluk| Bosluk| | 7| Burcu| 29|Pazarlamaci| Ankara| | 8| Bosluk| 31| Bosluk| Bosluk| | 9| Ahmet| 33| Doktor| Ankara| +--------+-------+---+-----------+--------+
PySpark Dataframe Filtreleme (filter)
Şimdi yaşı 30’dan büyük olanları filtreleyelim. Bunun için filter() metodunu kullanıyoruz.
insanlarDF.filter(insanlarDF.Yas > 30).show() +--------+-----+---+------+-------+ |RecordId| Adi|Yas|Meslek| Sehir| +--------+-----+---+------+-------+ | 1|Cemal| 35| Isci| Ankara| | 2|Ceyda| 42| Memur|Kayseri| | 6| Ali| 33| null| null| | 8| null| 31| null| null| | 9|Ahmet| 33|Doktor| Ankara| +--------+-----+---+------+-------+
PySpark Dataframe Gruplama (groupby & agg)
Şimdi Mesleklere göre yaş ortalamalarını gruplayalım. Bunun için groupby() ve agg() metodlarını kullanacağız.
insanlarDF.groupby('Meslek').agg({'Yas': 'mean'}).show() +-----------+--------+ | Meslek|avg(Yas)| +-----------+--------+ | Issiz| 30.0| | Memur| 42.0| | Doktor| 33.0| | Isci| 35.0| | null| 32.0| |Pazarlamaci| 27.0| +-----------+--------+
Burada aggregation yapacağımız niteliğin numerik olmasına dikkat etmemiz gerekiyor. Şayet şöyle birşey yapmak isteseydik ki saçma bir şey olurdu, Mesleklere göre Şehir ortalamalarını almak gibi:
insanlarDF.groupby('Meslek').agg({'Sehir': 'mean'}).show() +-----------+----------+ | Meslek|avg(Sehir)| +-----------+----------+ | Issiz| null| | Memur| null| | Doktor| null| | Isci| null| | null| null| |Pazarlamaci| null| +-----------+----------+
Allah’tan hata vermedi. Ancak kibarca istediğin mantıksız kardeş dedi 🙂
PySpark Dataframe Birden Çok Nitelikle Gruplama (groupby & agg)
Bir önceki örneğimizde mesleklere göre yaş ortalamalarını bulmuştuk. Şehir ortalamasında ise null değeri almıştık. Bunun sebebi de Sehir niteliğinin numerik olmayışı (dört işleme uygun değil) idi. Ancak kategorik nitelikler üzerinde de bazı aggregation fonksiyonlarını çalıştırabiliriz. Burada count örneğini vereceğim.
insanlarDF.groupby('Meslek').agg({'Yas':'mean','Sehir': 'count'}).show() +-----------+------------+--------+ | Meslek|count(Sehir)|avg(Yas)| +-----------+------------+--------+ | Issiz| 1| 30.0| | Memur| 1| 42.0| | Doktor| 1| 33.0| | Isci| 1| 35.0| | null| 0| 32.0| |Pazarlamaci| 3| 27.0| +-----------+------------+--------+
Meslek gruplarında kaç tane şehir var ayrı bir sütunda gördük.
Şimdilik burada kesiyorum. PySpark ile Spark Dataframe İşlemleri yazı dizimize devam edeceğiz. Veriyle kalınız…
Güzel bir Türkçe kaynak olmuş, elinize sağlık
Çok teşekkürler Tuğba Hanım. Tekrar bekleriz 🙂
Öncelikle ellerinize sağlık, yalnız kodda eksik var, “rdd = sc.parallelize(liste)” kısmında sc nereden geliyor?
Merhaba sc, SparkContext’i temsil ediyor. Uygulama başlangıcında sc’nin oluşturulmuş olduğunu varsayıyoruz. Çok yaygın bir kullanım olduğu için detaylıca açıklamadık.