PySpark Dataframe İşlemleri
Bölüm 1
Bu yazımızda Spark’ın Dataframe’inden bahsedeceğim. Spark Dataframe (nedense Spark Tablosu diyesim var 🙂 ) yapısını ilişkisel veri tabanlarındaki tablolara benzetebiliriz; satırlar, sütunlar ve şema. Spark Dataframe; Python, R, Pandas vb. dillerdeki dataframe benzese de en büyük farkı dağıtık işlemeye uygun olmasıdır. Spark Tablosunu; ilişkisel veri tabanı tabloları, Hive tabloları, Spark RDD ler gibi birçok farklı kaynaktan oluşturabiliriz.
Spark Tablosunun bazı avantajları şunlardır: Python, R, Scala, Java gibi API desteği bulunmaktadır. Bir sütun altında tek bir veri tipi bulunduğundan Spark bu tablodan şemayı daha rahat anlar ve işlemlerini daha iyi optimize eder. Çok büyük hacimlerdeki (petabayt) yapısal ve yarı-yapısal verileri işlemeye müsaittir.
RDD’den Spark Tablosu oluşturma:
from pyspark.sql import Row liste = [('Cemal',35,'İşçi','Ankara'),('Ceyda',42,'Memur','Kayseri'),('Timur',30,'İşsiz','İstanbul'),'Burcu',29,'Pazarlamacı','Ankara')] rdd = sc.parallelize(liste) insanlar = rdd.map(lambda x: Row(isim=x[0], yas=int(x[1]), meslek=x[2], ikamet=x[3])) insanlarDF = sqlContext.createDataFrame(insanlar)
Hive Tablosundan:
from pyspark.sql import Row veriSetimSparkDF = sqlContext.sql("select * from veritabanim.tablom")
Yukarıdaki sorgu Hive’de veritabanim isimli veritabanında tablom ismindeki tüm kayıtları alarak veriSetimSparkDF isimli dataframe değişkenine atar. type(veriSetimSparkDF) komutuyla pyspark.sql.dataframe.DataFrame sonucunu alırız.
Dataframe ile biraz oynayalım:
Şema yazdır:
insanlarDF.printSchema() root |-- ikamet: string (nullable = true) |-- isim: string (nullable = true) |-- meslek: string (nullable = true) |-- yas: long (nullable = true)
İlk satırı yazdıralım:
insanlarDF.first() Row(ikamet=u'Ankara', isim=u'Cemal', meslek=u'isci', yas=35)
İlk iki satırı gösterelim:
insanlarDF.show(2) +-------+-----+------+---+ | ikamet| isim|meslek|yas| +-------+-----+------+---+ | Ankara|Cemal| isci| 35| |Kayseri|Ceyda| Memur| 42| +-------+-----+------+---+ only showing top 2 rows
Satırları sayalım:
insanlarDF.count() 4
İstatistiksel özet alalım:
insanlarDF.describe().show() +-------+----------------+ |summary| yas| +-------+----------------+ | count| 4| | mean| 34.0| | stddev|5.94418483337567| | min| 29| | max| 42| +-------+----------------+
Yukarıda görüldüğü üzere sadece yaş değişkeni ile ilgili bilgi alabildik. Bunun sebebi sadece yaş değişkeninin nümerik olmasıdır. Çünkü aritmetik ortalama, standart sapma vb. işlemler sadece rakamlarla yapılır. Şimdi aynı işlemi kategorik özelliklerden birine yapalım bakalım sonuç ne olacak:
insanlarDF.describe('meslek').show() +-------+-------+ |summary| meslek| +-------+-------+ | count| 4| | mean| null| | stddev| null| | min| Memur| | max| İşsiz| +-------+-------+
Görüldüğü gibi ortalama ve standart sapma null oldu. min ve max ise muhtemelen rastgele kategorik bir değer aldı. Yani sonuçlar tamamen anlamsız.
Şimdi sütunları nasıl seçeriz ona bir bakalım:
insanlarDF.select('isim','yas','meslek').show() +-----+---+-----------+ | isim|yas| meslek| +-----+---+-----------+ |Cemal| 35| isci| |Ceyda| 42| Memur| |Timur| 30| İşsiz| |Burcu| 29|Pazarlamaci| +-----+---+-----------+
Şimdi kaç tane farklı şehir var onu saydıralım. Burada SQL bilgimiz bize hemen distinct ve count sözcüklerini hatırlatacaktır.
insanlarDF.select('ikamet').distinct().count() 3
Şimdi kesiyorum. Müteakip yazılarımda dataframe ile biraz daha oynayacağız. Veriyle kalın…
Güzel bir Türkçe kaynak olmuş, elinize sağlık
Çok teşekkürler Tuğba Hanım. Tekrar bekleriz 🙂
Öncelikle ellerinize sağlık, yalnız kodda eksik var, “rdd = sc.parallelize(liste)” kısmında sc nereden geliyor?
Merhaba sc, SparkContext’i temsil ediyor. Uygulama başlangıcında sc’nin oluşturulmuş olduğunu varsayıyoruz. Çok yaygın bir kullanım olduğu için detaylıca açıklamadık.