IntelliJ IDEA ile Apache Spark Projesini Uzak YARN Cluster Üzerinde Çalıştırmak-2/2
Merhaba. Yazı serimizin ikincisine devam ediyoruz. Bir önceki ilk yazımızda konuya giriş yapmıştık. Hatırlayalım, amacımız Spark uygulamasını Windows bilgisayarımızda kurulu IntelliJ ile uzak hadoop cluster üzerinde geliştirmekti. IntelliJ ile devam ediyoruz.
4. IntelliJ Spark Uygulaması: Maven Projesi Oluşturmak
Şimdi uygulamamızı yazmaya başlayacağız. Bunun için IntelliJ’i başlatalım.
Intellij üzerinden yeni proje (Create New Project) diyoruz. Bir sonraki ekranda Maven seçiyoruz.
Bir sonraki pencerede GroupId ve ArtifactId belirliyoruz.
Projenin adını ve nereye kaydedileceğini seçiyoruz. Finish diyoruz. Eğer yeni bir klasör yaratılması gerekiyorsa yaratayım mı diye soracaktır. Ok deyip devam edelim.
sparkremotecluster‘a sağ tıklayıp Add Framework Support‘tan resimde görüldüğü gibi Scala dilini seçiyoruz. Daha sonra main ve test kısmında java olan yerleri (Shift+F6) ile scala yapıyoruz. En son hem main hem test altında com.veribilimiokulu paketini ekliyoruz (scala üzerine sağ tıkla -> New -> Package).
pom.xml
dosyasına aşağıdaki dependencies
ekleniyor.
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.4.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.4.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-yarn --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-yarn_2.11</artifactId> <version>2.4.0</version> </dependency> </dependencies>
Yukarıda Spark versiyonunu 2.4.0 yapmamız dikkatinizi çekmiş olmalı zira Cloudera 6.2.0 paketindeki Spark sürümü 2.4.0. Aksi halde java.io.InvalidClassException: org.apache.spark.sql.execution.datasources.FilePartition; local class incompatible: stream classdesc serialVersionUID
hatasını alabiliriz.
com.veribilimiokulu paketi altında bir object oluşturuyoruz.
5. SparkConf Ayarları
Spark uygulamamıza ait object oluştu. Şimdi basit bir uygulama yazabiliriz. Ancak SparkSession oluştururken tekrar bazı konfigürasyon değerlerini SparkConf ile girmemiz gerekir. Bu ayarların ne olduğu ve değerlerin nereden öğrenileceği yorum satırlarında belirtilmiştir.
val sparkConf = new SparkConf() .setMaster("yarn") .setAppName("SparkDeneme") // HDFS konfigürasyonundan "fs.defaultFS" ile aranır. Sunucu adresi HDFS -> Instances -> NameNode'dan öğrenilir. .set("spark.hadoop.fs.defaultFS","hdfs://cdh1.impektra.com:8020") // YARN konfigürasyonundan "resourcemanager.address" aranır. Sunucu adresi YARN -> Instances -> ResourceManager'dan öğrenilir. .set("spark.hadoop.yarn.resourcemanager.address","cdh2.impektra.com:8032") // YARN konfigürasyonundan "scheduler.address" aranır. Sunucu adresi YARN -> Instances -> ResourceManager'dan öğrenilir. .set("spark.hadoop.yarn.resourcemanager.scheduler.address","cdh2.impektra.com:8030") // HDFS -> Instances -> NameNode'dan öğrenilir. .set("spark.yarn.jars","hdfs://cdh1.impektra.com:8020/tmp/spark_jars/*.jar")
Şayet uygulama çalışırken dirver ile bağlantı hatası verir ise hadoop cluster’ın bizim makinemize dönerken kullandığı port numarası Windows güvenlik duvarından kapalı olabilir. Bunu açmalıyız. Bunu burada detaylı yazmıyorum çünkü port açma ile ilgili bir çok kaynak mevcut. Böyle bir hata alırsanız açarsınız.
6. Kodları Çalıştırma
Tüm kodlarımız ise aşağıdadır:
package com.veribilimiokulu import org.apache.spark.sql.{SparkSession} import org.apache.spark.sql.{functions => F} import org.apache.log4j.{Logger, Level} import org.apache.spark.sql.{functions => F} import org.apache.spark.SparkConf object SparkDeneme extends App { Logger.getLogger("org").setLevel(Level.INFO) // ilk etapta INFI yaparak alınacak hataların sebepleri öğrenilir. // Bütün hatalar ayıklandıktan sonra ERROR'a getirilir val sparkConf = new SparkConf() .setMaster("yarn") .setAppName("SparkDeneme") // HDFS konfigürasyonundan "fs.defaultFS" ile aranır. Sunucu adresi HDFS -> Instances -> NameNode'dan öğrenilir. .set("spark.hadoop.fs.defaultFS","hdfs://cdh1.impektra.com:8020") // YARN konfigürasyonundan "resourcemanager.address" aranır. Sunucu adresi YARN -> Instances -> ResourceManager'dan öğrenilir. .set("spark.hadoop.yarn.resourcemanager.address","cdh2.impektra.com:8032") // YARN konfigürasyonundan "scheduler.address" aranır. Sunucu adresi YARN -> Instances -> ResourceManager'dan öğrenilir. .set("spark.hadoop.yarn.resourcemanager.scheduler.address","cdh2.impektra.com:8030") // HDFS -> Instances -> NameNode'dan öğrenilir. .set("spark.yarn.jars","hdfs://cdh1.impektra.com:8020/tmp/spark_jars/*.jar") val spark = SparkSession.builder() .config(sparkConf) .getOrCreate() import spark.implicits._ val df = spark.read.format("csv") .option("header",true) .option("inferSchema", true) .option("sep",",") .load("/user/admin/data/iris.csv") df.show() }
Sonuç:
+-------------+------------+-------------+------------+-----------+ |SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm| Species| +-------------+------------+-------------+------------+-----------+ | 5.1| 3.5| 1.4| 0.2|Iris-setosa| | 4.9| 3.0| 1.4| 0.2|Iris-setosa| | 4.7| 3.2| 1.3| 0.2|Iris-setosa| | 4.6| 3.1| 1.5| 0.2|Iris-setosa| | 5.0| 3.6| 1.4| 0.2|Iris-setosa| | 5.4| 3.9| 1.7| 0.4|Iris-setosa| | 4.6| 3.4| 1.4| 0.3|Iris-setosa| | 5.0| 3.4| 1.5| 0.2|Iris-setosa| | 4.4| 2.9| 1.4| 0.2|Iris-setosa| | 4.9| 3.1| 1.5| 0.1|Iris-setosa| | 5.4| 3.7| 1.5| 0.2|Iris-setosa| | 4.8| 3.4| 1.6| 0.2|Iris-setosa| | 4.8| 3.0| 1.4| 0.1|Iris-setosa| | 4.3| 3.0| 1.1| 0.1|Iris-setosa| | 5.8| 4.0| 1.2| 0.2|Iris-setosa| | 5.7| 4.4| 1.5| 0.4|Iris-setosa| | 5.4| 3.9| 1.3| 0.4|Iris-setosa| | 5.1| 3.5| 1.4| 0.3|Iris-setosa| | 5.7| 3.8| 1.7| 0.3|Iris-setosa| | 5.1| 3.8| 1.5| 0.3|Iris-setosa| +-------------+------------+-------------+------------+-----------+ only showing top 20 rows
Elbette yukarıdaki sonuç bir çok INFO logunun arasındadır. Şayet hata ayıklama bitmiş ise log seviyesini ERROR’a çekebilirsiniz.
Kodlar çalışırken ResourceManager üzerinden takip edebilirsiniz. Örnek ekran görüntüsü aşağıdadır. (Sizin adresiniz farklı olacaktır)
http://cdh2.impektra.com:8088/cluster/apps/RUNNING
Evet böylelikle Windows makinemizden hiç rahatımızı bozmadan uzak hadoop cluster üzerinde kod geliştirme yapabildik ve windows makinemizi Spark driver olarak kullandık.
Büyük veriyle kalın…