Airflow Operator 정리

BigData/Dataflow / /
728x90

 

1.   역할

1)      DAG의 노드가 되는 특정 유형의 작업을 생성할 수 있도록 함

2)      모든 Operator Base Operator 에서 상속되어 생성됨

3)      모든 Sensor Base Sensor Operator에서 상속되어 생성됨

4)      모든 Sensors Base Sensor Opeatior timeout poke_interval 을 상속함

5)      연산자는 3가지 유형을 가짐

(1)    작업을 수행하거나 다른 시스템에 작업을 수행하도록 하는 지시하는 Action Operator

(2)    한 시스템에서 다른 시스템으로 데이터를 이동 시키는 Transfer Operator

(3)    특정 조건이 충족될 때까지 반복으로 실행되는 Sensor Operator

2.   packages

1)   airflow.operators

(1)    providers 패키지가 추가됨으로써 2.0에 남은 패키지만 설명

(2)    airflow.operators.bash

I.       Bash Shell Script를 실행하는 Operator

II.      Syntax

class airflow.operators.bash.BashOperator(*, 

bash_command: str

env: Optional[Dict[strstr]] = None

output_encoding: str = 'utf-8'

**kwargs)

 

III.    Parameters

A.     bash_command(str) : bash Shell Script

B.      env(dict) : 기본 값은 none 이나 사용될 경우 dict 형태의 환경변수 선언하는 내용을 기술

C.      output_encoding(str) : bash_command의 출력값 인코딩

IV.    example

bash_task = BashOperator(

    task_id="bash_task",

    bash_command='echo "here is the message: \'$message\'"',

    env={'message''{{ dag_run.conf["message"] if dag_run else "" }}'},

    output_encoding='utf-8',

)

 

(3)    airflow.operators.branch

I.       분기를 설정하는 Operator, 설정 method와 실행 method로 나뉨

II.      choose_branch : 분기를 설정하는 method

A.     Syntax

class airflow.operators.branch.BaseBranchOperator.choose_branch(self, context: Dict)

 

III.    execute : 실행하는 method

A.     Syntax

class airflow.operators.branch.BaseBranchOperator.execute(self, context: Dict)

 

IV.    Parameters

A.     context (dict) : 분기를 진행하기 원하는 정보

V.      example

options = ['branch_a''branch_b''branch_c''branch_d']

 

branching = BranchPythonOperator(

    task_id='branching',

    python_callable=lambda: random.choice(options),

)

 

(4)    airflow.operators.dummy

I.       아무 작업도 하지 않는 연산자로 여러 다른 작업들을 그룹화 하는데 사용

II.      Syntax

class airflow.operators.dummy.DummyOperator(**kwargs)

 

III.    example

with TaskGroup("section_1"tooltip="Tasks for section_1"as section_1:

    task_1 = DummyOperator(task_id="task_1")

    task_2 = BashOperator(task_id="task_2"bash_command='echo 1')

    task_3 = DummyOperator(task_id="task_3")

 


 

(5)    airflow.operators.email

I.       이메일 발송

II.      Syntax

class airflow.operators.email.EmailOperator(*, to: Union[List[str], str], 

subject: str

html_content: str

files: Optional[List] = None

cc: Optional[Union[List[str], str]] = None

bcc: Optional[Union[List[str], str]] = None

mime_subtype: str = 'mixed'

mime_charset: str = 'utf-8'

conn_id: Optional[str] = None, **kwargs)

 

III.    Parameters

A.     to(str or list[str]) : 받는 사람

B.      subject(str) : 제목

C.      html_content(str) : 이메일에 html 요소 사용 여부

D.     files(list) : 이메일에 첨부된 파일 이름

E.      cc(str or list[str]) : 참조

F.      bcc(str or list[str]) : 숨겨진 참조

G.     mime_subtype(str) : MIME 하위 컨텐츠 타입

가)    text/plain : 단순 텍스트

나)    multipart/mixed : 첨부된 내용이 있는 메시지

다)    multipart/alternative : html과 다른 포맷을 함께 보낸 메시지

H.     mime_charset : Content-Type 해더의 Charset

IV.    example

email = EmailOperator(

        task_id='send_email',

        to='to@gmail.com',

        subject='Airflow Alert',

        html_content=""" <h3>Email Test</h3> """,

        dag=dag

)

 

(6)    airflow.operators.generic_transfer

I.       Source에서 Destination 으로 데이터를 복사 한다.

II.      Syntax

class airflow.operators.generic_transfer.GenericTransfer(*, sql: str

destination_table: str

source_conn_id: str

destination_conn_id: str

preoperator: Optional[Union[str, List[str]]] = None, **kwargs)

 

III.    Parameters

A.     sql (str) : SQL 쿼리

B.      destination_table (str) : 저장될 곳의 tablea

C.      source_conn_id (str) : Source 연결 정보

D.     destination_conn_id (str) : destination 연결 정보

E.      preoperator (str or list[str]) : 복사 실행 전 실행된 SQL

IV.    example

sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"

 

t = operators.generic_transfer.GenericTransfer(

    task_id='test_m2m',

    preoperator=[

        "DROP TABLE IF EXISTS test_mysql_to_mysql",

        "CREATE TABLE IF NOT EXISTS "

        "test_mysql_to_mysql LIKE INFORMATION_SCHEMA.TABLES"

    ],

    source_conn_id='airflow_db',

    destination_conn_id='airflow_db',

    destination_table="test_mysql_to_mysql",

    sql=sql,

    dag=self.dag)

 

(7)    airflow.operators.latest_only

I.       airflowTask는 선언된 schedule에 의해 실행되는데 시작 날짜 이후 실행된 log가 없다면 최신 날짜까지 모두 실행되기도 한다.(backfill) 이때 해당 operator는 가장 최근 일만 실행되도록 한다.

II.      Syntax

class airflow.operators.latest_only.LatestOnlyOperator

 

III.    example

latest_only = LatestOnlyOperator(task_id='latest_only'dag=dag)

 

task1 = DummyOperator(task_id='task1'dag=dag)

task1.set_upstream(latest_only)

 

 

(8)    airflow.operators.python

I.       Python Source를 실행하기 위한 Operator

II.      PythonOperator

A.     Python을 실행

B.      Syntax

class airflow.operators.python.PythonOperator(*, python_callable: Callable

op_args: Optional[List] = None

op_kwargs: Optional[Dict] = None

templates_dict: Optional[Dict] = None

templates_exts: Optional[List[str]] = None, **kwargs)

 

C.      Parameters

가)    python_callable (python callable) : 호출할 Python 개채

나)    op_kwargs (dict) : python 개체 함수에 들어갈 키워드와 인수 정보

다)    op_args (dict) : python 개체 함수에 들어갈 인수 정보

D.     example

def my_sleeping_function(random_base):

    """This is a function that will run within the DAG execution"""

    time.sleep(random_base)

 

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively

for i in range(5):

    task = PythonOperator(

        task_id='sleep_for_' + str(i),

        python_callable=my_sleeping_function,

        op_kwargs={'random_base'float(i) / 10},

    )

 

III.    BranchPythonOperator

A.     Python 함수를 통해 분기

B.      Syntax

class airflow.operators.python.BranchPythonOperator

 

C.      example

def which_path():

  '''

  return the task_id which to be executed

  '''

  if True:

    task_id = 'path_A'

  else:

    task_id = 'path_B'

  return task_id

 

check_situation = BranchPythonOperator(

    task_id='check_situation',

    python_callable=which_path,

    dag=dag,

    )

 

IV.    ShortCircuitOperator

A.     bool 조건에 맞을 때만 실행

B.      Syntax

class airflow.operators.python.ShortCircuitOperator

 

C.      example

cond_true = ShortCircuitOperator(

    task_id='condition_is_True',

    python_callable=lambdaTrue,

)

 

ds_true = [DummyOperator(task_id='true_' + str(i)) for i in [12]]

 

chain(cond_true, *ds_true)

 

V.      PythonVirtualenvOperator

A.     Python 가상 환경에서 실행

B.      Syntax

class airflow.operators.python.PythonVirtualenvOperator(*, python_callable: Callable

requirements: Optional[Iterable[str]] = None

python_version: Optional[Union[strintfloat]] = None

use_dill: bool = False

system_site_packages: bool = True

op_args: Optional[List] = None

op_kwargs: Optional[Dict] = None

string_args: Optional[Iterable[str]] = None

templates_dict: Optional[Dict] = None

templates_exts: Optional[List[str]] = None, **kwargs)

 

C.      Parameter

가)    python_callale (function) : 외부 변수가 사용되지 않은 python 함수

나)    requirement (list[str]) : 환경 구성(pip install command)

다)    op_kwargs (dict) : python 개체 함수에 들어갈 키워드와 인수 정보

라)    op_args (dict) : python 개체 함수에 들어갈 인수 정보

D.     example

def callable_virtualenv():

    """

    Example function that will be performed in a virtual environment.

 

    Importing at the module level ensures that it will not attempt to import the

    library before it is installed.

    """

    from time import sleep

 

    from colorama import BackForeStyle

 

    print(Fore.RED + 'some red text')

    print(Back.GREEN + 'and with a green background')

    print(Style.DIM + 'and in dim text')

    print(Style.RESET_ALL)

    for _ in range(10):

        print(Style.DIM + 'Please wait...'flush=True)

        sleep(10)

    print('Finished')

 

virtualenv_task = PythonVirtualenvOperator(

    task_id="virtualenv_python",

    python_callable=callable_virtualenv,

    requirements=["colorama==0.4.0"],

    system_site_packages=False,

)

 

(9)    airflow.operators.sql

I.       SQL 구문을 실행

II.      BaseSQLOperator

A.     DB연결 hook을 리턴 (hook - 부 플랫폼, 데이터베이스에 접근할 수 있도록 만든 인터페이스)

B.      Syntax

class airflow.operators.sql.BaseSQLOperator(*, conn_id: Optional[str] = None

database: Optional[str] = None, **kwargs)

 

C.      Parameter

가)    conn_id (str) : DB 연결 정보

나)    database (str) : 연결할 Database

III.    SQLCheckOperator

A.     SQL 쿼리를 실행하고 응답에서 단일 행을 수신할 것으로 예상하고 행의 모든 값을 Bool로 캐스팅하려고 시도

B.      Syntax

class airflow.operators.sql.SQLCheckOperator(*, sql: strconn_id: Optional[str] = None

database: Optional[str] = None, **kwargs)

 

C.      Parameter

가)    sql (str) : 단일 행 반환 SQL 쿼리

나)    conn_id (str) : DB 연결 정보

다)    database (str) : 연결할 Database

D.     example

operator = SQLCheckOperator(

     sql="SELECT COUNT(*) FROM some_table WHERE some_column='{{ yesterday_ds_nodash }}'"

)

 

(10)  airflow.operators.subdag

I.       DagSubDag로 실행되도록 함

II.      Syntax

class airflow.operators.subdag.SubDagOperator(*, subdag: DAG

session: Optional[Session] = None

conf: Optional[Dict] = None

propagate_skipped_state: Optional[SkippedStatePropagationOptions] = None, **kwargs)

 

III.    Parameter

A.     subdag : 해당 DagSub로 등록

B.      session : sqlalchemy 세션

C.      conf : Sub Dag 구성

D.     propagate_skipped_state : Dagdownstream 작업에 전파되어야 하는지 여부를 정의

IV.    Example

section_1 = SubDagOperator(

    task_id='section-1',

    subdag=subdag(DAG_NAME, 'section-1', args),

)

 

(11)  airflow.operators.trigger_dagrun

I.       지정된 DAG를 실행하는 트리거

II.      Syntax

class airflow.operators.trigger_dagrun.TriggerDagRunOperator(*, trigger_dag_id: str

conf: Optional[Dict] = None

execution_date: Optional[Union[str, datetime.datetime]] = None

reset_dag_run: bool = False

wait_for_completion: bool = False

poke_interval: int = 60

allowed_states: Optional[List] = None

failed_states: Optional[List] = None, **kwargs)

 

III.    Parameter

A.     trigger_dag_id (str) : 트리거할 dag_id

B.      conf (dict) : dag 실행에 대한 구성

C.      execution_date (str or datetime.datetime) : 실행할 날짜

D.     reset_dag_run (bool) : 이미 실행되고 있는 dag라면 기존의 실행을 지우고 재실행을 하는지 여부

E.      wait_for_completion (bool) : 이전 dag의 실행완료를 기다릴지 여부

F.      poke_interval (int) : wait_for_completion=True일 때 dag 실행 상태를 확인하기 위한 interval

IV.    Example

trigger = TriggerDagRunOperator(

    task_id="test_trigger_dagrun",

    trigger_dag_id="example_trigger_target_dag",  

    conf={"message""Hello World"},

)

 

 

3.    출처

1)      https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html

2)      https://stackoverflow.com/questions/51829200/how-to-set-up-airflow-send-email

3)      https://programtalk.com/python-examples/airflow.operators.generic_transfer.GenericTransfer/

4)      https://getchan.github.io/data/airflow_2/

5)      https://moons08.github.io/programming/airflow-branch/

6)      https://www.mikulskibartosz.name/data-quality-checks-using-sqlcheckoperator/

728x90

'BigData > Dataflow' 카테고리의 다른 글

Airflow - 간단한 Mysql to MSSQL(SQL Server) 이관 예제  (2) 2021.06.16
Airflow 병렬 프로세스 및 설치  (0) 2021.05.17
Airflow 기초 설치  (0) 2021.04.27
  • 네이버 블러그 공유하기
  • 네이버 밴드에 공유하기
  • 페이스북 공유하기
  • 카카오스토리 공유하기