Apache Spark, Minio, Nessie Catalog, Iceberg ve Docker ile Lakehouse Örneği
Veri ambarı (data warehouse) ile veri gölünün (data lake) en iyi yönlerini birleştirerek bizlere ilişkisel veri tabanı konforunu büyük veri üzerinde sunan lakehouse çözümleri gün geçtikçe hayatımızdaki yerini alıyor. Bugün burada tamamen açık kaynak kodlu bileşenleri kullanarak docker üzerinde basit bir lakehouse örneği yapacağız.
1. Altyapıyı Oluşturan Bileşenler
1.1. Nessie
Nessie veri gölleri için transactional katalog sunan Apache Hive Metastore işlevi gören açık kaynaklı bir projedir. Hive metastore ile arasındaki en büyük fark veri versiyonlaması (Git-inspired data version control) özelliğidir.
1.2. MinIO
MinIO Amazon S3 uyumlu popüler bir nesne tabanlı depolama (object storage) çözümüdür. Özellikle S3 kullanılamayan kapalı ortamlarda ve Kubernetes ile birlikte nesne tabanlı depolama ihtiyacını çözmektedir.
1.3. Apache Spark
Apache Spark açık kaynaklı dağıtık veri işleme motorudur. Veri mühendisliği, veri bilimi, veri analizi, graf analizi, makine öğrenmesi, gerçek zamanlı veri işleme gibi bir çok ihtiyaç için yaygın olarak kullanılmaktadır.
1.4. Apache Iceberg
Iceberg, büyük analitik tablolar için yüksek performanslı bir formattır. Iceberg, SQL tablolarının güvenilirliğini ve basitliğini büyük verilere taşırken Spark, Trino, Flink, Presto, Hive ve Impala gibi araçların aynı anda aynı tablolarla güvenli bir şekilde çalışmasını mümkün kılar.
2. Docker Compose ile Altyapıyı Ayağa Kaldırma
Aşağıdaki docker-compose.yaml dosyasını kullanarak örneğimiz için altyapıyı ayağa kaldıracağız.
version: "3" services: # Nessie Catalog Server nessie: image: projectnessie/nessie:0.67.0 container_name: nessie networks: vbo: ports: - 19120:19120 # Minio minio: image: "minio/minio:RELEASE.2023-05-04T21-44-30Z" container_name: minio environment: - MINIO_ROOT_USER=minioadmin - MINIO_ROOT_PASSWORD=minioadmin networks: vbo: ports: - 9001:9001 - 9000:9000 command: ["server", "/data", "--console-address", ":9001"] # Spark spark: container_name: spark image: veribilimiokulu/pyspark-3.4.1_python-3.8:1.0 ports: - "8888:8888" - "4041:4040" networks: - vbo volumes: - ./spark/examples:/opt/examples command: sleep infinity networks: vbo:
Docker compose konteynırlarımızı ayağa kaldıralım.
docker-compose up -d
3. MinIO Web Arayüzü
http://localhost:9001/login adresinden kullanıcı: minioadmin, şifre: minioadmin ile login olalım.
warehouse adında bir bucket oluşturalım.
4. Spark
Spark konteynıra bağlanalım.
docker exec -it spark bash
4.1. Jupyter Lab Çalıştırma
Jupyter lab kullanmak istiyorum. Bunun için jupyterlab paketini ve Jupyter’e Spark’ı göstermek için findspark paketlerini yükleyeceğim.
pip install jupyterlab findspark
Jupyter Lab’ı başlatalım
jupyter lab --ip 0.0.0.0 --port 8888 --allow-root
Info loglarındaki linki alıp tarayıcımıza yapıştırınca karşımızda Jupyter Lab’ı bulacağız.
4.2. Spark Uygulaması
İlk hücrede gerekli kütüphaneleri import edip Spark’ın yerini söyleyelim.
import findspark findspark.init("/opt/spark/") from pyspark.sql import SparkSession, functions as F
Önemli değişkenleri tanımlayalım. Gerekli deteylar yorum olarak mevcuttur.
# Spark'ın katalog olarak Nessie'ye erişeceği url url = "http://nessie:19120/api/v1" # Nessie tablolarının tutulacağı bucket full_path_to_warehouse = 's3a://warehouse' # Bessie için kullanacağımız branch ref = "main" # Nessie authentication türü. Diğer seçenekler (NONE, BEARER, OAUTH2 or AWS) auth_type = "NONE" # AWS S3 yerine MinIO kullandığımız için. Spark'a amazona gitme burada kal demek için. s3_endpoint = "http://minio:9000" # MinIO'ya erişim için. Bunlar root olarak docker-compose içinde belirtiliyor. Bu haliyle canlı ortamlarda kullanılmamalıdır. accessKeyId='minioadmin' secretAccessKey='minioadmin'
4.2.1. Spark Konfigürasyon
SarkSession oluştururken bazı konfigürasyonlar kullanacağız.
spark = ( SparkSession.builder .master("local[2]") .appName("Spark Nessie Iceberg Demo") .config("spark.driver.memory", "2g") # Nessie, Iceberg, Delta, MinIO (S3) paketlerini uygun versiyonları ile buraya eklemeliyiz. .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.0,io.delta:delta-core_2.12:2.4.0,org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.4_2.12:0.75.0') # MinIO (S3) erişimi için gerekli anahtarlar. .config("spark.hadoop.fs.s3a.access.key", accessKeyId) .config("spark.hadoop.fs.s3a.secret.key", secretAccessKey) .config("spark.hadoop.fs.s3a.path.style.access", True) .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") # Spark Amazon S3 varsayılan API'sine değil lokaldeki MinIO'ya gitsin. .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint) # Spark extensions arasından Iceberg ve Nessie .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions") # Spark Nessie'yi nerede bulacak onun adresi .config("spark.sql.catalog.nessie.uri", url) # Hangi branch ile çalışacak .config("spark.sql.catalog.nessie.ref", ref) # Nessie'ye her gelen birşey sormasın. Hangi auth yöntemi ile sorulacak. Burada yok. .config("spark.sql.catalog.nessie.authentication.type", auth_type) # Katalog nessie olsun. .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog") # Spark katalog implementasyonu iceberg.nessie olsun. Varsayılan kendi lokali .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog") # Varsayılan warehouse adresini minio s3 warehouse bucket gösteriyoruz. .config("spark.sql.catalog.nessie.warehouse", full_path_to_warehouse) .getOrCreate() )
4.2.2. Spark Dataframe
Github’dan bir veri seti okuyup spark dataframe oluşturalım.
from pyspark import SparkFiles sc = spark.sparkContext github_url="https://raw.githubusercontent.com/erkansirin78/datasets/master/Churn_Modelling.csv" sc.addFile(github_url) df = spark.read.csv(SparkFiles.get("Churn_Modelling.csv"),header= True, inferSchema=True) df.show(3)
4.2.3.Nessie Namespace ve Tablo Oluşturma
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.demo;") spark.sql("DROP TABLE IF EXISTS nessie.demo.churn;") spark.createDataFrame([], df.schema).writeTo("nessie.demo.churn").create()
4.2.4.Spark Dataframe’i Nessie Tabloya Yazma
Dataframe’i tabloya yazma
df.write.format("iceberg").mode("overwrite") \ .save("nessie.demo.churn")
4.2.4. Nessie Tablo’dan Spark ile Okuyarak Spark Dataframe Oluşturma
Yazdığımız tablodan okyup tekrar dataframe oluşturma.
df_from_iceberg = spark.table("nessie.demo.churn") df_from_iceberg.show()
Bu yazımızda açık kaynak araçlarla lakehouse üzerinde bir tablo oluşturma ve bu tabloya spark ile okuma yazma örneği yapmış olduk.
Data Engineering konusunda çok daha fazlasını öğrenmek ve bir kariyer olarak seçmek isterseniz Data Engineering Bootcamp programımızı tavsiye ederim.
Başka bir yazıda görüşene dek hoşçakalın.
Notebook ve compose dosyası burada.
Not: Bu yazının hazırlanmasında [1, 2] yazılarından çok faydalandım teşekkür ederim.
Kapak Foto: Photo by Alexander Hafemann on Unsplash
Kaynaklar
- https://www.dremio.com/a-notebook-for-getting-started-with-project-nessie-apache-iceberg-and-apache-spark/
- https://medium.com/@khurrammeraj17/creating-a-lakehouse-by-using-apache-spark-minio-nessie-catalog-and-dremio-67c23a335616