DevOps study

Airflow 기초

판교데싸 2022. 1. 11. 10:45

Airflow 사용 이유

  • 데이터 엔지니어링에선 데이터 ETL(Extract, Transform, Load) 과정을 통해 데이터를 가공하며 적재함
    • 머신러닝 분야에서도 모델 학습용 데이터 전처리, Train, Prediction시 사용 가능
  • 위와 같은 경우 여러개의 Sequential한 로직(앞의 output이 뒤의 input이 되는)이 존재하는데 이런 로직들을 한번에 관리해야 함
  • 관리할 로직이 적다면 CRON + 서버에 직접 접속해 디버깅 하는 방식으로 사용할 수 있지만, 점점 관리할 태스크들이 많아지면 헷갈리는 경우가 생김
    • 이런 Workflow Management 도구는 airflow 외에도 하둡 에코시스템에 우지(oozie), luigi 같은 솔루션이 있음

 

 DAG 생성

  • DAG 생성하는 흐름
    • (1) default_args 정의
      • 누가 만들었는지, start_date는 언제부턴지 등)
    • (2) DAG 객체 생성
      • dag id, schedule interval 정의
    • (3) DAG 안에 Operator를 활용해 Task 생성
    • (4) Task들을 연결함( >>, << 활용)
  • Airflow는 $AIRFLOW_HOME(default는 ~/airflow)의 dags 폴더에 있는 dag file을 지속적으로 체크함
  • Operator를 사용해 Task를 정의함
    • Operator가 인스턴스화가 될 경우 Task라고 함
    • Python Operator, Bash Operator, BigQuery Operator, Dataflow Operator 등
    • Operator 관련 자료는 공식 문서 참고
    • Operator는 unique한 task_id를 가져야 하고, 오퍼레이터별 다른 파라미터를 가지고 있음
  • 아래 코드를 dags 폴더 아래에 test.py로 저장하고 웹서버에서 test DAG 옆에 있는 toggle 버튼을 ON으로 변경
    • templated_command에서 % 앞뒤의 # 제거해주세요!
    import os
    from datetime import datetime, timedelta
     
    from airflow import DAG
    from airflow.providers.papermill.operators.papermill import PapermillOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
    from slack_plugin import task_fail_slack_alert
    import pendulum
     
    from airflow.models.variable import Variable
     
    KST = pendulum.timezone("Asia/Seoul")
    START_DATE = datetime(2021, 1, 1, tzinfo=KST)
    SCHEDULE_INTERVAL = "0 9 * * *"
    DAGRUN_TIMEOUT = timedelta(minutes=60)
    DAG_ID = "jerry_schedule"
     
    limit = 10
     
    nbviewer_url = Variable.get("nbviewer_url")
     
    with DAG(
        dag_id=DAG_ID,
        schedule_interval=SCHEDULE_INTERVAL,
        start_date=START_DATE,
        dagrun_timeout=DAGRUN_TIMEOUT,
        on_failure_callback=task_fail_slack_alert,
        tags=["ao_product"],
        catchup=False,
    ) as dag_1:
        # [START howto_operator_papermill]
        run_this = PapermillOperator(
            task_id="bq_papermill_test",
            input_nb=os.path.join(
                os.path.dirname(os.path.realpath(__file__)),
                "bq_test.ipynb",
            ),
            output_nb="/opt/airflow/viewers/out-bq_test-{{ execution_date }}.ipynb",
            parameters={
                "limit": limit,
                "execution_date": "{{execution_date}}",
                "ds": "{{ds}}",
                "ds_nodash": "{{ds_nodash}}",
                "run_id": "{{run_id}}",
                "next_ds": "{{next_ds}}",
                "next_ds_nodash": "{{next_ds_nodash}}",
                "yesterday_ds": "{{yesterday_ds}}",
                "yesterday_ds_nodash": "{{yesterday_ds_nodash}}",
                "tomorrow_ds": "{{tomorrow_ds}}",
                "tomorrow_ds_nodash": "{{tomorrow_ds_nodash}}",
            },
        )
        # [END howto_operator_papermill]
        slack_noti = SlackWebhookOperator(
            task_id="slack_noti",
            http_conn_id="slack_connection",
            webhook_token="",
            message=nbviewer_url
            + "/out-bq_test-{{ execution_date }}.ipynb task is completed!",
            channel="#emart_dt_airflow",
        )
        end = DummyOperator(
            task_id="complete_job",
        )
     
        run_this >> slack_noti >> end
  • Scheduler : 모든 DAG와 Task에 대하여 모니터링 및 관리하고, 실행해야할 Task를 스케줄링 해줍니다.
  • Web server : Airflow의 웹 UI 서버 입니다.
  • DAG : Directed Acyclic Graph로 개발자가 Python으로 작성한 워크플로우 입니다. Task들의 dependency를 정의합니다.
  • Database : Airflow에 존재하는 DAG와 Task들의 메타데이터를 저장하는 데이터베이스입니다.
  • Worker : 실제 Task를 실행하는 주체입니다. Executor 종류에 따라 동작 방식이 다양합니다.
  • Executor: 자원 활용과 task를 잘 분배하는 방법을 처리하는 중개자 역할을 합니다.
반응형

'DevOps study' 카테고리의 다른 글

Class 개념과 상속  (0) 2021.07.12
Pyqt 기초 문법  (0) 2021.03.11
Spotify Web API request  (0) 2020.12.25
REST API (Application programming Interface) 정리  (0) 2020.12.25
반응형