airflow hdfs provider 설치하기
airflow에서 hdfs를 깔끔하게 접근하는 방법이 없을까 고민하다가 airflow provider란 걸 발견했다.
airflow provider는 aws나 azure, google cloud 등을 airflow에서 좀 더 쉽게 연결해 줄 수 있도록 제공해주는 plugin이라고 생각하면 쉬울 것 같다.
일단 내가 설치한 airflow 환경에서 hdfs provider가 있는지 여부부터 살펴 보자.
airflow UI에서 admin < providers를 클릭한 후 설치된 provider 목록을 확인해 보자.
클릭하면 현재 airflow에서 제공되는 provider의 목록들이 나타난다. 불행하게도 hdfs 관련된 provider는 보이지 않는다.
그럼 이제 hdfs provider를 직접 설치해 보도록 하자.
설치를 하기 위해서는 airflow Docker 이미지의 customized version이 필요하다. 아래와 같이 Dockerfile을 생성 후 Dockerfile build를 통해 이미지를 만들어 주자.
FROM apache/airflow:2.9.3-python3.8
USER root
RUN apt-get update && apt-get -y install heimdal-dev gcc
RUN pip install apache-airflow-providers-apache-hdfs
USER airflow
$> docker build --tag airflow:hdfs-proviser .
이렇게 만들어진 이미지를 k8s에 배포하고 이제 다시 접속!!
Admin < connections를 들어간 후 + 버튼을 통해 connection을 추가해주는 페이지로 이동하자.
아래와 같이 apache HDFS provider가 생성된 걸 확인할 수 있다!!
ConnectionType에 WebHDFS가 생성되었으니 이제 connection을 만들어 보자. 만드는 건 생각보다 간단했다.
Connection id에는 해당 connection의 unique id를 입력해주면 된다. ( ex) hdfs_connection)
Connection Type은 Apache WebHDFS type을 선택해 주고, Host에는 name node host를 입력해주자.
마지막으로 port에는 webhdfs port 번호를 입력해 주면 된다. hdfs port (8020)가 아닌 webhdfs port(50070)을 입력해 주어야 정상 동작한다.
이렇게 기입한 후 SAVE 버튼을 누르면 connection이 생성된 걸 확인할 수 있다. 이렇게 생성된 connection은 dag에서 사용할 수가 있는데, 아래와 같은 예제로 만들어진 webhdfs sensor dag을 사용할 수가 있다.
import logging
from datetime import timedelta
from airflow import DAG
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.operators.bash import BashOperator
from datetime import datetime
log = logging.getLogger(__name__)
dag = DAG(
dag_id='hdfs_test',
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': False,
'email': ['test@daum.net'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=5),
},
description='tutorial test',
schedule_interval=None,
start_date=datetime(2024, 8, 21),
catchup=False,
tags=['test'],
)
t1 = WebHdfsSensor(
task_id="hdfs_test",
filepath="/output/test/LICENSE.txt",
webhdfs_conn_id="hdfs_connection",
queue="half",
poke_interval=30,
timeout=11400,
dag=dag
)
t2 = BashOperator(
task_id='echo',
bash_command='echo "test"',
dag=dag,
)
t1 >> t2
WebHdfsSensor를 만들 때, webhdfs_conn_id를 입력받게 되어 있는데, 이 때 사전에 만든 connection_id 값을 넣어주면 된다.
이렇게 만들어진 dag을 실행해 보면 정상적으로 success 하는 걸 확인할 수가 있다.
이번에는 sensor하려는 파일을 없는 파일로 설정한 후에 다시 dag을 실행해봤다. fail 후 process 종료를 예상했지만, 파일이 없는 경우 지속적으로 running 상태로 남아있는 걸 확인할 수 있었다. fail은 따로 옵션이 있는 것 같아서 추후 찾아보기로 했다.