Airflow 기초 설치

BigData/Dataflow / /
728x90

 

1.   Data Workflow

1)   Data Workflow의 필요성

(1)    데이터 처리를 하는 배치가 Source의 유형이 매우 다양해짐

(2)    데이터의 필요성과 그 양이 매우 늘어남으로 데이터 가공 배치의 개수가 빠르게 상승함

(3)    기존의 방식은 데이터를 시간에 대한 예측이 필요하고 선 작업이 끝난 후에 작업을 하도록 수동 설정을 했어야 함

2)   Data Workflow의 목적

(1)    전체 Flow를 한 곳에서 관찰 가능하도록 함

(2)    배치 파이프라인 모니터링 및 스케줄링이 가능함

(3)    Task 별로 병렬 처리 가능

(4)    유연한 DAG 작성

3)   Airflow Open Project

(1)    AirBnb에서 2015년에 만든 Workflow Open Source, DAG Workflow를 정의하여 스케줄링, 모니터링이 용이함


(2)    중요 특징

I.       풍부한 CLI, UI 제공하여 DAGDependency, 배치 진행 현황, Logs, DAG Code를 한 눈에 볼 수 있다.

II.      Python Code로 다양한 형태의 DAG 구성(관리, 테스트 용이)

III.    모듈성, 높은 확장성 (메시지 큐 사용으로 worker의 수 제약사항 없이 적용)

IV.    Jinja 템플릿 엔진 지원하여 다양한 파라미터 설정할 수 있다.

V.      다양한 Connecter 지원으로 Mysql, MongoDB, HDFS, S3, Hive등과 상호작용

VI.    Auto Retry, Backfill

VII.   Data Profiling(Ad-hoc query, Char)

(3)    장점

I.       거의 모든 종류의 플러그인을 지원하여 범용적으로 Workflow 사용 가능

II.      Web UI를 통해 DAG, 배치 현황, Connector 등을 관리, 모니터링 가능

III.    많은 Reference

IV.    Python Code로 정교한 DAG 구성

V.      배치 실패 시 Retry

(4)    단점

I.       아직 발전 중인 프로젝트

II.      병렬 처리 하기 위해서는 여러 설정이 필요함

2.   Airflow 설치 (Docker-compose 환경 예제)

1)   Docker 환경 설정

(1)    docker-compose.yml

I.       airflow 에서 사용할 카탈로그 DBairflow를 설치할 os를 준비한다.

II.      docker-compose.yml

version'3.1'

services:

  mdb:

    build./mariadb_set

    container_nameafmdb

    restartalways

    volumes:

      - ./home_mariadb:/home/mariadb

      - ./mariadb_set/dbfile:/var/lib/mysql

    privilegedtrue

    environment:

        MYSQL_ROOT_PASSWORD"passwd`"

        TZ"Asia/Seoul"

    ports:

        - "3306:3306"

    hostnamemdb

  airflow:

    build./setting

    container_nameairflow

    privilegedtrue

    volumes:

      - ./home:/home/af

      - ./setting:/home/setting

    ports:

      - 8080:8080

    environment:

        TZ"Asia/Seoul"

    hostnameaf

    commandtail -f /dev/null

 

III.    docker hub 에서 각 시스템의 이미지를 확인한다.

A.     ubuntu

B.      mairadb

(2)    Mariadb

I.       airflow 에서 전체적으로 사용하는 카탈로그 및 로그 등의 정보를 저장

II.      설치

A.     dockerhub mariadb import


B.      Dockerfile

FROM mariadb:10.5.8

 

RUN apt-get update

RUN apt-get install curl wget vim -y

 

RUN echo "yes" | apt-get upgrade -y

 

WORKDIR /home/mariadb

 

(3)    ubuntu

I.       ubuntu 이미지를 바로 받아 사용해도 되지만 간단한 기본 패키지와 한글 설정등을 추가하였다.

II.      Dockerfile

FROM ubuntu:18.04

 

# basic system install

RUN apt-get update 

RUN DEBIAN_FRONTEND="noninteractive" apt-get install -y tzdata

RUN set -ex; \

    apt-get install -y --no-install-recommends \

        sudo \

        net-tools \

        iputils-ping \

        wget \

        gnupg \

        dirmngr \       

        curl \

        vim \ 

        locales \

        language-pack-ko \

        software-properties-common

 

# set to language korean

RUN locale-gen ko_KR.UTF-8

ENV LC_ALL C.UTF-8

ENV LANGUAGE ko_KR.UTF-8

ENV LANG ko_KR.UTF-8

RUN update-locale LANG=ko_KR.UTF-8 LC_MESSAGES=POSIX

 

# set environment variable

ENV TZ=Asia/Seoul

ENV TERM=xterm-256color

ENV NLS_LANG=KOREAN_KOREA.AL32UTF8

 

2)   Mariadb 설정

(1)    Global 설정 변경

I.       airflow에서는 explicit_defaults_for_timestamp를 사용함으로 해당 값을 True로 변경해줘야 한다.

vi /etc/mysql/mariadb.conf.d/50-server.cnf

 

[mysqld]

……

explicit_defaults_for_timestamp = 1

……

 

II.      외부에서 원격 연결을 하기 위해 bind-address도 수정해 준다.

[mysqld]

……

bind-address            = 0.0.0.0

……

 

III.    Mariadb Container 를 재시작 해준다

docker stop afmdb

……

docker start afmdb

 

(2)    사용자 계정 생성

I.       Mariadb 접속

mysql -u root -p

 

II.      airflow 계정 생성

create user 'airflow'@'%' identified by 'passwd';

 

III.    Database 생성

create database airflow character set utf8mb4 collate utf8mb4_general_ci;

 

show databases;

 

+--------------------+

| Database           |

+--------------------+

| airflow            |

| information_schema |

| mysql              |

| performance_schema |

+--------------------+

 

IV.    Database에 권한 생성

grant all privileges on airflow.* to 'airflow'@'%';

flush privileges;

 

3)   Airflow 설정

(1)    Python 환경 여부 결정

I.       Airflow Python 패키지로 존재하는 서비스

II.      Python은 가상화된 환경인 Conda를 제공

III.    가상 환경을 쓰는 이유

A.     버전 별 지원패키지가 다름

B.      Python을 이용하는 솔루션들이 Dependency하는 Python 버전이 솔루션마다 다름

IV.    3가지의 환경을 선택할 수 있다.

A.     Pip Python 환경

B.      Conda Python 환경

C.      Miniconda Python 환경(Conda에서 최소한의 패키지만 설치)

(2)    Miniconda 환경 셋팅

I.       https://docs.conda.io/en/latest/miniconda.html 에서 버전에 맞는 설치를 다운로드 받아 설치한다.

II.      현재는 Docker Ubuntu 버전에 맞는 버전을 받으면 Shell Script를 받을 수 있다.

III.    해당 Shell을 실행하여 설치한다.

sh Miniconda3-latest-Linux-x86_64.sh

 

A.     라이선스 정보를 읽고 설치 위치를 결정한다.

B.      Conda 실행을 한다.

IV.    세션을 종료 후 재접속한다.

V.      Conda 패키지 전체 업데이트

conda update -y --all

 

VI.    werkzeug1.0으로 올라가면서 url_encde함수가 변경되었다. 그래서 2가지 작업 중 1개를 선택하여 작업해야 한다.

A.     최신 airflow 업데이트

pip install --upgrade apache-airflow

 

B.      werkzeug 1.0 미만을 설치

conda search werkzeug

Loading channels: done

# Name       Version Build              Channel

werkzeug     0.12.2  py27hbf75dff_0     pkgs/main

werkzeug     0.12.2  py35hbfc1ea6_0     pkgs/main

werkzeug     0.12.2  py36hc703753_0     pkgs/main

werkzeug     0.14.1  py27_0             pkgs/main

werkzeug     0.14.1  py35_0             pkgs/main

werkzeug     0.14.1  py36_0             pkgs/main

werkzeug     0.14.1  py37_0             pkgs/main

werkzeug     0.15.2  py_0                      pkgs/main

werkzeug     0.15.4  py_0                      pkgs/main

werkzeug     0.15.5  py_0                      pkgs/main

werkzeug     0.16.0  py_0                      pkgs/main

werkzeug     0.16.1  py_0                      pkgs/main

werkzeug     1.0.0   py_0                      pkgs/main

werkzeug     1.0.1   py_0                      pkgs/main

werkzeug     1.0.1   pyhd3eb1b0_0       pkgs/main

 

conda install -y werkzeug=0.16.1

 

C.      pymysql은 설치

conda install –y pymysql

 

(3)    airflow 설치

I.       Conda airflow 설치

conda install -y airflow

 

II.      airflow 초기화

airflow initdb

 

III.    airflow.cfg 파일은 config파일로 중요 옵션은 다음과 같다.

dags_folder = /root/airflow/dags #DAG 저장 위치

base_log_folder = /root/airflow/logs #로그 위치

executor = LocalExecutor #실행 방법

sql_alchemy_conn = mysql+pymysql://id:passwd@ip:port/db?charset=utf8mb4

#기본 airflow에서 시용하는 디비 연결

 

A.     지원되는 Executor의 종류

가)    SequentialExecutor : 기본 Executor로 한 번의 하나의 인스턴스만 실행한다.

나)    LocalExecutor : 단일 로컬에서 프로세스를 늘려 병렬 처리하는 Executor

다)    CeleryExecutor : Celery를 이용한 분산 클러스터 Executor, 메시지 큐나 데이터베이스를 이용하여 순서대로 분산 처리

라)    DaskExecutor : Dask를 이용한 분산클러스터 Executor, Queue를 지원하지 않고 즉시 실행

마)    KubernetesExecutor : Kubernetes를 이용한 분산 클러스터 Executor

IV.    다시 한번 초기화

airflow initdb

 

3.   Airflow 실행

1)   Web UI

(1)    airflow가 제공하는 Web UI

(2)    ad-hoc execute, Dag, Log, Connector 등 다양한 정보를 확인하고 Dag Trigger할 수 있다.

(3)    Web서브를 구동하기 위해서는 Python nvd3 패키지가 필요하다.

conda install -y python-nvd3

 

(4)    airflow web서버 실행

airflow webserver –p 8080 &

 

(5)    airflow WebUI 접속 (http://ip:8080/admin/)


 

2)   Scheduler

(1)    생성된 Dag가 정해진 조건에 의해 Trigger 되기 위한 장치

(2)    스케줄은 Crontab 형태,변수 형태 Time 형태로 생성할 수 있다.

(3)    다음과 같이 스케줄러 실행

airflow scheduler &

 

 

4.   출처

1)      https://flowarc.tistory.com/entry/%EB%B0%B0%EC%B9%98-%ED%8C%8C%EC%9D%B4%ED%94%84%EB%9D%BC%EC%9D%B8-%EA%B4%80%EB%A6%AC%EB%A5%BC-%EC%9C%84%ED%95%9C-Workflow-%EB%A6%AC%EC%84%9C%EC%B9%98-Airflow-VS-Azkaban-VS-Oozie

2)      https://hub.docker.com/_/mariadb

3)      https://hub.docker.com/_/ubuntu

4)      https://dhznsdl.tistory.com/19?category=841698

5)      https://stackoverflow.com/questions/64971281/airflow-webserver-gettins-valueerrorsamesite

6)      https://docs.conda.io/en/latest/miniconda.html

7)      https://airflow.readthedocs.io/en/1.10.14/executor/

8)      https://docs.sqlalchemy.org/en/14/dialects/mysql.html   

 

728x90

'BigData > Dataflow' 카테고리의 다른 글

Airflow - 간단한 Mysql to MSSQL(SQL Server) 이관 예제  (2) 2021.06.16
Airflow Operator 정리  (0) 2021.05.26
Airflow 병렬 프로세스 및 설치  (0) 2021.05.17
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기