Airflow 사용 이유
- 데이터 엔지니어링에선 데이터 ETL(Extract, Transform, Load) 과정을 통해 데이터를 가공하며 적재함
- 머신러닝 분야에서도 모델 학습용 데이터 전처리, Train, Prediction시 사용 가능
- 위와 같은 경우 여러개의 Sequential한 로직(앞의 output이 뒤의 input이 되는)이 존재하는데 이런 로직들을 한번에 관리해야 함
- 관리할 로직이 적다면 CRON + 서버에 직접 접속해 디버깅 하는 방식으로 사용할 수 있지만, 점점 관리할 태스크들이 많아지면 헷갈리는 경우가 생김
- 이런 Workflow Management 도구는 airflow 외에도 하둡 에코시스템에 우지(oozie), luigi 같은 솔루션이 있음
- 이런 Workflow Management 도구는 airflow 외에도 하둡 에코시스템에 우지(oozie), luigi 같은 솔루션이 있음
DAG 생성
- DAG 생성하는 흐름
- (1) default_args 정의
- 누가 만들었는지, start_date는 언제부턴지 등)
- (2) DAG 객체 생성
- dag id, schedule interval 정의
- (3) DAG 안에 Operator를 활용해 Task 생성
- (4) Task들을 연결함( >>, << 활용)
- (1) default_args 정의
- Airflow는 $AIRFLOW_HOME(default는 ~/airflow)의 dags 폴더에 있는 dag file을 지속적으로 체크함
- Operator를 사용해 Task를 정의함
- Operator가 인스턴스화가 될 경우 Task라고 함
- Python Operator, Bash Operator, BigQuery Operator, Dataflow Operator 등
- Operator 관련 자료는 공식 문서 참고
- Operator는 unique한 task_id를 가져야 하고, 오퍼레이터별 다른 파라미터를 가지고 있음
- 아래 코드를 dags 폴더 아래에 test.py로 저장하고 웹서버에서 test DAG 옆에 있는 toggle 버튼을 ON으로 변경
- templated_command에서 % 앞뒤의 # 제거해주세요!
import os from datetime import datetime, timedelta from airflow import DAG from airflow.providers.papermill.operators.papermill import PapermillOperator from airflow.operators.dummy_operator import DummyOperator from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator from slack_plugin import task_fail_slack_alert import pendulum from airflow.models.variable import Variable KST = pendulum.timezone("Asia/Seoul") START_DATE = datetime(2021, 1, 1, tzinfo=KST) SCHEDULE_INTERVAL = "0 9 * * *" DAGRUN_TIMEOUT = timedelta(minutes=60) DAG_ID = "jerry_schedule" limit = 10 nbviewer_url = Variable.get("nbviewer_url") with DAG( dag_id=DAG_ID, schedule_interval=SCHEDULE_INTERVAL, start_date=START_DATE, dagrun_timeout=DAGRUN_TIMEOUT, on_failure_callback=task_fail_slack_alert, tags=["ao_product"], catchup=False, ) as dag_1: # [START howto_operator_papermill] run_this = PapermillOperator( task_id="bq_papermill_test", input_nb=os.path.join( os.path.dirname(os.path.realpath(__file__)), "bq_test.ipynb", ), output_nb="/opt/airflow/viewers/out-bq_test-{{ execution_date }}.ipynb", parameters={ "limit": limit, "execution_date": "{{execution_date}}", "ds": "{{ds}}", "ds_nodash": "{{ds_nodash}}", "run_id": "{{run_id}}", "next_ds": "{{next_ds}}", "next_ds_nodash": "{{next_ds_nodash}}", "yesterday_ds": "{{yesterday_ds}}", "yesterday_ds_nodash": "{{yesterday_ds_nodash}}", "tomorrow_ds": "{{tomorrow_ds}}", "tomorrow_ds_nodash": "{{tomorrow_ds_nodash}}", }, ) # [END howto_operator_papermill] slack_noti = SlackWebhookOperator( task_id="slack_noti", http_conn_id="slack_connection", webhook_token="", message=nbviewer_url + "/out-bq_test-{{ execution_date }}.ipynb task is completed!", channel="#emart_dt_airflow", ) end = DummyOperator( task_id="complete_job", ) run_this >> slack_noti >> end
- Scheduler : 모든 DAG와 Task에 대하여 모니터링 및 관리하고, 실행해야할 Task를 스케줄링 해줍니다.
- Web server : Airflow의 웹 UI 서버 입니다.
- DAG : Directed Acyclic Graph로 개발자가 Python으로 작성한 워크플로우 입니다. Task들의 dependency를 정의합니다.
- Database : Airflow에 존재하는 DAG와 Task들의 메타데이터를 저장하는 데이터베이스입니다.
- Worker : 실제 Task를 실행하는 주체입니다. Executor 종류에 따라 동작 방식이 다양합니다.
- Executor: 자원 활용과 task를 잘 분배하는 방법을 처리하는 중개자 역할을 합니다.
반응형
'DevOps study' 카테고리의 다른 글
Class 개념과 상속 (0) | 2021.07.12 |
---|---|
Pyqt 기초 문법 (0) | 2021.03.11 |
Spotify Web API request (0) | 2020.12.25 |
REST API (Application programming Interface) 정리 (0) | 2020.12.25 |