Spark ile Cassandra’dan Veri Okumak

Merhabalar, Spark ve Cassandra büyük veri dünyasının önde gelen 2 popüler aracı. Apache Cassandra NoSQL veri tabanlarının önde gelen isimlerinden birisi. Adem-i merkeziyetçi yapısıyla (no master-slave) ve yüksek yazma performansıyla dikkatleri üzerine çekiyor. Apache Spark ise bildiğimiz gibi büyük veri işleme motoru/çatısı. Spark ile çok geniş yelpazede veri manipülasyonlarını gerçekleştirebiliyoruz. Spark’ın klasik veri işleme araçlarından en önemli farkı elbette dağıtık çalışması ve ölçeklenebilir olmasıdır. Yani veriniz belli bir büyüklüğe eriştikten sonra artık klasik yöntemler ihtiyacınızı karşılamayacaktır ve sizi Spark’a yönlendirecektir.

Bu yazımızda ise büyük veri dünyasının iki parlak ismini bir arada kullanacağız. Spark ile Cassandra NoSQL veri tabanına bağlanıp bir tabloyu okuyup ekrana yazdıracağız. Bunu yaparken hem RDD hem de Dataframe kullanacağız.

Bu yazıyı uygulamak istiyorsanız Cassandra ve Spark kurulu bir bilgisayara ihtiyacınız var. Benim kullandığım ortam bilgileri şu şekildedir.

  • İşletim sistemi: Ubuntu Desktop
  • Spark versiyonu: 2.3.1
  • Scala versiyonu: 2.11.8
  • Cassandra versiyonu: 3.9
  • Java versiyonu: 1.8
  • IDE: IntelliJ IDEA 2020.1 Community Edition

1. Cassandra Hazırlık

Bu bölümde Cassandra üzerinde spark isminde bir keyspace (database gibi düşünebilirsiniz) ve yesilcam adında da bir tablo oluşturacağız. Tablo oluşturduktan sonra içine bir kaç kayıt gireceğiz.

Keyspace oluşturma:

cqlsh> create keyspace spark with replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

Tablo oluşturma:

cqlsh> use spark;
cqlsh:spark> create table yesilcam(ID INT PRIMARY KEY , NAME STRING, SURNAME STRING, MOVIE_COUNT INT);

Tablo içine kayıt girme:

cqlsh:spark> INSERT INTO yesilcam(ID, NAME, SURNAME, MOVIE_COUNT) VALUES(1, 'Cüneyt','Arkın',323);
cqlsh:spark> INSERT INTO yesilcam(ID, NAME, SURNAME, MOVIE_COUNT) VALUES(2, 'Türkan','Şoray',204);
cqlsh:spark> INSERT INTO yesilcam(ID, NAME, SURNAME, MOVIE_COUNT) VALUES(3, 'Erol','Taş',314);
cqlsh:spark> INSERT INTO yesilcam(ID, NAME, SURNAME, MOVIE_COUNT) VALUES(4, 'Kadir','Savun',303);
cqlsh:spark> INSERT INTO yesilcam(ID, NAME, SURNAME, MOVIE_COUNT) VALUES(5, 'Hülya','Koçyiğit',177);

Girdiğimiz kayıtları kontrol edelim:

cqlsh:spark> select * from yesilcam ;

 id | movie_count | name   | surname
----+-------------+--------+----------
  5 |         177 |  Hülya | Koçyiğit
  1 |         323 | Cüneyt |    Arkın
  2 |         204 | Türkan |    Şoray
  4 |         303 |  Kadir |    Savun
  3 |         314 |   Erol |      Taş

(5 rows)

2. Spark Uygulaması

Intellij IDEA’da bir maven projesi başlattım. Bu projenin pom.xml dosyasının ilgili kısımları şu şekildedir.

    <properties>
        <spark.version>2.3.1</spark.version>
        <java.version>1.8</java.version>
        <junit.version>4.11</junit.version>
        <JAVA8.HOME>$JAVA_HOME</JAVA8.HOME>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11.8</scala.version>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
         <!-- https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector -->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
</project>

Yine Intellij arayüzünden yeni bir object oluşturdum ve adına ReadFromCassandra dedim.

İhtiyaç duyacağımız kütüphaneleri yazalım:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector._
import org.apache.spark.SparkConf

SparkConf, SparkSession ve SparkContext oluşturalım. Cassandra bağlantı bilgilerini SparkConf nesnesi oluştururken .set(“spark.cassandra.connection.host”, “localhost”) ile girdiğimize dikkat edelim. Ben kimlik doğrulamayı aktif hale getirmediğim için username ve password özelliklerini yorum satırı yaptım.

Logger.getLogger("org").setLevel(Level.ERROR)

  // create spark conf, sparkSession and sparkContext
  val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "localhost")
    .setMaster("local[2]")
    .setAppName("ReadFromCassandra")
  //.set("spark.cassandra.auth.username", "cassandra")
  //.set("spark.cassandra.auth.password", "cassandra")

  val spark = SparkSession.builder()
    .config(conf)
    .getOrCreate()

  import spark.implicits._

  val sc = spark.sparkContext

2.1. RDD Olarak Okuma

RDD olarak yesilcam tablosundan okuma yapalım. Burada da artık Cassandra konnektör kütüphanesini yüklediğimiz için sc içerisinden cassandraTable metodunu kullanabilir hale geliyoruz. İlk parametre keyspace (spark) ikincisi ise tablo ismi (yesilcam).

/************************* Read as RDD   *************************************/
  val yesilcamRdd = sc.cassandraTable("spark", "yesilcam")
  println(yesilcamRdd.take(5).mkString("\n"))

Çıktı:
CassandraRow{id: 1, movie_count: 323, name: Cüneyt, surname: Arkın}
CassandraRow{id: 2, movie_count: 204, name: Türkan, surname: Şoray}
CassandraRow{id: 4, movie_count: 303, name: Kadir, surname: Savun}
CassandraRow{id: 5, movie_count: 177, name: Hülya, surname: Koçyiğit}
CassandraRow{id: 3, movie_count: 314, name: Erol, surname: Taş}

2.2. Dataframe Olarak Okuma

Dataframe olarak Cassandra’dan veri okurken de aslında hemen hemen csv dosyası okur gibi okuyoruz. Bildiğimiz gibi format içine her zaman kaynağı yazıyoruz. Kaynağın detayları zaten spark nesnesi içinde var. Geriye option ile keyspace ve tablo ismini söylemek kalıyor.

/************************* Read as Dataframe   *************************************/
  val yesilcamDF = spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map( "table" -> "yesilcam", "keyspace" -> "spark"))
    .load()

  yesilcamDF.show()

Çıktısı:
+---+-----------+------+--------+
| id|movie_count|  name| surname|
+---+-----------+------+--------+
|  1|        323|Cüneyt|   Arkın|
|  2|        204|Türkan|   Şoray|
|  4|        303| Kadir|   Savun|
|  5|        177| Hülya|Koçyiğit|
|  3|        314|  Erol|     Taş|
+---+-----------+------+--------+

Peki şema oluşturabilmiş mi? Onu kontrol edelim:

yesilcamDF.printSchema()
root
 |-- id: integer (nullable = true)
 |-- movie_count: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- surname: string (nullable = true)

İşte bu kadar. Artık bundan sonra eriştiğiniz bu veriye Spark ile ne yapmak istiyorsanız orası size kalmış.

Bu yazımızda Spark ile Cassandra veri tabanına bağlanarak orada bulunan bir tablodan hem RDD hem DataFrame olarak veri okuduk ve ekrana yazdırdık.

Başka bir yazıda görüşmek dileğiyle hoşça kalın…

Yazar Hakkında
Toplam 179 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara