Apache Spark 2.3.0 OneHotEncoderEstimator: Scala Örnek Uygulaması

Apache Spark 2X’e geçtikten sonra dördüncü sürümünü genel kullanıma sundu. Son sürüm 2.3.0. Bu sürümle birlikte gelen bir çok yenilik arasında MLlib kütüphanesindeki OneHotEncoderEstimator da yer alıyor. Bu sınıf, makine öğrenmesinde veri hazırlığı aşamasında kategorik niteliklerin vektör haline getirilmesine katkıda bulunuyor. Eskisinden farkı; sınıf Transformer’dan Estimator’a dönmüş. Anlaşılabilecek değişiklik; eski sürümde nitelikleri tek tek girdi olarak vermek zorunluluğu vardı. Şimdi ise array halinde topluca verebiliyoruz. Ben StringIndexer’a bağımlılıktan kurtulmuştur diye sevinmiştim ancak nitelikler nümerik değil ise bunu StringIndexer ile nümerik hale getirmek gerekiyor.
Bu yazımızda bu sınıfın (OneHotEncoderEstimator) kullanımı ile ilgili çok basit bir örnek yapacağız. Belki başka kullanım yöntemleri de vardır ancak çalışan ve bildiğin yöntem, daha iyi de olsa bilmediğin yöntemlerden iyidir. Daha iyi ve kolay yöntemleri yorum satırlarında sizlerden bekleriz 🙂
Örneğimiz kendi oluşturduğumuz iki sütunlu bir dataframe üzerinde olacak. Örneğimiz; dataframe oluşturmadan, analize girecek niteliklerin vektör haline getirilmesine kadar olan aşamayı kapsıyor.
Kullandığım araçlar: Apache Zeppelin, Scala, Spark 2.3.0 MLlib kütüphaneleri, Spark 2.3.0

1. Önce kütüphaneleri indirelim:

import org.apache.spark.ml.feature.{OneHotEncoderEstimator, VectorAssembler, StringIndexer}
import org.apache.spark.ml.{Pipeline}

2. Dataframe oluştur:

val df = spark.createDataFrame(Seq(
  ("Bir", "İki"),
  ("Bir", "Sıfır"),
  ("İki", "Bir"),
  ("Beş", "İki"),
  ("Dört", "Bir"),
  ("İki", "Üç"),
  ("İki", "Üç"),
  ("Sıfır", "Bir")
)).toDF("sutun_01", "sutun_02")

Yukarıda gördüğümüz gibi iki sütun var, ikisi de kategorik.
Veri setimizin başlangıç dataframe görünümü:

3. Şimdi bu kategorik nitelikleri rakamlara çevirelim. Niye çeviriyoruz? Çünkü OneHotEncoderEstimator girdi olarak nümerik değerler istiyor. Bu dönüştürme (transformation) işlemini StringIndexer sınıfı ile yapacağız. Peki, bu sınıf ne iş yapıyor? Kategorik sütundaki her bir kategorik sınıfın tekrarlanma sayısını hesaplıyor. En çok tekrarlanandan en az tekrarlanana doğru sıralayıp her bir tekil sınıfa 0’dan başlayan bir rakam atıyor.

def stringIndexerPipeline(inputCol: String): (Pipeline, String) = {
    val indexer = new StringIndexer().
      setInputCol(inputCol).
      setOutputCol(inputCol + "_indexed")
    val pipeline = new Pipeline().setStages(Array(indexer))
    (pipeline, inputCol + "_indexed")
  }

Yukarıdaki fonksiyon, girdi olarak sütun isimlerimizi alıyor. Çıktı olarak ise bir StringIndexer nesnesine sahip Pipeline nesnesi ve sütun ismine _indexed eklenmiş bir string döndürüyor. Bu çıktılar OneHotEncoderEstimator için kullanılacak.
Bu aşamadan sonra dataframe’in alacağı hal aşağıdaki resimde görülüyor. Ben burada dataframe’in en son aşamada alacağı haline ait ekran görüntüsünü kesip koyuyorum. Pipeline kullandığımız için tüm fit() metodları ardısıra çalışıyor. O yüzden şuan göremezsiniz.


Yukarıdaki fonksiyonu her bir sütun için çalıştıralım ve sonuçları değişkenlerde tutalım:

val (sutun_01Pipeline, sutun_01_indexed) = stringIndexerPipeline("sutun_01")
val (sutun_02Pipeline, sutun_02_indexed) = stringIndexerPipeline("sutun_02")

4. Şimdi yazımıza konu olan sınıftan bir nesne oluşturalım. Nesneyi oluştururken setInputCols ve setOutputCols metodları ile sınıfın niteliklerine sınıf yaratılırken değer atayalım (burası biraz Nesneye Yönelik Programlama bilgisi içeriyor).

val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("sutun_01_indexed", "sutun_02_indexed"))
  .setOutputCols(Array("sutun_01__indexedVec", "sutun_02__indexedVec"))

Girdinin Array[String] ve elemanlarının da StringIndexer tarafından üretilen sütun isimleri olduğuna dikkat edelim.
Bu aşamadan sonra dataframe:

5. Şimdi analize girecek sütunları vektör haline (tüm sütunları tek sütunda rakamlar haline getirme) getirmeye geldik. Burada da OneHotEncoderEstimator tarafından üretilen sütun isimlerini array içinde veriyoruz. Bunun için OneHotEncoderEstimator sınıfının bir metodunu kullanıyoruz: getOutputCols. Bu metod aslında sınıfın özelliği olan outputCols içinde saklanan sütun isimlerini getirip bize array içinde sunuyor. Çıktımız ise analize girecek olan featureVector

val vectorAssembler = new VectorAssembler().
     setInputCols(encoder.getOutputCols).
     setOutputCol("featureVector")

6. Yukarıda ürettiklerimizi bir tren katarı gibi pipeline içinde ardısıra dizelim:

val pipeline = new Pipeline().setStages(
      Array(sutun_01Pipeline, sutun_02Pipeline, encoder, vectorAssembler))

Tren katarı içinde estimator olanların fit() metodunu çalıştırtalım.

val model = pipeline.fit(df)

Dönen sonucu (PipelineModel) transform edelim ve dataframe’in son halini görelim:

val encoded = model.transform(df)
encoded.show()
+--------+--------+----------------+----------------+--------------------+--------------------+-------------------+
|sutun_01|sutun_02|sutun_01_indexed|sutun_02_indexed|sutun_01__indexedVec|sutun_02__indexedVec|      featureVector|
+--------+--------+----------------+----------------+--------------------+--------------------+-------------------+
|     Bir|     İki|             1.0|             1.0|       (4,[1],[1.0])|       (3,[1],[1.0])|(7,[1,5],[1.0,1.0])|
|     Bir|   Sıfır|             1.0|             3.0|       (4,[1],[1.0])|           (3,[],[])|      (7,[1],[1.0])|
|     İki|     Bir|             0.0|             0.0|       (4,[0],[1.0])|       (3,[0],[1.0])|(7,[0,4],[1.0,1.0])|
|     Beş|     İki|             4.0|             1.0|           (4,[],[])|       (3,[1],[1.0])|      (7,[5],[1.0])|
|    Dört|     Bir|             2.0|             0.0|       (4,[2],[1.0])|       (3,[0],[1.0])|(7,[2,4],[1.0,1.0])|
|     İki|      Üç|             0.0|             2.0|       (4,[0],[1.0])|       (3,[2],[1.0])|(7,[0,6],[1.0,1.0])|
|     İki|      Üç|             0.0|             2.0|       (4,[0],[1.0])|       (3,[2],[1.0])|(7,[0,6],[1.0,1.0])|
|   Sıfır|     Bir|             3.0|             0.0|       (4,[3],[1.0])|       (3,[0],[1.0])|(7,[3,4],[1.0,1.0])|
+--------+--------+----------------+----------------+--------------------+--------------------+-------------------+

Github’da maven projesi olarak hazırlanmış kodlar buradadır.

Apache Zeppelin için kodlarımızın hepsi toplu halde aşağıdadır:

import org.apache.spark.ml.feature.{OneHotEncoderEstimator, VectorAssembler, StringIndexer}
import org.apache.spark.ml.{Pipeline}

val df = spark.createDataFrame(Seq(
  ("Bir", "İki"),
  ("Bir", "Sıfır"),
  ("İki", "Bir"),
  ("Beş", "İki"),
  ("Dört", "Bir"),
  ("İki", "Üç"),
  ("İki", "Üç"),
  ("Sıfır", "Bir")
)).toDF("sutun_01", "sutun_02")


def stringIndexerPipeline(inputCol: String): (Pipeline, String) = {
    val indexer = new StringIndexer().
      setInputCol(inputCol).
      setOutputCol(inputCol + "_indexed")
    val pipeline = new Pipeline().setStages(Array(indexer))
    (pipeline, inputCol + "_indexed")
  }

val (sutun_01Pipeline, sutun_01_indexed) = stringIndexerPipeline("sutun_01")
val (sutun_02Pipeline, sutun_02_indexed) = stringIndexerPipeline("sutun_02")



val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("sutun_01_indexed", "sutun_02_indexed"))
  .setOutputCols(Array("sutun_01__indexedVec", "sutun_02__indexedVec"))
  
       
val vectorAssembler = new VectorAssembler().
     setInputCols(encoder.getOutputCols).
     setOutputCol("featureVector")

val pipeline = new Pipeline().setStages(
      Array(sutun_01Pipeline, sutun_02Pipeline, encoder, vectorAssembler))


val model = pipeline.fit(df)
    

val encoded = model.transform(df)
encoded.show()

Başka bir yazıda görüşmek dileğiyle, hoşçakalın…

Yazar Hakkında
Toplam 180 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (3 yorum)
Kadriye
Kadriye Yanıtla
- 14:33

Merhaba,
Bu kodu Windows için hangi IDE de yazdınız?

    Erkan ŞİRİN
    Erkan ŞİRİN Yanıtla
    - 19:34

    Merhaba Kadriye Hanım,
    Windows ortamında değil, CentOS7 üzerinde çalışan Hadoop Cluster üzerindeki Apache Zeppelin ve Spark2 interpreter ile yazdım. Ancak Windows üzerine kurulu bir Spark üzerinde spark-shell ile de aynı kodlar çalıştırılabilir.

Kadriye
Kadriye Yanıtla
- 09:59

Teşekkürler.

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara