IntelliJ IDEA ile Apache Spark Projesini Uzak YARN Cluster Üzerinde Çalıştırmak-2/2

Merhaba. Yazı serimizin ikincisine devam ediyoruz. Bir önceki ilk yazımızda konuya giriş yapmıştık. Hatırlayalım, amacımız Spark uygulamasını Windows bilgisayarımızda kurulu IntelliJ ile uzak hadoop cluster üzerinde geliştirmekti. IntelliJ ile devam ediyoruz.

4. IntelliJ Spark Uygulaması: Maven Projesi Oluşturmak

Şimdi uygulamamızı yazmaya başlayacağız. Bunun için IntelliJ’i başlatalım.

Intellij üzerinden yeni proje (Create New Project) diyoruz. Bir sonraki ekranda Maven seçiyoruz.

Bir sonraki pencerede GroupId ve ArtifactId belirliyoruz.

Projenin adını ve nereye kaydedileceğini seçiyoruz. Finish diyoruz. Eğer yeni bir klasör yaratılması gerekiyorsa yaratayım mı diye soracaktır. Ok deyip devam edelim.

sparkremotecluster‘a sağ tıklayıp Add Framework Support‘tan resimde görüldüğü gibi Scala dilini seçiyoruz. Daha sonra main ve test kısmında java olan yerleri (Shift+F6) ile scala yapıyoruz. En son hem main hem test altında com.veribilimiokulu paketini ekliyoruz (scala üzerine sağ tıkla -> New -> Package).

pom.xml dosyasına aşağıdaki dependencies ekleniyor.

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-yarn -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>

    </dependencies>

Yukarıda Spark versiyonunu 2.4.0 yapmamız dikkatinizi çekmiş olmalı zira Cloudera 6.2.0 paketindeki Spark sürümü 2.4.0. Aksi halde java.io.InvalidClassException: org.apache.spark.sql.execution.datasources.FilePartition; local class incompatible: stream classdesc serialVersionUID hatasını alabiliriz.

com.veribilimiokulu paketi altında bir object oluşturuyoruz.

5. SparkConf Ayarları

Spark uygulamamıza ait object oluştu. Şimdi basit bir uygulama yazabiliriz. Ancak SparkSession oluştururken tekrar bazı konfigürasyon değerlerini SparkConf ile girmemiz gerekir. Bu ayarların ne olduğu ve değerlerin nereden öğrenileceği yorum satırlarında belirtilmiştir.

val sparkConf = new SparkConf()
    .setMaster("yarn")
    .setAppName("SparkDeneme")

    // HDFS konfigürasyonundan "fs.defaultFS" ile aranır. Sunucu adresi HDFS -> Instances -> NameNode'dan öğrenilir.
    .set("spark.hadoop.fs.defaultFS","hdfs://cdh1.impektra.com:8020")

    // YARN konfigürasyonundan "resourcemanager.address" aranır. Sunucu adresi YARN -> Instances -> ResourceManager'dan öğrenilir.
    .set("spark.hadoop.yarn.resourcemanager.address","cdh2.impektra.com:8032")

    // YARN konfigürasyonundan "scheduler.address" aranır. Sunucu adresi YARN -> Instances -> ResourceManager'dan öğrenilir.
    .set("spark.hadoop.yarn.resourcemanager.scheduler.address","cdh2.impektra.com:8030")

    // HDFS -> Instances -> NameNode'dan öğrenilir.
    .set("spark.yarn.jars","hdfs://cdh1.impektra.com:8020/tmp/spark_jars/*.jar")

Şayet uygulama çalışırken dirver ile bağlantı hatası verir ise hadoop cluster’ın bizim makinemize dönerken kullandığı port numarası Windows güvenlik duvarından kapalı olabilir. Bunu açmalıyız. Bunu burada detaylı yazmıyorum çünkü port açma ile ilgili bir çok kaynak mevcut. Böyle bir hata alırsanız açarsınız.

6. Kodları Çalıştırma

Tüm kodlarımız ise aşağıdadır:

package com.veribilimiokulu

import org.apache.spark.sql.{SparkSession}
import org.apache.spark.sql.{functions => F}
import org.apache.log4j.{Logger, Level}
import org.apache.spark.sql.{functions => F}
import org.apache.spark.SparkConf

object SparkDeneme extends App {

  Logger.getLogger("org").setLevel(Level.INFO) // ilk etapta INFI yaparak alınacak hataların sebepleri öğrenilir.
  // Bütün hatalar ayıklandıktan sonra ERROR'a getirilir

  val sparkConf = new SparkConf()
    .setMaster("yarn")
    .setAppName("SparkDeneme")

    // HDFS konfigürasyonundan "fs.defaultFS" ile aranır. Sunucu adresi HDFS -> Instances -> NameNode'dan öğrenilir.
    .set("spark.hadoop.fs.defaultFS","hdfs://cdh1.impektra.com:8020")

    // YARN konfigürasyonundan "resourcemanager.address" aranır. Sunucu adresi YARN -> Instances -> ResourceManager'dan öğrenilir.
    .set("spark.hadoop.yarn.resourcemanager.address","cdh2.impektra.com:8032")

    // YARN konfigürasyonundan "scheduler.address" aranır. Sunucu adresi YARN -> Instances -> ResourceManager'dan öğrenilir.
    .set("spark.hadoop.yarn.resourcemanager.scheduler.address","cdh2.impektra.com:8030")

    // HDFS -> Instances -> NameNode'dan öğrenilir.
    .set("spark.yarn.jars","hdfs://cdh1.impektra.com:8020/tmp/spark_jars/*.jar")


  val spark = SparkSession.builder()
    .config(sparkConf)
    .getOrCreate()

  import spark.implicits._


  val df = spark.read.format("csv")
    .option("header",true)
    .option("inferSchema", true)
    .option("sep",",")
    .load("/user/admin/data/iris.csv")

  df.show()


}

Sonuç:

+-------------+------------+-------------+------------+-----------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+-------------+------------+-------------+------------+-----------+
|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
|          5.4|         3.9|          1.7|         0.4|Iris-setosa|
|          4.6|         3.4|          1.4|         0.3|Iris-setosa|
|          5.0|         3.4|          1.5|         0.2|Iris-setosa|
|          4.4|         2.9|          1.4|         0.2|Iris-setosa|
|          4.9|         3.1|          1.5|         0.1|Iris-setosa|
|          5.4|         3.7|          1.5|         0.2|Iris-setosa|
|          4.8|         3.4|          1.6|         0.2|Iris-setosa|
|          4.8|         3.0|          1.4|         0.1|Iris-setosa|
|          4.3|         3.0|          1.1|         0.1|Iris-setosa|
|          5.8|         4.0|          1.2|         0.2|Iris-setosa|
|          5.7|         4.4|          1.5|         0.4|Iris-setosa|
|          5.4|         3.9|          1.3|         0.4|Iris-setosa|
|          5.1|         3.5|          1.4|         0.3|Iris-setosa|
|          5.7|         3.8|          1.7|         0.3|Iris-setosa|
|          5.1|         3.8|          1.5|         0.3|Iris-setosa|
+-------------+------------+-------------+------------+-----------+
only showing top 20 rows

Elbette yukarıdaki sonuç bir çok INFO logunun arasındadır. Şayet hata ayıklama bitmiş ise log seviyesini ERROR’a çekebilirsiniz.

Kodlar çalışırken ResourceManager üzerinden takip edebilirsiniz. Örnek ekran görüntüsü aşağıdadır. (Sizin adresiniz farklı olacaktır)

http://cdh2.impektra.com:8088/cluster/apps/RUNNING

Evet böylelikle Windows makinemizden hiç rahatımızı bozmadan uzak hadoop cluster üzerinde kod geliştirme yapabildik ve windows makinemizi Spark driver olarak kullandık.

Büyük veriyle kalın…

Yazar Hakkında
Toplam 180 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara