Contents

[Airflow] Use plugin in Airflow

Contents

Airflow에서 plugin을 이용하여 사용반경을 넓혀보자.

먼저 Elasticsearch를 설치해보자.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
curl -fsSL https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -

# 그리고 비밀번호 친다

echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list

sudo apt update && sudo apt install elasticsearch

# 설치
pip install elasticsearch==7.10.1

# 시작
sudo systemctl start elasticsearch

# 잘 되는지 확인
curl -X GET 'http://localhost:9200'
  • 사용자만의 own
    • operators를 커스텀
    • views를 통해서 원하는 ui 만들기
    • hooks을 통해서 서드파티를 이용가능

직접 원하는대로 python module을 만들어서 사용할 수 있다.

이제 Elasticsearch와 interact할 수 있는 plugin을 만들어보자. postgreSQL에서 Elasticsearch로 data를 보내는 hook과 operator를 아래와 같이 만들 수 있다.

먼저, plugins(이름을 잘 지켜야함)라는 폴더를 만들고 그 안에 elasticsearch_plugin이라는 폴더를 만든다. 그리고 그 폴더안에 hooks, operators라는 폴더를 만든다. 그리고 각 폴더에 elastic_hook.py, postgres_to_elastic.py라는 파일을 만든다. 두 파일 모두 Airflow에 있는 Base hook, operator를 상속받아서 class를 만들어 사용하는 코드를 담고 있다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from airflow.hooks.base import BaseHook
from elasticsearch import Elasticsearch

class ElasticHook(BaseHook):
    def __init__(
        self,
        conn_id='elasticsearch_default',
        *args,
        **kwargs
        ):
        super().__init__(*args, **kwargs)
        
        # BaseHook에 있는 함수
        conn = self.get_conntetion(conn_id)

        conn_config = {}
        hosts = []

        if conn.host:
            hosts = conn.host.split(',')
        if conn.port:
            conn_config['port'] = int(conn.port)
        if conn.login:
            conn_config['http_auth'] = (conn.login, conn.password)

        self.es = Elasticsearch(hosts, **conn_config)
        self.index = conn.schema

    def info(self):
        return self.es.info()

    def set_index(self, index):
        self.index = index

    def add_doc(self, index, doc_type, doc):
        self.set_index(index)
        res = self.es.index(index=index, doc_type=doc_type, body=doc)
        return res
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from elasticsearch_plugin.hooks.elastic_hook import ElasticHook

from contextlib import closing
import json

class PostgresToElasticOperator(BaseOperator):
    def __init__(
        self,
        sql,
        index,
        postgres_conn_id='postgres_default',
        elastic_conn_id='elasticsearch_default',
        *args,
        **kwargs
        ):
        super(PostgresToElasticOperator, self).__init__(*args, **kwargs)

        self.sql = sql
        self.index = index
        self.postgres_conn_id = postgres_conn_id
        self.elastic_conn_id = elastic_conn_id

    def execute(self, context):
        es = ElasticHook(conn_id=self.elastic_conn_id)
        pg = PostgreHook(postgres_conn_id=self.postgres_conn_id)
        with closing(pg.get_conn()) as conn:
            with closing(conn.cursor()) as cur:
                cur.itersize = 1000
                cur.execute(self.sql)
                for row in cur:
                    doc = json.dumps(row, indent=2)
                    es.add_doc(index=self.index, doc_type='external', doc=doc)

그리고 dag을 만들어준다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from airflow import DAG
from elasticsearch_plugin.hooks.elastic_hook import ElasticHook
from elasticsearch_plugin.operators.postgres_to_elastic import PostgresToElasticOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {
    'start_date': datetime(2022, 5, 5)
}

def _print_es_info():
    hook = ElasticHook()
    print(hook.info())

with DAG(
    'elastcisearch_dag',
    schedule_interval='@daily',
    default_args=default_args,
    catchup=False
) as dag:

    print_es_info = PythonOperator(
        task_id=-'print_es_info',
        python_callable=_print_es_info
    )

    connections_to_es = PostgresToElasticOperator(
        task_id='connections_to_es',
        sql='SELECT * FROM connection',
        index='connections'
    )

    print_es_info >> connections_to_es

web에서 connection을 추가해준다.

그리고 postgresql에 들어가서 user를 추가해준다.

1
2
sudo -u postgres psql
ALTER USER postgres PASSWORD 'postgres';

이제 dag을 실행시키고 아래 명령어를 통해 postgreSQL에서 Elasticsearch로 데이터를 보낸 것을 확인할 수 있다.

1
curl -X GET "http://localhost:9200/connections/_search" -H "Content -type: application/json" -d '{"query":{"match_all":{}}}'