PySpark Dataframe İşlemleri

Bölüm 3

Pyspark ile Spark Dataframe işlemleri yazı dizimize devam ediyoruz.

from pyspark.sql import Row
liste = [
 (1,'Cemal',35,'Isci','Ankara'),
 (2,'Ceyda',42,'Memur','Kayseri'),
 (3,'Timur',30,'Issiz','Istanbul'),
 (4,'Burcu',29,'Pazarlamaci','Ankara'),
 (5,'Yasemin',23,'Pazarlamaci','Bursa'),
 (6,'Ali',33,None,None),
 (7,'Burcu',29,'Pazarlamaci','Ankara'),
 (8,None,31,None,None),
 (9,'Ahmet',33,'Doktor','Ankara')
]
rdd = sc.parallelize(liste)
insanlar = rdd.map(lambda x: Row(record_id=int(x[0]),isim=x[1], yas=int(x[2]), meslek=x[3], ikamet=x[4]))
insanlarDF = sqlContext.createDataFrame(insanlar)

insanlarDF = insanlarDF.selectExpr("record_id as RecordId","isim as Adi","yas as Yas","meslek as Meslek","ikamet as Sehir")

PySpark Dataframe Örneklem Seçmek (Sampling)

Dataframe’den örneklem alabiliriz. Kullanılacak metod sample(). Bu metod üç parametre alır: 1. withReplacement (True ya da False), 2. fraction örneğin .3 demek %30’unu al demek, 3. seed değeri rastgele seçim değeri.

insanlarDF.sample(False, 0.2, 3).show()
+--------+-----+---+------+--------+
|RecordId|  Adi|Yas|Meslek|   Sehir|
+--------+-----+---+------+--------+
|       1|Cemal| 35|  Isci|  Ankara|
|       3|Timur| 30| Issiz|Istanbul|
|       9|Ahmet| 33|Doktor|  Ankara|
+--------+-----+---+------+--------+

PySpark Dataframe Map İşlemi

Yukarıda ve daha önce gördüğümüz birçok işlem sonuç olarak yine dataframe döndürürken map() metodu sonuç olarak rdd döndürmektedir.

yasRDD = insanlarDF.map(lambda x: Row(Yas= (x[2] + 10)))
yasRDD.take(9)
[Row(Yas=45),
 Row(Yas=52),
 Row(Yas=40),
 Row(Yas=39),
 Row(Yas=33),
 Row(Yas=43),
 Row(Yas=39),
 Row(Yas=41),
 Row(Yas=43)]

Yukarıda Row(Yas = x[2]…) komutunda 2 sütun sıralamasında Yas sütununun index değerini verir. RecordId 0, Adi 1 ve Yas 2. Özetle yukarıda yaptığımız iş şu: Yaş sütunundaki her bir değere 10 ekledik ve  yasRDD adında yeni bir RDD’ye atadık ve bu rdd’yi yazdırdık.

PySpark Dataframe Sıralama (Sorting, OrderBy)

Şimdi dataframe’i büyükten küçüğe doğru yaş sıralamasına koyalım:

insanlarDF.orderBy(insanlarDF.Yas.desc()).show()
+--------+-------+---+-----------+--------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|
+--------+-------+---+-----------+--------+
|       2|  Ceyda| 42|      Memur| Kayseri|
|       1|  Cemal| 35|       Isci|  Ankara|
|       6|    Ali| 33|       null|    null|
|       9|  Ahmet| 33|     Doktor|  Ankara|
|       8|   null| 31|       null|    null|
|       3|  Timur| 30|      Issiz|Istanbul|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|
+--------+-------+---+-----------+--------+

İsterseniz bir de isimlere göre sıralayalım:

insanlarDF.orderBy(insanlarDF.Adi).show()
+--------+-------+---+-----------+--------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|
+--------+-------+---+-----------+--------+
|       8|   null| 31|       null|    null|
|       9|  Ahmet| 33|     Doktor|  Ankara|
|       6|    Ali| 33|       null|    null|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|
|       1|  Cemal| 35|       Isci|  Ankara|
|       2|  Ceyda| 42|      Memur| Kayseri|
|       3|  Timur| 30|      Issiz|Istanbul|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|
+--------+-------+---+-----------+--------+

Yukarıda parantez içindeki insanlarDF.Adi sonuna küçükten büyüğe sıralama yapan asc() fonksiyonu eklesek de olurdu. Eklemedik, demek ki varsayılan sıralama küçükten büyüğe.

PySpark Dataframe Yeni Sütun Eklemek

Bu işlemi yapmak için withColumn() metodunu kullanıyoruz. Yeni bir sütun ekleyebileceğimiz gibi mevcut olanın yerine de koyabiliriz. Şimdi yaşımızı 2 ile çarpalım ve Yas_carpi_iki isminde yeni sütun olarak ekleyelim:

insanlarDF.withColumn('Yas_carpi_iki', insanlarDF.Yas * 2).show()
+--------+-------+---+-----------+--------+-------------+
|RecordId|    Adi|Yas|     Meslek|   Sehir|Yas_carpi_iki|
+--------+-------+---+-----------+--------+-------------+
|       1|  Cemal| 35|       Isci|  Ankara|           70|
|       2|  Ceyda| 42|      Memur| Kayseri|           84|
|       3|  Timur| 30|      Issiz|Istanbul|           60|
|       4|  Burcu| 29|Pazarlamaci|  Ankara|           58|
|       5|Yasemin| 23|Pazarlamaci|   Bursa|           46|
|       6|    Ali| 33|       null|    null|           66|
|       7|  Burcu| 29|Pazarlamaci|  Ankara|           58|
|       8|   null| 31|       null|    null|           62|
|       9|  Ahmet| 33|     Doktor|  Ankara|           66|
+--------+-------+---+-----------+--------+-------------+

Yazar Hakkında
Toplam 181 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (4 yorum)
Tugba
Tugba Yanıtla
- 14:03

Güzel bir Türkçe kaynak olmuş, elinize sağlık

    Erkan ŞİRİN
    Erkan ŞİRİN Yanıtla
    - 19:16

    Çok teşekkürler Tuğba Hanım. Tekrar bekleriz 🙂

Sezer Bozkir
Sezer Bozkir Yanıtla
- 10:47

Öncelikle ellerinize sağlık, yalnız kodda eksik var, “rdd = sc.parallelize(liste)” kısmında sc nereden geliyor?

    Erkan ŞİRİN
    Erkan ŞİRİN Yanıtla
    - 22:35

    Merhaba sc, SparkContext’i temsil ediyor. Uygulama başlangıcında sc’nin oluşturulmuş olduğunu varsayıyoruz. Çok yaygın bir kullanım olduğu için detaylıca açıklamadık.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara