Airflow EmailOperator Kullanarak E-Posta Gönderme

Merhaba, bu yazıda Airflow EmailOperator’ü kullanarak nasıl e-posta gönderileceğinden bahsedeceğim. İlk olarak Docker ile Airflow’u kuracağız. Kurulumu, Airflow’un resmi web sitesinden indirilebilen docker-compose.yaml dosyasını kullanarak tamamlayabilirsiniz.

Dosya içerisindeki servisler:

  • airflow-scheduler: scheduler tüm görevleri ve DAG’ları izler, ardından bağımlılıkları tamamlandığında görev örneklerini tetikler.
  • airflow-webserver: Web sunucusu http://localhost:8080 adresinde bulunur.
  • airflow-worker: Scheduler tarafından verilen görevleri yürüten işçi.
  • airflow-init: Başlatma servisi.
  • postgres: Veritabanı.
  • redis: Scheduler’dan işçiye iletilen iletileri ileten redis broker.

E-posta göndermek için Gmail SMTP sunucusunu kullanacağız. Bunun için öncelikle docker-compose dosyasında SMTP sunucusunun ayarlarını eklemek gerekiyor. Bu ayarlar doğru olmadığı takdirde e-postalar gönderilemeyecektir.

Gmail için SMTP ayarları şu şekildedir:

  • SMTP sunucusu: smtp.gmail.com
  • Port: 587 (TLS) veya 465 (SSL)
  • Kullanıcı adı: Gmail hesabınızın tam adı veya e-posta adresi
  • Şifre: Gmail hesabınızın şifresi

Eğer iki adımlı doğrulamanız açık ise uygulama şifresi oluşturmak için buraya gidebilirsiniz.

Şekil-1: Uygulama Şifreleri Sayfası
Şekil-1: Uygulama Şifreleri Sayfası

Turuncu kutu içerisinde yer alan şifreyi kopyalayın.

Şekil-2: Uygulama Şifresi
Şekil-2: Uygulama Şifresi

Ve ardından aşağıdaki değişkenleri kendi bilgilerinize göre doldurarak docker-compose dosyasında ‘x-airflow-common’ altında yer alan ‘environment’ kısmına ekleyelim.

AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com' 
AIRFLOW__SMTP__SMTP_MAIL_FROM: 'YOUR_EMAIL' 
AIRFLOW__SMTP__SMTP_USER: 'YOUR_EMAIL' 
AIRFLOW__SMTP__SMTP_PASSWORD: 'YOUR_PASSWORD' 
AIRFLOW__SMTP__SMTP_PORT: '587'

Yukarıdaki ayarları ekledikten sonra docker-compose dosyasının son hali şöyle olmalı:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME         - Docker image name used to run Airflow.
#                              Default: apache/airflow:master-python3.8
# AIRFLOW_UID                - User ID in Airflow containers
#                              Default: 50000
# AIRFLOW_GID                - Group ID in Airflow containers
#                              Default: 50000
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account.
#                              Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account.
#                              Default: airflow
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.2}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com'
    AIRFLOW__SMTP__SMTP_MAIL_FROM: 'YOUR_EMAIL'
    AIRFLOW__SMTP__SMTP_USER: 'YOUR_EMAIL'
    AIRFLOW__SMTP__SMTP_PASSWORD: 'YOUR_PASSWORD'
    AIRFLOW__SMTP__SMTP_PORT: '587'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./datasets:/opt/airflow/datasets
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "<http://localhost:8080/health>"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: always

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    restart: always

  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "<http://localhost:5555/>"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

volumes:
  postgres-db-volume:

Evet, servisleri çalıştırmak için hazırız. Docker Compose dosyasının bulunduğu dizinde terminali açıp Airflow servislerini çalıştırabiliriz.

docker-compose up airflow-init
docker-compose up -d
docker-compose ps

Servisler doğru şekilde başlatıldıysa docker-compose ps komutunun çıktısı aşağıdaki gibi olmalı:

Şekil-3: docker-compose ps komutunun çıktısı
Şekil-3: docker-compose ps komutunun çıktısı

Docker Compose çalıştıktan sonra docker-compose dosyasının bulunduğu dizinde aşağıdaki klasörler oluşacaktır.

Şekil-3: Airflow’un oluşturduğu dizinler
Şekil-4: Airflow’un oluşturduğu dizinler

 

Daha sonra ‘dags’ dizininde send_email isminde bir Python dosyası oluşturup aşağıdaki kodu içerisine kopyalayalım.

NOT: ’email’ ve ‘to_email’ değişkenlerine kendi e-posta adresinizi atayınız.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator

email = 'your_email'
to_email ='your_email'

default_args = {
    "owner": "vbo",
    "start_date": datetime(2023, 2, 16),
    'email': email,
    'email_on_failure': True,
}

with DAG(dag_id="airflow_email_notification",
         schedule_interval="@once",
         default_args=default_args,
         ) as dag:
    sending_email = EmailOperator(
        task_id='sending_email',
        to=to_email,
        subject='Airflow Alert !!!',
        html_content="""<h1>Testing Email using Airflow</h1>""",
    )

    sending_email

Airflow UI sayfasına gidin. Giriş yapmak için:

  • Kullanıcı adı: airflow
  • Şifre: airflow

bilgilerini kullanabilirsiniz. Ardından send_email isimli dag’ı aktif etmeniz gerekiyor.

Şekil-5: Aktif DAGs
Şekil-5: Aktif DAGs

 

DAG’ı aktifleştirdiğinizde tetiklenecek ve bir e-posta gönderilecektir.

Şekil-6: Başarılı İşlem
Şekil-6: Başarılı İşlem

 

Ve posta kutunuzu kontrol edebilirsiniz. Eğer bir sorun oluşmadıysa, aşağıdaki gibi bir e-posta gelmiş olmalıdır.

Şekil-7: E-Posta
Şekil-7: E-Posta

 

Evet, yazının sonuna geldik. Airflow ile e-posta göndermenin yanı sıra birçok farklı işlemi gerçekleştirmek mümkündür. Örneğin, Airflow kullanarak veri akışlarını otomatikleştirebilir, farklı sistemler arasında veri transferi yapabilir veya veri analizi yapabilirsiniz. Bunların hepsi, Airflow’un sahip olduğu güçlü özelliklerden sadece birkaçıdır. Bu yazıda, sadece temel bir özellik olan e-posta göndermekle ilgili bir örnek verdik. Tekrar görüşmek üzere.

Kaynaklar

Running Airflow in Docker — Airflow Documentation

Sending Emails using Airflow EmailOperator Simplified: 9 Easy Steps

 

Yazar Hakkında
Toplam 9 yazı
Seda KAYADEMİR
Seda KAYADEMİR
Üniversite yıllarında tanışmış olduğu veri dünyasına ilgisi artan ve bu alanda kariyer hedeflemiş bir süre yazılım mühendisi olarak çalışmış ve hedefi doğrultusunda kendini geliştirerek kariyerine veri mühendisi olarak devam etmektedir. İlgilendiği Alanlar; Dataops, Mlops, Bulut Teknolojileri.
Yorumlar (Yorum yapılmamış)

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir

×

Bir Şeyler Ara