Apache Flink Dataset Api ile Temel Veri Analizi

Herkese merhabalar. Bu yazımızda Apache Flink Dataset Api ile örnek veri setleri üzerinden veri analizi yapacağız. Yapacağımız örnek meşhur retail_db veri tabanındaki csv dosyalarını kullanarak tutar bakımından en çok iptal edilen ürünleri bulmaya çalışmak olacak. Çalışmamda kullandığım ortam bilgileri şu şekildedir: Ubuntu, Intellij IDEA (maven projesi), Java8, Scala 2.11, Flink 1.9.2, veri setlerine buradan github projesine buradan erişebilirsiniz.

Apache Flink büyük veri dünyasında akan veri işlemeye odaklı olarak başlayan ve halihazırda bu alanda en iyi proje olmayı başarmış bir projedir.  Benim görüşüme göre elbette, yoksa bunun böyle kolaylıkla söylenemeyecek bir iddia olduğunun farkındayım.

Uzun zamandır Spark kullanıyorum. Biraz da Flink’e bakayım  ve hadi şu Spark’ta yaptığım basit bir işi Flink ile de yapayım dedim. İş yukarıda bahsettiğim iş. Ancak farkettim ki Spark ile karşılaştırdığımda Flink ile ilgili neredeyse hiç bir kaynak, örnek, eğitim, kitap vs yok. Aldığım hataları gidermekte bile zorlandım, hatta bazıları için Stackoverflow’da soru sorarak yardım istedim. Sonra düşündüm ki öğrendiklerimi insalarla paylaşayım ve hiç olmazsa Flink ile bir dosya nasıl okunur ve analiz edilir bir örneği olsun.

Şunu belirtmeliyim ki dataset api datastream api’ye göre biraz üvey evlat. Çünkü Flink’in asıl iddiası akan veri tarafında. Bunu söylüyorum ki sizi yanlış yönlendirmiş olmayayım.

Kullandığım kütüphaneler

import org.apache.flink.api.scala._
import java.sql.Timestamp
import org.apache.flink.api.common.operators.Order

Spark’ta nasıl SparkSession varsa Flink’te de ExecutionEvironment var. Her işe başlarken bunu oluşturuyoruz.

val env = ExecutionEnvironment.getExecutionEnvironment

İş problemi ( tutar bakımından en çok iptal edilen ürünleri bulma ) bana;

  1. products, order_items ve orders verisetlerini kullanmam,
  2. Önce orders ile order_items’ı birleştirmemi, sonra bunlara products’ı eklemem,
  3. İptal edilen ürünleri filtreleyip ürün adına göre gruplarken tutarları toplamam,
  4. Sonucu da azalan şekilde sıralamam

gerektiğini söylüyor.

Case Class ile Şema Tanımlama

Önce Products ile başlıyorum. Tabi şema derdi burada da var. Bu derdi Scala kullandığımız için case class ile rahatlıkla hallediyoruz. Yani veriye bakıyorum (linux head olabilir) başlık var mı, neyle ayrılmış, veri türleri ne olabilir inceliyorum ve sonra aşağıdaki veri yapısını oluşturuyorum:

// Define Products schema
  case class Products(productId: Int,
                       productCategoryId: Int,
                       productName: String,
                       productDescription: String,
                       productPrice: Double,
                       productImage: String)

Flink ile  Veri Okuma

Şimdi products.csv’yi okuyalım ve bir göz atalım.

val productsDS = env.readCsvFile[Products]("/home/erkan/datasets/retail_db/products2.csv",
    fieldDelimiter=",", ignoreFirstLine = true, quoteCharacter = '"')

  println(productsDS.count())
  // 1345

  productsDS.first(5).print()
  /*
  Products(1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U,,59.98,http://images.acmesports.sports/Quest+Q64+10+FT.+x+10+FT.+Slant+Leg+Instant+Up+Canopy)
  Products(2,2,Under Armour Men's Highlight MC Football Clea,,129.99,http://images.acmesports.sports/Under+Armour+Men%27s+Highlight+MC+Football+Cleat)
  Products(3,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat)
  Products(4,2,Under Armour Men's Renegade D Mid Football Cl,,89.99,http://images.acmesports.sports/Under+Armour+Men%27s+Renegade+D+Mid+Football+Cleat)
  Products(5,2,Riddell Youth Revolution Speed Custom Footbal,,199.99,http://images.acmesports.sports/Riddell+Youth+Revolution+Speed+Custom+Football+Helmet)
   */

Orders için şema tanımlama ve veri okuyup bir göz atma

// Define Orders schema
  case class Orders(orderId: Int,
                    orderDate: Timestamp,
                    orderCustomerId: Int,
                    orderStatus: String)

  // Create dataset from file
  val ordersDS = env.readCsvFile[Orders]("/home/erkan/datasets/retail_db/orders.csv",
    fieldDelimiter=",", ignoreFirstLine = true)

  println(ordersDS.count())
  // 68883

  ordersDS.first(5).print()
  /**
   * Orders(51730,2014-06-14 00:00:00.0,2153,COMPLETE)
   * Orders(51731,2014-06-14 00:00:00.0,8895,PROCESSING)
   * Orders(51732,2014-06-14 00:00:00.0,10222,PENDING)
   * Orders(51733,2014-06-14 00:00:00.0,4078,CANCELED)
   * Orders(51734,2014-06-14 00:00:00.0,6522,PENDING_PAYMENT)
   */

Order Items için şema tanımlama ve veri okuyup bir göz atma

// Define Order Items Schema
  case class OrderItems(orderItemName: Int,
                        orderItemOrderId: Int,
                        orderItemProductId: Int,
                        orderItemQuantity: Int,
                        orderItemSubTotal: Double,
                        orderItemProductPrice: Double)
  val orderItemsDS = env.readCsvFile[OrderItems](filePath = "/home/erkan/datasets/retail_db/order_items.csv",
    fieldDelimiter = ",", ignoreFirstLine = true)

  println(orderItemsDS.count())
  // 172198

  orderItemsDS.first(5).print()
  /**
   * OrderItems(1,1,957,1,299.98,299.98)
   * OrderItems(2,2,1073,1,199.99,199.99)
   * OrderItems(3,2,502,5,250.0,50.0)
   * OrderItems(4,2,403,1,129.99,129.99)
   * OrderItems(5,4,897,2,49.98,24.99)
   */

Flink Join

Şimdi orders ile order_items’ı birleştirelim. Birleşme anahtaların kod içinde yorumu mevcuttur. where ve equalTo içindeki rakamlar ilgili join anahtarların dataset içindeki sütun indeksleridir.

// where refers left dataset (ordersDS) orderId, equalTo refers right dataset (orderItemsDS) orderItemOrderId
 val ordersJoinedDS = ordersDS.join(orderItemsDS).where(0).equalTo(1)

 ordersJoinedDS.first(5).print()
 /**
  * (Orders(35211,2014-02-27 00:00:00.0,4849,COMPLETE),OrderItems(87944,35211,365,4,239.96,59.99))
  * (Orders(35214,2014-02-27 00:00:00.0,388,PENDING_PAYMENT),OrderItems(87954,35214,403,1,129.99,129.99))
  * (Orders(35214,2014-02-27 00:00:00.0,388,PENDING_PAYMENT),OrderItems(87955,35214,1014,3,149.94,49.98))
  * (Orders(35214,2014-02-27 00:00:00.0,388,PENDING_PAYMENT),OrderItems(87956,35214,1004,1,399.98,399.98))
  * (Orders(35214,2014-02-27 00:00:00.0,388,PENDING_PAYMENT),OrderItems(87957,35214,365,4,239.96,59.99))
  */
 println(ordersJoinedDS.count())
 // 172198

Şimdi iki elemanlı bir tuple’dan oluşan bir Dataset elde ettim.  Ancak hala products’ı eklemedik onu da ekleyelim tam olsun.

// Join ordersJoinedDS and products
  val ordersWithProductsDS = ordersJoinedDS.join(productsDS).where(_._2.orderItemProductId).equalTo(_.productId)

  println(ordersWithProductsDS.count())
  // 172198

  ordersWithProductsDS.first(5).print()
  /*
  ((Orders(53282,2014-06-25 00:00:00.0,4285,PENDING_PAYMENT),OrderItems(133137,53282,37,2,69.98,34.99)),Products(37,3,adidas Kids' F5 Messi FG Soccer Cleat,,34.99,http://images.acmesports.sports/adidas+Kids%27+F5+Messi+FG+Soccer+Cleat))
  ((Orders(53364,2014-06-26 00:00:00.0,9001,COMPLETE),OrderItems(133333,53364,37,1,34.99,34.99)),Products(37,3,adidas Kids' F5 Messi FG Soccer Cleat,,34.99,http://images.acmesports.sports/adidas+Kids%27+F5+Messi+FG+Soccer+Cleat))
  ((Orders(53539,2014-06-27 00:00:00.0,1499,PROCESSING),OrderItems(133801,53539,37,5,174.95,34.99)),Products(37,3,adidas Kids' F5 Messi FG Soccer Cleat,,34.99,http://images.acmesports.sports/adidas+Kids%27+F5+Messi+FG+Soccer+Cleat))
  ((Orders(21,2013-07-25 00:00:00.0,2711,PENDING),OrderItems(66,21,37,2,69.98,34.99)),Products(37,3,adidas Kids' F5 Messi FG Soccer Cleat,,34.99,http://images.acmesports.sports/adidas+Kids%27+F5+Messi+FG+Soccer+Cleat))
  ((Orders(36176,2014-03-04 00:00:00.0,9331,PENDING),OrderItems(90342,36176,37,2,69.98,34.99)),Products(37,3,adidas Kids' F5 Messi FG Soccer Cleat,,34.99,http://images.acmesports.sports/adidas+Kids%27+F5+Messi+FG+Soccer+Cleat))
   */

Flink Group By Order By

Şimdi son dokunuşu yapıyoruz:

// Find most selling products in terms of total amount
  ordersWithProductsDS.map(x => (x._2.productName, x._1._1.orderStatus, x._1._2.orderItemSubTotal))
    .filter(_._2.contains("CANCELED"))
    .groupBy(0)
    .sum(2)
    .sortPartition(2, Order.DESCENDING).setParallelism(1)
    .first(10).print()
    /*
    (Field & Stream Sportsman 16 Gun Fire Safe,CANCELED,134393.27999999965)
    (Perfect Fitness Perfect Rip Deck,CANCELED,85785.70000000008)
    (Nike Men's Free 5.0+ Running Shoe,CANCELED,80691.9300000001)
    (Diamondback Women's Serene Classic Comfort Bi,CANCELED,80094.6600000001)
    (Pelican Sunstream 100 Kayak,CANCELED,66196.6899999998)
    (Nike Men's Dri-FIT Victory Golf Polo,CANCELED,65750.0)
    (Nike Men's CJ Elite 2 TD Football Cleat,CANCELED,60705.32999999974)
    (O'Brien Men's Neoprene Life Vest,CANCELED,58126.74000000006)
    (Under Armour Girls' Toddler Spine Surge Runni,CANCELED,26153.459999999992)
    (LIJA Women's Eyelet Sleeveless Golf Polo,CANCELED,2145.0)
     */

Evet bu kadar. Satış tutarı bakımından en çok iptal edilen ürünleri bulduk.

Kapak Görseli: Caleb Martin on Unsplash

Yazar Hakkında
Toplam 179 yazı
Erkan ŞİRİN
Erkan ŞİRİN
10 yılı aşkın süredir yurtiçi ve yurtdışında sektörde büyük veri mühendisliği, platform yönetimi ve makine öğrenmesi ile ilgili çalışmalar yürütmekte ve aynı zamanda birçok kurum ve şirkete danışmanlık ve eğitimler vermektedir. Çalışma alanları: Data ve MLOps platformları, gerçek zamanlı veri işleme, değişen veriyi yakalama (CDC) ve Lakehouse.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara