Apache Flink HDFS: Okuma ve Yazma
Merhabalar. Bu yazımızda Apache Flink Datastream API ile HDFS’ten veri okuma ve tekrar HDFS’e yazma ile ilgili basit bir örnek yapacağız. Bu yazıyı yazmaya beni motive eden sebep şu oldu: Spark ile HDFS’e yazmaya alışkınız artık çocuk oyuncağı gibi geliyor. Apache Flink HDFS ne kadar zor olabilir ki diye düşündüm ve kendim bir deneyeyim dedim. O da ne? Hiç de beklediğim kadar kolay değilmiş. Aslında bilmeyene kolay şeyler de zor olur ancak ben en azından bir sürü örnek bulur onlardan birini alır kendime uyarlar rahatlıkla bu işi kıvırırım diye düşünmüştüm. Ancak o bir sürü örnek yokmuş. Olan bir kaç zayıf örnek de eski versiyonlara aitmiş. Çıkan örneklerde kullanılan flink-connector-filesystem kütüphanesi versiyon 1.11.3’te durmuş. Ben şuan itibariyle son sürüm olan 1.13.0 kullanıyorum. Dolayısıyla örneklerden yürüme kolaycılığı işime yaramadı ve Flink dokümanlarını karıştırarak bir şeyler çıkarmaya çalıştım. Aslında işin doğrusu bu, yani birincil kaynaklarla ilerlemek ancak dokümanların bir sıkıntılı yönü her türlü seçeneği ve alternatifi barındırdıkları için gideceğiniz yolu seçmekte zorlanmanız, hatta çoğu zaman dokümanların içinde kaybolmanız. Neyse laf salatasını fazla uzatmadan geçelim örneğe.
Ortam Bilgileri
Flink: 1.13.0
Hadoop 3.1.2
Scala: 2.12
Java: 1.8.0
IDE: IntelliJ IDEA
Proje türü: Maven
OS: CentOS7
Kütüphaneler
import java.util.concurrent.TimeUnit import org.apache.flink.api.common.serialization.SimpleStringEncoder import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
Execution Environment Oluşturma
Spark’ta SparkSession olduğu gibi Flink’te de buna benzer bir ExecutionEnvironment var. Bunu oluşturalım.
// create an execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment
HDFS kaynağından okuma
HDFS kaynağından okumak için çalışan bir Hadoop’un olması gerekiyor. Benim namenode’um localhost:9000‘den erişilebilir durumda. Ayrıca okumak istediğim veri seti /user/train/datasets dizininde mevcut.
// read data from hdfs val stream = env.readTextFile("hdfs://localhost:9000/user/train/datasets/Advertising.csv")
Okuduklarımızı bir görelim. Bunun debugging’den başka bir amacı yok.
// print what you have read to console for debugging stream.print()
HDFS’e yazma
HDFS’e yazmadan önce biraz hazırlık yapmak gerekiyor. Aslında bir FileSink nesnesi oluşturuyoruz. Önce HDFS’te nereye yazacağız onu bir tanımlayalım. Bir çok yerde hedef dizini bir string içinde tanımlamaya alışkınız ancak burada Path sınıfından oluşturacağımız bir nesne içinde saklayacağız. Elbette bu benim tercihim değil 🙂
// define an hdfs directory for sink val outputPath = new Path("hdfs://localhost:9000/user/flink/advertising")
FileSink oluşturma
// create a FileSink val sink: StreamingFileSink[String] = StreamingFileSink .forRowFormat(outputPath, new SimpleStringEncoder[String]("UTF-8")) .withRollingPolicy( DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) .withMaxPartSize(1024 * 1024) .build()) .build()
Oluşturduğumuz file sink’i okuduğumuz veri ile buluşturalım
// Write to hdfs stream.addSink(sink)
Akışı başlatma
// start the streaming app env.execute("Read Write HDFS")
Şimdi artık IntelliJ üzerinden local modda Flink’i çalıştırabiliriz. Bundan önce şu ön hazırlıkları yapmış olmak gerekiyor:
- Ortam değişkenleri tanımlı olmalıdır: HADOOP_HOME ve HADOOP_CLASSPATH
- pom.xml içinde şu dependency eklenmelidir
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.1.2</version> </dependency>
Çalıştırmak için kodların içine sağ tıklayıp güzel yeşil run üçgeni seçiyoruz.
IntelliJ consolde aşağıdakine benzer bir çıktı görmeliyiz.
3> 103,280.2,10.1,21.4,14.8 2> 52,100.4,9.6,3.6,10.7 4> 152,121,8.4,48.7,11.6 1> ID,TV,Radio,Newspaper,Sales 4> 153,197.6,23.3,14.2,16.6 1> 1,230.1,37.8,69.2,22.1 3> 104,187.9,17.2,17.9,14.7 . . . . . .
Ayrıca sonucu hdfs komutları ile kontrol edelim.
[train@localhost ~]$ hdfs dfs -head /user/flink/advertising/2021-05-16--13/.part-3-0.inprogress.d05f6e10-b849-41b1-9307-7e623b53ec73 152,121,8.4,48.7,11.6 153,197.6,23.3,14.2,16.6 154,171.3,39.7,37.7,19 155,187.8,21.1,9.5,15.6 156,4.1,11.6,5.7,3.2 157,93.9,43.5,50.5,15.3 158,149.8,1.3,24.3,10.1 159,11.7,36.9,45.2,7.3 160,131.7,18.4,34.6,12.9 161,172.5,18.1,30.7,14.4 162,85.7,35.8,49.3,13.3 163,188.4,18.1,25.6,14.9
Okuduğumuzun aynısını yazmış görünüyoruz.
Belki başka daha iyi yöntemleri olabilir ancak şimdilik benim keşfedebildiğim bu. Ayrıca farklı formatlarda (parquet, avro, orc vb.) yazma ve sıkıştırma seçenekleri de muhakkak vardır. Yazıda bunlara değinemedik. StreamingFileSink içindeki seçeneklere ve bunların ne anlama geldiklerine de değinmedim. Bu yazıdaki temel kaygı iyi kötü farketmez basit bir Flink HDFS okuma yazma örneği paylaşmaktı. Zengin seçenekler artık bu temel yapının üzerine inşa edilebilir.
Başka bir yazıda görüşmek dileğiyle hoşça kalın.
Kapak Görseli: by Benoit Gauzere on Unsplash