PySpark Dataframe İşlemleri
Bölüm 3
Pyspark ile Spark Dataframe işlemleri yazı dizimize devam ediyoruz.
from pyspark.sql import Row liste = [ (1,'Cemal',35,'Isci','Ankara'), (2,'Ceyda',42,'Memur','Kayseri'), (3,'Timur',30,'Issiz','Istanbul'), (4,'Burcu',29,'Pazarlamaci','Ankara'), (5,'Yasemin',23,'Pazarlamaci','Bursa'), (6,'Ali',33,None,None), (7,'Burcu',29,'Pazarlamaci','Ankara'), (8,None,31,None,None), (9,'Ahmet',33,'Doktor','Ankara') ] rdd = sc.parallelize(liste) insanlar = rdd.map(lambda x: Row(record_id=int(x[0]),isim=x[1], yas=int(x[2]), meslek=x[3], ikamet=x[4])) insanlarDF = sqlContext.createDataFrame(insanlar) insanlarDF = insanlarDF.selectExpr("record_id as RecordId","isim as Adi","yas as Yas","meslek as Meslek","ikamet as Sehir")
PySpark Dataframe Örneklem Seçmek (Sampling)
Dataframe’den örneklem alabiliriz. Kullanılacak metod sample(). Bu metod üç parametre alır: 1. withReplacement (True ya da False), 2. fraction örneğin .3 demek %30’unu al demek, 3. seed değeri rastgele seçim değeri.
insanlarDF.sample(False, 0.2, 3).show() +--------+-----+---+------+--------+ |RecordId| Adi|Yas|Meslek| Sehir| +--------+-----+---+------+--------+ | 1|Cemal| 35| Isci| Ankara| | 3|Timur| 30| Issiz|Istanbul| | 9|Ahmet| 33|Doktor| Ankara| +--------+-----+---+------+--------+
PySpark Dataframe Map İşlemi
Yukarıda ve daha önce gördüğümüz birçok işlem sonuç olarak yine dataframe döndürürken map() metodu sonuç olarak rdd döndürmektedir.
yasRDD = insanlarDF.map(lambda x: Row(Yas= (x[2] + 10))) yasRDD.take(9) [Row(Yas=45), Row(Yas=52), Row(Yas=40), Row(Yas=39), Row(Yas=33), Row(Yas=43), Row(Yas=39), Row(Yas=41), Row(Yas=43)]
Yukarıda Row(Yas = x[2]…) komutunda 2 sütun sıralamasında Yas sütununun index değerini verir. RecordId 0, Adi 1 ve Yas 2. Özetle yukarıda yaptığımız iş şu: Yaş sütunundaki her bir değere 10 ekledik ve yasRDD adında yeni bir RDD’ye atadık ve bu rdd’yi yazdırdık.
PySpark Dataframe Sıralama (Sorting, OrderBy)
Şimdi dataframe’i büyükten küçüğe doğru yaş sıralamasına koyalım:
insanlarDF.orderBy(insanlarDF.Yas.desc()).show() +--------+-------+---+-----------+--------+ |RecordId| Adi|Yas| Meslek| Sehir| +--------+-------+---+-----------+--------+ | 2| Ceyda| 42| Memur| Kayseri| | 1| Cemal| 35| Isci| Ankara| | 6| Ali| 33| null| null| | 9| Ahmet| 33| Doktor| Ankara| | 8| null| 31| null| null| | 3| Timur| 30| Issiz|Istanbul| | 4| Burcu| 29|Pazarlamaci| Ankara| | 7| Burcu| 29|Pazarlamaci| Ankara| | 5|Yasemin| 23|Pazarlamaci| Bursa| +--------+-------+---+-----------+--------+
İsterseniz bir de isimlere göre sıralayalım:
insanlarDF.orderBy(insanlarDF.Adi).show() +--------+-------+---+-----------+--------+ |RecordId| Adi|Yas| Meslek| Sehir| +--------+-------+---+-----------+--------+ | 8| null| 31| null| null| | 9| Ahmet| 33| Doktor| Ankara| | 6| Ali| 33| null| null| | 4| Burcu| 29|Pazarlamaci| Ankara| | 7| Burcu| 29|Pazarlamaci| Ankara| | 1| Cemal| 35| Isci| Ankara| | 2| Ceyda| 42| Memur| Kayseri| | 3| Timur| 30| Issiz|Istanbul| | 5|Yasemin| 23|Pazarlamaci| Bursa| +--------+-------+---+-----------+--------+
Yukarıda parantez içindeki insanlarDF.Adi sonuna küçükten büyüğe sıralama yapan asc() fonksiyonu eklesek de olurdu. Eklemedik, demek ki varsayılan sıralama küçükten büyüğe.
PySpark Dataframe Yeni Sütun Eklemek
Bu işlemi yapmak için withColumn() metodunu kullanıyoruz. Yeni bir sütun ekleyebileceğimiz gibi mevcut olanın yerine de koyabiliriz. Şimdi yaşımızı 2 ile çarpalım ve Yas_carpi_iki isminde yeni sütun olarak ekleyelim:
insanlarDF.withColumn('Yas_carpi_iki', insanlarDF.Yas * 2).show() +--------+-------+---+-----------+--------+-------------+ |RecordId| Adi|Yas| Meslek| Sehir|Yas_carpi_iki| +--------+-------+---+-----------+--------+-------------+ | 1| Cemal| 35| Isci| Ankara| 70| | 2| Ceyda| 42| Memur| Kayseri| 84| | 3| Timur| 30| Issiz|Istanbul| 60| | 4| Burcu| 29|Pazarlamaci| Ankara| 58| | 5|Yasemin| 23|Pazarlamaci| Bursa| 46| | 6| Ali| 33| null| null| 66| | 7| Burcu| 29|Pazarlamaci| Ankara| 58| | 8| null| 31| null| null| 62| | 9| Ahmet| 33| Doktor| Ankara| 66| +--------+-------+---+-----------+--------+-------------+
Güzel bir Türkçe kaynak olmuş, elinize sağlık
Çok teşekkürler Tuğba Hanım. Tekrar bekleriz 🙂
Öncelikle ellerinize sağlık, yalnız kodda eksik var, “rdd = sc.parallelize(liste)” kısmında sc nereden geliyor?
Merhaba sc, SparkContext’i temsil ediyor. Uygulama başlangıcında sc’nin oluşturulmuş olduğunu varsayıyoruz. Çok yaygın bir kullanım olduğu için detaylıca açıklamadık.