Apache Flink FileStream Window Aggregation
Merhabalar. Bu yazımızda Apache Flink ile FileStream kaynağından veri okuyacağız ve okuduğumuz bu veri üzerinde zaman pencereli (window) aggregation yaparak ekrana yazdıracağız. Çalışmamda kullandığım ortam bilgileri şu şekildedir: Ubuntu, Intellij IDEA (maven projesi), Java8, Scala 2.11, Flink 1.9.2, github projesine buradan erişebilirsiniz.
Uygulamamızda iris veri setini (data-generator input klasörü içinde mevcuttur) data-generator ile bir dizine loglar halinde üreteceğiz. Flink datastream api ile bu dizini izleyerek okuduğumuz veri üzerinden 7 saniyelik pencereler ile çiçek türlerini sayacağız. Yani son yedi saniyede hangi çiçek türünden ne kadar üretiliyor onu göreceğiz. Şimdi başlayalım kodlamaya.
Kütüphaneleri indirelim
import java.sql.Timestamp import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows} import org.apache.flink.streaming.api.windowing.time.Time
Flink StreamExecutionEnvironment
Her Flink uygulaması bir environment oluşturmakla başlar burada StreamExecutionEnvironment oluşturuyoruz.
// Create stream execution env val env = StreamExecutionEnvironment.getExecutionEnvironment
Apache Flink ile Veri Okuma
Veri kaynağı olan lokal dizini izleyelim. Monitör etme veya okuma da diyebiliriz.
val irisDataStreams = env.readFileStream(StreamPath = "file:///home/erkan/data-generator/output", intervalMillis=1000L)
Apache Flink Datastream Transformations ve Aggregations
Şimdi veriye şema giydirelim. Ben data-generator ile bir kaç log dosyası ürettim ve şemaya aşağıdaki şekilde karar verdim. Scala’nın meşhur case class yapısını kullanacağıız.
case class Iris(SepalLengthCm: Double, SepalWidthCm: Double, PetalLengthCm: Double, PetalWidthCm: Double, Species: String, ts: String)
Yukarıda tanımladığımız şemayı veriye giydirelim. Aynı zamanda ayraçtan(,) alanları bölüyoruz.
val structuredIris = irisDataStreams.map(line => { val SepalLengthCm = line.split(",")(0).toDouble val SepalWidthCm = line.split(",")(1).toDouble val PetalLengthCm = line.split(",")(2).toDouble val PetalWidthCm = line.split(",")(3).toDouble val Species = line.split(",")(4) val ts = line.split(",")(5) Iris(SepalLengthCm, SepalWidthCm, PetalLengthCm, PetalWidthCm, Species, ts) })
Şimdi geldik asıl operasyona. Benim amacım son 7 saniyede üretilen çiçek türlerini saymaktı bu yüzden sadece ilgilendiğim alanı (Species) seçiyorum ve yanına 1 koyuyorum (structuredIris.map(x => (x.Species, 1)
). Böyle yaparak (String, Int)
yapısında bir tuple elde ettim. Bunun üzerine window kullanıyorum. Flink’te farklı bir çok window var. Buradaki TumblingWindow zaman bazlı olup hiç bindirme yapmadan aralıkları kesin olarak ayırır. sum(1)
ifadesi ise (String, Int) yapısındaki Int üzerinde yani 1’inci indeks üzerinde toplayarak git demek. Rakamlar hep 1 olduğu için bunlar toplandığında satırları saymış oluruz. Yani her satırda farklı bir çiçek türü olduğundan o türe bir ilave etmiş oluyoruz.
structuredIris.map(x => (x.Species, 1)) .keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.seconds(7))) .sum(1) .print()
Son olarak ekrana print ediyoruz.
Akışı başlatmak için env.execute()
diyoruz.
env.execute("Iris Windowed Count")
Kodları çalıştırma
Uygulamayı ide üzerinden çalıştıralım. Ardından da data-generator’ü çalıştıralım. Bununla ilgili bilgi github sayfasında olduğu için buraya dahil etmiyorum oradaki bilgilerle rahatlıkla çalıştırabileceğinizi düşünüyorum. Benim data-generator ile çalıştırdığım komut:
(datagen) erkan@ubuntu:~/data-generator$ python dataframe_to_log.py -shf True -b 0.1 -r 5
Belli bir süre çalıştıktan sonra ide çıktı penceresine aşağıdakine benzer bir sonuç yazdıracaktır. Her seferinde kaç farklı çiçek türü varsa o kadar satır yazdıracaktır. Rakamlar son 7 saniyede o çiçek türünden üretilen miktarı gösterir.
3> (Iris-virginica,22) 4> (Iris-setosa,14) 1> (Iris-versicolor,14) 4> (Iris-setosa,25) 1> (Iris-versicolor,24) 3> (Iris-virginica,21) 3> (Iris-virginica,24) 4> (Iris-setosa,22) 1> (Iris-versicolor,24) 3> (Iris-virginica,22) ... ... ...
Sonuç
Yukarıda gördüğümüz gibi her üç çiçek türünden son 7 saniyede ne kadar üretildiğini sayan bir Flink Streaming uygulaması yazmış olduk.
Başka bir yazıda görüşmek dileğiyle hoşça kalın.
Kapak görseli: Alexey Savchenko on Unsplash