Airflow 병렬 프로세스 및 설치

BigData/Dataflow / /
728x90

 

1.   개념

1)   아키텍쳐


(1)    Worker : 할당된 작업을 실행

(2)    Scheduler : 큐에 작업을 추가

(3)    Web server : DAG, 작업 등 Web UI 제공

(4)    Database : 보안, 작업, DAG, 변수, 연결 등의 상태에 대한 정보를 저장

(5)    Celery : 큐 병렬 처리 관리

I.       Broker : 실행 명령을 저장

II.      Result Backend : 완료된 명령 상태 저장

 

2)   프로세스


(1)    Scheduler Process는 실행되어야 할 작업을 Queue Broker에게 전달한다.

(2)    Scheduler Process가 작업의 상태를 Result Backend에 저장하고 Queue Broker가 주기적으로 작업의 상태를 관리한다.

(3)    Queue Broker 가 작업을 Work Process에 할당한다

(4)    Worker Process Child Process에 단일 작업을 할당한다.

(5)    Worker Child Process가 작업을 처리

I.       Local Task Job Process Task Runner를 통해 새로운 Process를 시작

II.      Raw Task Process가 작업이 완료될 때까지 Code를 실행

(6)    작업이 완료되면 Child ProcessWorker 프로세스로 보내 내용 취합

(7)    Worker Process가 취합된 결과를 Result Backend에 저장

(8)    Result Backend에서 완료 상태를 Scheduler Process가 읽어내어 종료

2.   설치

1)   Docker 환경

(1)    이전 airflow 정보는 아래에서 다음에서 확인

Airflow 기초 설치 :: 진윤호 개발 블로그 (tistory.com)

(2)    기존 설치와 달리 Conda 가상 환경이 아닌 OS에 설치 진행하도록 하고자 한다.

(3)    docker-compose.yml

I.       기존에 설치하였던 카달로그 DBairflow 설치될 OS 외에 병렬처리를 위한 Queue를 대신하기 위해 Redis를 활용한다.

II.      docker-compose.yml

version'3.7'

services:

  mdb:

    build./mariadb_set

    container_nameafmdb

    restartalways

    volumes:

      - ./home_mariadb:/home/mariadb

      - ./mariadb_set/dbfile:/var/lib/mysql

    privilegedtrue

    environment:

        MYSQL_ROOT_PASSWORD"passwd"

        TZ"Asia/Seoul"

    ports:

        - "3306:3306"

    hostnamemdb 

  rdb:

    build./redis_set

    container_nameafredis

    restartalways

    volumes:

      - ./home_redis:/home/redis

    privilegedtrue

    environment:

        TZ"Asia/Seoul"

    ports:

        - "6379:6379"

    hostnamerdb

    commandredis-server --requirepass paawd --port 6379

  airflow:

    build./setting

    container_nameairflow

    privilegedtrue

    volumes:

      - ./home:/home/af

      - ./setting:/home/setting

    ports:

      - 8080:8080

      - 8793:8793

      - 5555:5555

    environment:

        TZ"Asia/Seoul"

    hostnameaf

    commandtail -f /dev/null

 

III.    docker hub에서 각 시스템의 이미지 확인

A.     ubuntu

B.      mairadb

C.      redis

(4)    Mariadb : 이전과 같은 정보로 설치

FROM mariadb:10.5.8

 

RUN apt-get update

RUN apt-get install curl wget vim -y

 

RUN echo "yes" | apt-get upgrade -y

 

WORKDIR /home/mariadb

 

(5)    ubuntu : 이전과 같은 정보로 설치

FROM ubuntu:18.04

 

# basic system install

RUN apt-get update 

RUN DEBIAN_FRONTEND="noninteractive" apt-get install -y tzdata

RUN set -ex; \

    apt-get install -y --no-install-recommends \

        sudo \

        net-tools \

        iputils-ping \

        wget \

        gnupg \

        dirmngr \       

        curl \

        vim \ 

        locales \

        language-pack-ko \

        software-properties-common

 

# set to language korean

RUN locale-gen ko_KR.UTF-8

ENV LC_ALL C.UTF-8

ENV LANGUAGE ko_KR.UTF-8

ENV LANG ko_KR.UTF-8

RUN update-locale LANG=ko_KR.UTF-8 LC_MESSAGES=POSIX

 

# set environment variable

ENV TZ=Asia/Seoul

ENV TERM=xterm-256color

ENV NLS_LANG=KOREAN_KOREA.AL32UTF8

 

(6)    redis

I.       DAG의 같은 레벨의 각 작업을 병렬로 처리하기 위한 Queue 역할을 함

II.      docker hub에서 Redis import한다.

FROM redis:6.2.3

 

RUN apt-get update

RUN apt-get install curl wget vim -y

 

RUN echo "yes" | apt-get upgrade -y

 

WORKDIR /home/redis

 

2)   MariaDB 설정

(1)    Global 설정

I.       airflow에서는 explicit_defaults_for_timestamp를 사용함으로 해당 값을 True로 변경해줘야 한다.

vi /etc/mysql/mariadb.conf.d/50-server.cnf

 

[mysqld]

……

explicit_defaults_for_timestamp = 1

……

 

II.      외부에서 원격 연결을 하기 위해 bind-address도 수정해 준다.

[mysqld]

……

bind-address            = 0.0.0.0

……

 

III.    airflow 2.X 에서는 varchar형에 Unique index를 생성하는 작업을 init에서 작업한다. utf8mb4에서는 index 허용 데이터 범위가 초과하여 생성 Overflow가 발생되기 때문에 해당 설정을 변경하여 작업을 한다.

[mysqld]

……

innodb_file_format = barracuda

innodb_file_per_table = 1

innodb_large_prefix = 1

……

 

IV.    Mariadb Container 를 재시작 해준다

docker stop afmdb

……

docker start afmdb

 

(2)    사용자 계정 생성

I.       Mariadb 접속

mysql -u root -p

 

II.      airflow 계정 생성

create user 'airflow'@'%' identified by 'passwd';

 

III.    Database 생성

create database airflow character set utf8mb4 collate utf8mb4_general_ci;

 

show databases;

 

+--------------------+

| Database           |

+--------------------+

| airflow            |

| information_schema |

| mysql              |

| performance_schema |

+--------------------+

 

IV.    Database에 권한 생성

grant all privileges on airflow.* to 'airflow'@'%';

flush privileges

 

3)   Redis 설정

(1)    docker-composes.yml의 세팅에서 기본적인 셋팅을 진행했기 때문에 접속하여 간단한 테스트만 진행

(2)    접속

redis-cli -h 200.150.111.15 -p 3040 -a a12345

 

(3)    간단한 데이터 테스트

set a 1

get a

del a

 

4)   Airflow 설정

(1)    python 버전 설정

I.       ubuntu 18.04는 기본적으로 python 3.6설치 되어 있다.

II.      python으로 심볼릭 링크가 되어 있지 않다.

python --version

bash: python: 명령어를 찾을 없음

 

III.    지금은 Python 사용 버전을 3.8로 채택하여 사용

A.     python3.8 설치

apt install python3.8

 

B.      Alternative를 이용하여 Python 심볼릭 링크 생성

가)    python 으로 생성된 Alternative가 있는지 확인

update-alternatives --config python

update-alternatives: 오류: no alternatives for python

 

나)    설치된 3.6 3.8을 등록

update-alternatives --install /usr/bin/python python /usr/bin/python3.6 1

update-alternatives: using /usr/bin/python3.6 to provide /usr/bin/python (python) in auto mode

update-alternatives --install /usr/bin/python python /usr/bin/python3.8 2

update-alternatives: using /usr/bin/python3.8 to provide /usr/bin/python (python) in auto mode

 

다)    현재 선택된 버전 확인

update-alternatives --config python

대체 항목 python 대해 (/usr/bin/python 제공) 2 선택이 있습니다.

 

  선택       경로              우선순 상태

------------------------------------------------------------

* 0            /usr/bin/python3.8   2         자동 모드

  1            /usr/bin/python3.6   1         수동 모드

  2            /usr/bin/python3.8   2         수동 모드

 

Press <enter> to keep the current choice[*], or type selection number

(숫자를 입력하여 선택 내역을 변경할 수 있다.)

 

라)    python 버전은 확인 한다.

python --version

Python 3.8.0

 

(2)    패키지 설치

I.       pip 역시 심볼릭 링크가 없기 때문에 설치 및 버전 작업을 한다.

pip --version

bash: pip: 명령어를 찾을 없음

 

apt install python3-pip

 

update-alternatives --config pip

update-alternatives: 오류: no alternatives for pip

 

update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 1

 

pip --version

pip 9.0.1 from /usr/lib/python3/dist-packages (python 3.6)

 

II.      airflow 2.x를 설치하기 위해 update를 진행하여 설치

pip install --upgrade --ignore-installed pip setuptools

pip install --upgrade apache-airflow

 

III.    필요 패키지들을 설치(celerymethod 문제로 4점대 사용)

pip install pymysql

pip install celery==4.4.7

pip install redis

pip install flower

pip install mysqlclient

mysqlclinet 설친 중 오류가 발생하면 다음을 설치 후 재시도 한다.

apt install libmysqlclient-dev

 

IV.    airflow 초기화

airflow db init

 

V.      airflow.cfg 파일은 config파일로 중요 옵션 변경

cd ~/airflow

 

vi airflow.cfg

 

dags_folder = /root/airflow/dags #DAG 저장 위치

base_log_folder = /root/airflow/logs #로그 위치

executor = CeleryExecutor

sql_alchemy_conn = mysql+pymysql://id:passwd@ip:port/db?charset=utf8mb4

broker_url = redis://:passwd@ip:port/dbnumber

result_backend = db+mysql://id:passwd@ip:port/db

 

airflow db init

 

VI.    airflow 사용할 계정을 생성한다.

airflow users create -h

usage: airflow users create [-h] -e EMAIL -f FIRSTNAME -l LASTNAME [-p PASSWORD] -r ROLE [--use-random-password] –u                             USERNAME

 

3.   실행

1)   Airflow 실행

(1)    airflow 실행은 명령어 그룹 내 명령과 전체 범위 실행 명령어가 있다.

airflow [-h] GROUP_OR_COMMAND ...

 

positional arguments:

  GROUP_OR_COMMAND

 

    Groups:

      celery         Celery components

      config         View configuration

      connections    Manage connections

      dags           Manage DAGs

      db             Database operations

      kubernetes     Tools to help run the KubernetesExecutor

      pools          Manage pools

      providers      Display providers

      roles          Manage roles

      tasks          Manage tasks

      users          Manage users

      variables      Manage variables

 

    Commands:

      cheat-sheet    Display cheat sheet

      info           Show information about current Airflow and environment

      kerberos       Start a kerberos ticket renewer

      plugins        Dump information about loaded plugins

      rotate-fernet-key

                     Rotate encrypted connection credentials and variables

      scheduler      Start a scheduler instance

      sync-perm      Update permissions for existing roles and DAGs

      version        Show the version

      webserver      Start a Airflow webserver instance

 

(2)    Celery 구성 실행

airflow celery worker

airflow celery flower

airflow webserver

airflow scheduler

                    

4.   출처

1)      https://codechacha.com/ko/change-python-version/

2)      http://melonicedlatte.com/android/2018/05/21/221524.html

3)      https://docs.celeryproject.org/en/stable/userguide/configuration.html

4)      https://velog.io/@jongwoo328/Ubuntu-mysqlclient-%ED%8C%A8%ED%82%A4%EC%A7%80-%EC%84%A4%EC%B9%98-%EC%98%A4%EB%A5%98

5)      https://airflow.apache.org/docs/apache-airflow/stable/executor/celery.html

 

728x90

'BigData > Dataflow' 카테고리의 다른 글

Airflow - 간단한 Mysql to MSSQL(SQL Server) 이관 예제  (2) 2021.06.16
Airflow Operator 정리  (0) 2021.05.26
Airflow 기초 설치  (0) 2021.04.27
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기