1. Data Workflow
1) Data Workflow의 필요성
(1) 데이터 처리를 하는 배치가 Source의 유형이 매우 다양해짐
(2) 데이터의 필요성과 그 양이 매우 늘어남으로 데이터 가공 배치의 개수가 빠르게 상승함
(3) 기존의 방식은 데이터를 시간에 대한 예측이 필요하고 선 작업이 끝난 후에 작업을 하도록 수동 설정을 했어야 함
2) Data Workflow의 목적
(1) 전체 Flow를 한 곳에서 관찰 가능하도록 함
(2) 배치 파이프라인 모니터링 및 스케줄링이 가능함
(3) 각 Task 별로 병렬 처리 가능
(4) 유연한 DAG 작성
3) Airflow Open Project
(1) AirBnb에서 2015년에 만든 Workflow Open Source, DAG Workflow를 정의하여 스케줄링, 모니터링이 용이함
(2) 중요 특징
I. 풍부한 CLI, UI 제공하여 DAG의 Dependency, 배치 진행 현황, Logs, DAG Code를 한 눈에 볼 수 있다.
II. Python Code로 다양한 형태의 DAG 구성(관리, 테스트 용이)
III. 모듈성, 높은 확장성 (메시지 큐 사용으로 worker의 수 제약사항 없이 적용)
IV. Jinja 템플릿 엔진 지원하여 다양한 파라미터 설정할 수 있다.
V. 다양한 Connecter 지원으로 Mysql, MongoDB, HDFS, S3, Hive등과 상호작용
VI. Auto Retry, Backfill
VII. Data Profiling(Ad-hoc query, Char)
(3) 장점
I. 거의 모든 종류의 플러그인을 지원하여 범용적으로 Workflow 사용 가능
II. Web UI를 통해 DAG, 배치 현황, Connector 등을 관리, 모니터링 가능
III. 많은 Reference
IV. Python Code로 정교한 DAG 구성
V. 배치 실패 시 Retry
(4) 단점
I. 아직 발전 중인 프로젝트
II. 병렬 처리 하기 위해서는 여러 설정이 필요함
2. Airflow 설치 (Docker-compose 환경 예제)
1) Docker 환경 설정
(1) docker-compose.yml
I. airflow 에서 사용할 카탈로그 DB와 airflow를 설치할 os를 준비한다.
II. docker-compose.yml
version: '3.1'
services:
mdb:
build: ./mariadb_set
container_name: afmdb
restart: always
volumes:
- ./home_mariadb:/home/mariadb
- ./mariadb_set/dbfile:/var/lib/mysql
privileged: true
environment:
MYSQL_ROOT_PASSWORD: "passwd`"
TZ: "Asia/Seoul"
ports:
- "3306:3306"
hostname: mdb
airflow:
build: ./setting
container_name: airflow
privileged: true
volumes:
- ./home:/home/af
- ./setting:/home/setting
ports:
- 8080:8080
environment:
TZ: "Asia/Seoul"
hostname: af
command: tail -f /dev/null
III. docker hub 에서 각 시스템의 이미지를 확인한다.
A. ubuntu
B. mairadb
(2) Mariadb
I. airflow 에서 전체적으로 사용하는 카탈로그 및 로그 등의 정보를 저장
II. 설치
A. dockerhub의 mariadb를 import
B. Dockerfile
FROM mariadb:10.5.8
RUN apt-get update
RUN apt-get install curl wget vim -y
RUN echo "yes" | apt-get upgrade -y
WORKDIR /home/mariadb
(3) ubuntu
I. ubuntu 이미지를 바로 받아 사용해도 되지만 간단한 기본 패키지와 한글 설정등을 추가하였다.
II. Dockerfile
FROM ubuntu:18.04
# basic system install
RUN apt-get update
RUN DEBIAN_FRONTEND="noninteractive" apt-get install -y tzdata
RUN set -ex; \
apt-get install -y --no-install-recommends \
sudo \
net-tools \
iputils-ping \
wget \
gnupg \
dirmngr \
curl \
vim \
locales \
language-pack-ko \
software-properties-common
# set to language korean
RUN locale-gen ko_KR.UTF-8
ENV LC_ALL C.UTF-8
ENV LANGUAGE ko_KR.UTF-8
ENV LANG ko_KR.UTF-8
RUN update-locale LANG=ko_KR.UTF-8 LC_MESSAGES=POSIX
# set environment variable
ENV TZ=Asia/Seoul
ENV TERM=xterm-256color
ENV NLS_LANG=KOREAN_KOREA.AL32UTF8
2) Mariadb 설정
(1) Global 설정 변경
I. airflow에서는 explicit_defaults_for_timestamp를 사용함으로 해당 값을 True로 변경해줘야 한다.
vi /etc/mysql/mariadb.conf.d/50-server.cnf
[mysqld]
……
explicit_defaults_for_timestamp = 1
……
II. 외부에서 원격 연결을 하기 위해 bind-address도 수정해 준다.
[mysqld]
……
bind-address = 0.0.0.0
……
III. Mariadb Container 를 재시작 해준다
docker stop afmdb
……
docker start afmdb
(2) 사용자 계정 생성
I. Mariadb 접속
mysql -u root -p
II. airflow 계정 생성
create user 'airflow'@'%' identified by 'passwd';
III. Database 생성
create database airflow character set utf8mb4 collate utf8mb4_general_ci;
show databases;
+--------------------+
| Database |
+--------------------+
| airflow |
| information_schema |
| mysql |
| performance_schema |
+--------------------+
IV. Database에 권한 생성
grant all privileges on airflow.* to 'airflow'@'%';
flush privileges;
3) Airflow 설정
(1) Python 환경 여부 결정
I. Airflow는 Python 패키지로 존재하는 서비스
II. Python은 가상화된 환경인 Conda를 제공
III. 가상 환경을 쓰는 이유
A. 버전 별 지원패키지가 다름
B. Python을 이용하는 솔루션들이 Dependency하는 Python 버전이 솔루션마다 다름
IV. 3가지의 환경을 선택할 수 있다.
A. Pip Python 환경
B. Conda Python 환경
C. Miniconda Python 환경(Conda에서 최소한의 패키지만 설치)
(2) Miniconda 환경 셋팅
I. https://docs.conda.io/en/latest/miniconda.html 에서 버전에 맞는 설치를 다운로드 받아 설치한다.
II. 현재는 Docker Ubuntu 버전에 맞는 버전을 받으면 Shell Script를 받을 수 있다.
III. 해당 Shell을 실행하여 설치한다.
sh Miniconda3-latest-Linux-x86_64.sh
A. 라이선스 정보를 읽고 설치 위치를 결정한다.
B. Conda 실행을 한다.
IV. 세션을 종료 후 재접속한다.
V. Conda 패키지 전체 업데이트
conda update -y --all
VI. werkzeug가 1.0으로 올라가면서 url_encde함수가 변경되었다. 그래서 2가지 작업 중 1개를 선택하여 작업해야 한다.
A. 최신 airflow 업데이트
pip install --upgrade apache-airflow
B. werkzeug를 1.0 미만을 설치
conda search werkzeug
Loading channels: done
# Name Version Build Channel
werkzeug 0.12.2 py27hbf75dff_0 pkgs/main
werkzeug 0.12.2 py35hbfc1ea6_0 pkgs/main
werkzeug 0.12.2 py36hc703753_0 pkgs/main
werkzeug 0.14.1 py27_0 pkgs/main
werkzeug 0.14.1 py35_0 pkgs/main
werkzeug 0.14.1 py36_0 pkgs/main
werkzeug 0.14.1 py37_0 pkgs/main
werkzeug 0.15.2 py_0 pkgs/main
werkzeug 0.15.4 py_0 pkgs/main
werkzeug 0.15.5 py_0 pkgs/main
werkzeug 0.16.0 py_0 pkgs/main
werkzeug 0.16.1 py_0 pkgs/main
werkzeug 1.0.0 py_0 pkgs/main
werkzeug 1.0.1 py_0 pkgs/main
werkzeug 1.0.1 pyhd3eb1b0_0 pkgs/main
conda install -y werkzeug=0.16.1
C. pymysql은 설치
conda install –y pymysql
(3) airflow 설치
I. Conda airflow 설치
conda install -y airflow
II. airflow 초기화
airflow initdb
III. airflow.cfg 파일은 config파일로 중요 옵션은 다음과 같다.
dags_folder = /root/airflow/dags #DAG 저장 위치
base_log_folder = /root/airflow/logs #로그 위치
executor = LocalExecutor #실행 방법
sql_alchemy_conn = mysql+pymysql://id:passwd@ip:port/db?charset=utf8mb4
#기본 airflow에서 시용하는 디비 연결
A. 지원되는 Executor의 종류
가) SequentialExecutor : 기본 Executor로 한 번의 하나의 인스턴스만 실행한다.
나) LocalExecutor : 단일 로컬에서 프로세스를 늘려 병렬 처리하는 Executor
다) CeleryExecutor : Celery를 이용한 분산 클러스터 Executor, 메시지 큐나 데이터베이스를 이용하여 순서대로 분산 처리
라) DaskExecutor : Dask를 이용한 분산클러스터 Executor, Queue를 지원하지 않고 즉시 실행
마) KubernetesExecutor : Kubernetes를 이용한 분산 클러스터 Executor
IV. 다시 한번 초기화
airflow initdb
3. Airflow 실행
1) Web UI
(1) airflow가 제공하는 Web UI
(2) ad-hoc execute, Dag, Log, Connector 등 다양한 정보를 확인하고 Dag를 Trigger할 수 있다.
(3) Web서브를 구동하기 위해서는 Python nvd3 패키지가 필요하다.
conda install -y python-nvd3
(4) airflow web서버 실행
airflow webserver –p 8080 &
(5) airflow WebUI 접속 (http://ip:8080/admin/)
2) Scheduler
(1) 생성된 Dag가 정해진 조건에 의해 Trigger 되기 위한 장치
(2) 스케줄은 Crontab 형태,변수 형태 Time 형태로 생성할 수 있다.
(3) 다음과 같이 스케줄러 실행
airflow scheduler &
4. 출처
2) https://hub.docker.com/_/mariadb
3) https://hub.docker.com/_/ubuntu
4) https://dhznsdl.tistory.com/19?category=841698
5) https://stackoverflow.com/questions/64971281/airflow-webserver-gettins-valueerrorsamesite
6) https://docs.conda.io/en/latest/miniconda.html
7) https://airflow.readthedocs.io/en/1.10.14/executor/
8) https://docs.sqlalchemy.org/en/14/dialects/mysql.html
'BigData > Dataflow' 카테고리의 다른 글
Airflow - 간단한 Mysql to MSSQL(SQL Server) 이관 예제 (2) | 2021.06.16 |
---|---|
Airflow Operator 정리 (0) | 2021.05.26 |
Airflow 병렬 프로세스 및 설치 (0) | 2021.05.17 |
최근댓글