Airflow EmailOperator Kullanarak E-Posta Gönderme
Merhaba, bu yazıda Airflow EmailOperator’ü kullanarak nasıl e-posta gönderileceğinden bahsedeceğim. İlk olarak Docker ile Airflow’u kuracağız. Kurulumu, Airflow’un resmi web sitesinden indirilebilen docker-compose.yaml dosyasını kullanarak tamamlayabilirsiniz.
Dosya içerisindeki servisler:
airflow-scheduler
: scheduler tüm görevleri ve DAG’ları izler, ardından bağımlılıkları tamamlandığında görev örneklerini tetikler.airflow-webserver
: Web sunucusuhttp://localhost:8080
adresinde bulunur.airflow-worker
: Scheduler tarafından verilen görevleri yürüten işçi.airflow-init
: Başlatma servisi.postgres
: Veritabanı.redis
: Scheduler’dan işçiye iletilen iletileri ileten redis broker.
E-posta göndermek için Gmail SMTP sunucusunu kullanacağız. Bunun için öncelikle docker-compose dosyasında SMTP sunucusunun ayarlarını eklemek gerekiyor. Bu ayarlar doğru olmadığı takdirde e-postalar gönderilemeyecektir.
Gmail için SMTP ayarları şu şekildedir:
- SMTP sunucusu: smtp.gmail.com
- Port: 587 (TLS) veya 465 (SSL)
- Kullanıcı adı: Gmail hesabınızın tam adı veya e-posta adresi
- Şifre: Gmail hesabınızın şifresi
Eğer iki adımlı doğrulamanız açık ise uygulama şifresi oluşturmak için buraya gidebilirsiniz.
Turuncu kutu içerisinde yer alan şifreyi kopyalayın.
Ve ardından aşağıdaki değişkenleri kendi bilgilerinize göre doldurarak docker-compose dosyasında ‘x-airflow-common’ altında yer alan ‘environment’ kısmına ekleyelim.
AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com' AIRFLOW__SMTP__SMTP_MAIL_FROM: 'YOUR_EMAIL' AIRFLOW__SMTP__SMTP_USER: 'YOUR_EMAIL' AIRFLOW__SMTP__SMTP_PASSWORD: 'YOUR_PASSWORD' AIRFLOW__SMTP__SMTP_PORT: '587'
Yukarıdaki ayarları ekledikten sonra docker-compose dosyasının son hali şöyle olmalı:
# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # <http://www.apache.org/licenses/LICENSE-2.0> # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL. # # WARNING: This configuration is for local development. Do not use it in a production deployment. # # This configuration supports basic configuration using environment variables or an .env file # The following variables are supported: # # AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. # Default: apache/airflow:master-python3.8 # AIRFLOW_UID - User ID in Airflow containers # Default: 50000 # AIRFLOW_GID - Group ID in Airflow containers # Default: 50000 # _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account. # Default: airflow # _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account. # Default: airflow # # Feel free to modify this file to suit your needs. --- version: '3' x-airflow-common: &airflow-common image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.2} environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 AIRFLOW__CORE__FERNET_KEY: '' AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' AIRFLOW__CORE__LOAD_EXAMPLES: 'true' AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth' AIRFLOW__SMTP__SMTP_HOST: 'smtp.gmail.com' AIRFLOW__SMTP__SMTP_MAIL_FROM: 'YOUR_EMAIL' AIRFLOW__SMTP__SMTP_USER: 'YOUR_EMAIL' AIRFLOW__SMTP__SMTP_PASSWORD: 'YOUR_PASSWORD' AIRFLOW__SMTP__SMTP_PORT: '587' volumes: - ./dags:/opt/airflow/dags - ./logs:/opt/airflow/logs - ./plugins:/opt/airflow/plugins - ./datasets:/opt/airflow/datasets user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}" depends_on: redis: condition: service_healthy postgres: condition: service_healthy services: postgres: image: postgres:13 environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - postgres-db-volume:/var/lib/postgresql/data healthcheck: test: ["CMD", "pg_isready", "-U", "airflow"] interval: 5s retries: 5 restart: always redis: image: redis:latest ports: - "6379:6379" healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 5s timeout: 30s retries: 50 restart: always airflow-webserver: <<: *airflow-common command: webserver ports: - "8080:8080" healthcheck: test: ["CMD", "curl", "--fail", "<http://localhost:8080/health>"] interval: 10s timeout: 10s retries: 5 restart: always airflow-scheduler: <<: *airflow-common command: scheduler restart: always airflow-worker: <<: *airflow-common command: celery worker restart: always airflow-init: <<: *airflow-common command: version environment: <<: *airflow-common-env _AIRFLOW_DB_UPGRADE: 'true' _AIRFLOW_WWW_USER_CREATE: 'true' _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} flower: <<: *airflow-common command: celery flower ports: - "5555:5555" healthcheck: test: ["CMD", "curl", "--fail", "<http://localhost:5555/>"] interval: 10s timeout: 10s retries: 5 restart: always volumes: postgres-db-volume:
Evet, servisleri çalıştırmak için hazırız. Docker Compose dosyasının bulunduğu dizinde terminali açıp Airflow servislerini çalıştırabiliriz.
docker-compose up airflow-init docker-compose up -d docker-compose ps
Servisler doğru şekilde başlatıldıysa docker-compose ps komutunun çıktısı aşağıdaki gibi olmalı:
Docker Compose çalıştıktan sonra docker-compose dosyasının bulunduğu dizinde aşağıdaki klasörler oluşacaktır.
Daha sonra ‘dags’ dizininde send_email isminde bir Python dosyası oluşturup aşağıdaki kodu içerisine kopyalayalım.
NOT: ’email’ ve ‘to_email’ değişkenlerine kendi e-posta adresinizi atayınız.
from datetime import datetime from airflow import DAG from airflow.operators.bash import BashOperator from airflow.operators.email import EmailOperator email = 'your_email' to_email ='your_email' default_args = { "owner": "vbo", "start_date": datetime(2023, 2, 16), 'email': email, 'email_on_failure': True, } with DAG(dag_id="airflow_email_notification", schedule_interval="@once", default_args=default_args, ) as dag: sending_email = EmailOperator( task_id='sending_email', to=to_email, subject='Airflow Alert !!!', html_content="""<h1>Testing Email using Airflow</h1>""", ) sending_email
Airflow UI sayfasına gidin. Giriş yapmak için:
- Kullanıcı adı: airflow
- Şifre: airflow
bilgilerini kullanabilirsiniz. Ardından send_email isimli dag’ı aktif etmeniz gerekiyor.
DAG’ı aktifleştirdiğinizde tetiklenecek ve bir e-posta gönderilecektir.
Ve posta kutunuzu kontrol edebilirsiniz. Eğer bir sorun oluşmadıysa, aşağıdaki gibi bir e-posta gelmiş olmalıdır.
Evet, yazının sonuna geldik. Airflow ile e-posta göndermenin yanı sıra birçok farklı işlemi gerçekleştirmek mümkündür. Örneğin, Airflow kullanarak veri akışlarını otomatikleştirebilir, farklı sistemler arasında veri transferi yapabilir veya veri analizi yapabilirsiniz. Bunların hepsi, Airflow’un sahip olduğu güçlü özelliklerden sadece birkaçıdır. Bu yazıda, sadece temel bir özellik olan e-posta göndermekle ilgili bir örnek verdik. Tekrar görüşmek üzere.
Kaynaklar
Running Airflow in Docker — Airflow Documentation
Sending Emails using Airflow EmailOperator Simplified: 9 Easy Steps