Airflow - 간단한 Mysql to MSSQL(SQL Server) 이관 예제

BigData/Dataflow / /
728x90

1.   Connection Setting

1)   Web UI

A.     UI 상단에 AdminSub 메뉴로 Connections


 

B.      airflow db init 명령을 진행하면 default로 몇 개의 Connection이 생성되어 있다.

C.      Connction 설정


                         i.         Conn id : 추후 DAG 작성 시 사용하는 연결 ID

                        ii.         Conn Type : airflow provider package를 설치하면 나오는 연결 타입

                       iii.         Schema : 데이터베이스명

                       iv.         Login : 로그인 아이디

                        v.         Password : 비밀번호

                       vi.         Port : DB 연결 포트

                      vii.         Extra : Connection에 사용되는 Parameter 값으로 uri 입력시의 ? 뒷 절을 의미한다.

(ㄱ)  ex >

conn-uri 'my-conn-type://login:password@host:port/schema?param1=val1&param2=val2'

 

(ㄴ)  ?param1=val1&param2=val2 절을 extra로 설정하면 다음과 같다.

json.dumps(dict(param1='val1'param2='val2'))

 

2)   CRI

A.     Shell 명령어를 통해 다음과 같이 등록 및 수정 할 수 있다.

                         i.         등록은 add 명령어를 통해 수정

                        ii.         수정은 add 명령어 대신 edit 명령어를 사용하여 수정한다.

B.      uri

                         i.         uri는 일반적인 Connction uri 형태를 따른다.

                        ii.         다음은 해당 예제이다.

airflow connections add 'my_prod_db' --conn-uri 'my-conn-type://login:password@host:port/schema?param1=val1&param2=val2'

 

C.      option

                         i.         shell uri 형태가 아닌 airflow 명령어를 통해 등록이 가능하다.

                        ii.         다음은 해당 예제이다.

airflow connections add 'my_prod_db' \

    --conn-type 'my-conn-type'

    --conn-login 'login' \

    --conn-password 'password' \

    --conn-host 'host' \

    --conn-port 'port' \

    --conn-schema 'schema' \

    ...

 

2.   Provider 패키지

1)   Mysql

A.     airflow.providers.mysql.operators.mysql 를 이용하여 mysql의 쿼리를 실행

B.      해당 PyPI 정보

https://pypi.org/project/apache-airflow-providers-mysql/

C.      해당 클래스 정보

class airflow.providers.mysql.operators.mysql.MySqlOperator(*, 

sql: str

mysql_conn_idstr = 'mysql_default'

parameters: Optional[Union[Mapping, Iterable]] = None

autocommit: bool = False

database: Optional[str] = None, **kwargs)

 

                         i.         sql : 실행할 쿼리

                        ii.         mysql_conn_id : Connections에 설정해 놓은 Conn id

                       iii.         Parameters : 쿼리 내에 동적으론 변수 처리되어 사용될 수 있는 변수들의 값

                       iv.         autocommit : 자동으로 commit될지 여부 기본값은 False

                        v.         database : 연결할 테이터베이스

D.     설치

pip install apache-airflow-providers-mysql

 

2)   SQL Server

A.     airflow.providers.microsoft.mssql.operators.mssql 를 이용하여 SQL Server의 쿼리를 실행한다.

B.      해당 PyPI 정보

https://pypi.org/project/apache-airflow-providers-microsoft-mssql/

C.      해당 클래스 정보

class airflow.providers.microsoft.mssql.operators.mssql.MsSqlOperator(*, 

sql: str

mssql_conn_idstr = 'mssql_default'

parameters: Optional[Union[Mapping, Iterable]] = None

autocommit: bool = False

database: Optional[str] = None, **kwargs)

 

                         i.         sql : 실행할 쿼리

                        ii.         mysql_conn_id : Connections에 설정해 놓은 Conn id

                       iii.         Parameters : 쿼리 내에 동적으론 변수 처리되어 사용될 수 있는 변수들의 값

                       iv.         autocommit : 자동으로 commit될지 여부 기본값은 False

                        v.         database : 연결할 테이터베이스

D.     설치

pip install apache-airflow-providers-microsoft-mssql

 

3.   Mysql -> MSSQL 예제

Mysql Database에 테이블을 생성하고 데이터를 입력하는 사이에 SQL Server에 테이블을 생성하고 이관하는 Basic한 예제 flow 입니다.


<전체 소스는 https://github.com/YunhoJIn/airflow/blob/main/mysql_to_mssql_basic.py >

1)   DAG 선언

A.     기본적으로 DAG 선언 시 아래 소스와 같이 선언 후 각 Taskdag=dag 형태로 붙여 사용이 가능하다.

dag = DAG(

    'example_mysql',

    default_args=default_args,

    start_date=days_ago(2),

    tags=['example'],

)

 

B.      하지만 TaskGroup을 사용하기 위해서 상위 depth DAG가 선언되어 있어야 함으로 with 구문으로 선언하고 있으며 airflow 2.x 버전 이후로는 해당 방식으로의 처리를 권장하고 있다.

with DAG(dag_id="mysql_to_mssql"start_date=days_ago(2), tags=['mysql','mssql']) as dag:

 

C.      dag 구문에서의 parameter 사용법

                         i.         문자열 구문으로 구성된 SQL 값들 중에는 Param 값을 이용하여 제어가 가능하다.

                        ii.         아래 구문과 같이 해당 값을 변수 처리 가능하다.

mysql_insert_sql = "INSERT INTO {{ params.table }} ({{ params.col1 }}) VALUES( {{ params.val1 }} )"

 

                       iii.         그리고 Task 선언문 안에 아래와 같이 설정하여 변수에 값을 Key:Value 형태로 사용할 수 있다.

params={

     "table":"airflow_test",

     "col1":"a",

     "val1":"1"

},

 

2)   TaskGroup

A.     여러 Task를 묶어주는 Group

B.      아래와 같이 선언하여 사용되며 추후 dag flow를 정할 수 TaskGroup명만으로 사용 한다.

                         i.         선언

with TaskGroup("mssql_create"

tooltip="Tasks for mssql_create"as mssql_create:

   mssql_drop_sql = "DROP TABLE IF EXISTS {{ params.table }} ;"

 

                        ii.         사용

start >> mysql_create >> [insert_mysql_taskmssql_create] >> transform_task >> end

 

3)   DummyOperator

A.     작업이 없는 빈 Task

B.      시작과 끝의 구분을 주기 위해 사용하였다.

start = DummyOperator(task_id="start")

end = DummyOperator(task_id="end")

 

4)   MySqlOperator

mysql_insert_sql = "INSERT INTO {{ params.table }} ({{ params.col1 }}) VALUES( {{ params.val1 }} )"

 

insert_mysql_task = MySqlOperator(

    task_id='insert_data_mysql',

    mysql_conn_id='mysql_local',

    database='test',

    sql=mysql_insert_sql,

    params={

        "table":"airflow_test",

        "col1":"a",

        "val1":"1"

        },

    dag=dag

)

 

A.     Mysql에 쿼리를 실행할 수 있다.

B.      해당 쿼리 후의 데이터를 받아 다음 Task에서 사용하려면 좀 더 조치가 필요하다.

5)   MsSqlOperator

mssql_create_sql = "CREATE TABLE {{ params.table }} (a int)"

 

create_table_mssql_task = MsSqlOperator(

task_id='create_table_mssql',

mssql_conn_id='mssql_local',

database='test',

sql=mssql_create_sql,

params={"table":"airflow_test"},

dag=dag

)

 

A.     MSSQL에 쿼리를 실행할 수 있다.

B.      MySqlOperator와 같은 방식이다.

6)   GenericTransfer

dest_table = "{{ params.database }}.{{ params.schema }}.{{ params.table }}"

trans_sql = "select {{ params.col }} from {{ params.database }}.{{ params.table }}"

 

transform_task = GenericTransfer(

    task_id = "trans_data",

    source_conn_id = "mysql_local",

    destination_conn_id = "mssql_local",

    destination_table = dest_table,

    sql=trans_sql,

    params={

        "database":"test",

        "schema":"dbo",

        "table":"airflow_test",

        "col":"a"

        },

    dag=dag

)

 

A.     서로 다른 두 Provider의 데이터 이관을 처리하는 Operator

B.      source 에 쿼리를 실행하여 결과값을 destination 의 지정 table에 값을 insert 하는 형태로 이루어 진다.

 

4.   출처

1)      https://github.com/apache/airflow/blob/main/airflow/providers/mysql/example_dags/example_mysql.py

2)      https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html

3)      https://airflow.apache.org/docs/apache-airflow-providers-mysql/stable/operators.html

4)      https://airflow.apache.org/docs/apache-airflow-providers-microsoft-mssql/stable/_api/airflow/providers/microsoft/mssql/operators/mssql/index.html

5)      https://www.programmersought.com/article/1960751958/

 

728x90

'BigData > Dataflow' 카테고리의 다른 글

Airflow Operator 정리  (0) 2021.05.26
Airflow 병렬 프로세스 및 설치  (0) 2021.05.17
Airflow 기초 설치  (0) 2021.04.27
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기