Airflow pipeline을 만들어보자
DAG
- DAG
- node들이 task이고 edge가 각 task의 dependency를 의미한다
Operator
-
operator
- one task in data pipeline
- 하나의 operator에는 하나의 task만 넣어라
- 특히 dependency가 있는 task의 경우는 더 그래야한다
-
3 types of operator
- Action operators: Execute an action
- Transfer operators: Transfer data
- Sensors: Wait for a condition to be met
pipeline 만들기
만들고자 하는 pipeline은 다음과 같다.
1
|
creating_table >> is_api_available >> extracting_user >> processing_user >> storing_user
|
먼저 webserver와 scheduler를 활성화한다.
1
2
|
airflow webserver
airflow scheduler
|
creating_table
airflow를 설치하면 최소한의 패키지만 설치되기 때문에 추가적으로 패키지가 필요하면 설치해야 한다. 지금 만드려고 하는 db의 경우 sqlite를 이용할 것이기 때문에 sqlite operator가 필요하다. 링크에 가면 third-party와의 연결을 위한 package들이 있다. pip install apache-airflow-providers-sqlite
으로 설치를 해준다.
파이썬 가상환경을 활성화해주면
그리고 아래와 같이 user_processing.py
파일을 만들어준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from datetime import datetime
# default로 넣고 싶은 인자들
default_args = {
'start_date': datetime(2022, 4, 20)
}
with DAG(
'user_processing', # dag id는 unique 해야한다: 'user_processing'
schedule_interval='@daily', # start_date이후로 얼마나 자주 DAG가 run되야하는지
default_args=default_args,
catchup=False
) as dag:
# define the task/operator
creating_table = SqliteOperator(
task_id='creating_table', # 하나의 pipeline에서 unique 해야함
sqlite_conn_id='db_sqlite',
sql='''
CREATE TABLE users (
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
country TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
email TEXT NOT NULL PRIMARY KEY
);
'''
)
|
그리고 Admin -> Connections 에 가서 connection을 아래와 같이 추가해줘야 한다. conn_id는 위에서 정한 db_sqlite
로 해줘야하고 Host
는 아래와 같이 airflow.db
가 있는 path를 해주면 된다.
- test
airflow tasks test user_processing creating_table 2022-04-23
is_api_available
task이름 그래도 api가 작동하는지 확인하는 것이다. 기존 airflow의 example 데이터를 이용한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime
# default로 넣고 싶은 인자들
default_args = {
'start_date': datetime(2022, 4, 20)
}
with DAG(
'user_processing', # dag_id는 unique 해야한다: 'user_processing'
schedule_interval='@daily',
default_args=default_args,
catchup=False
) as dag:
# define the task/operator
creating_table = SqliteOperator(
task_id='creating_table', # 하나의 pipeline에서 unique 해야함
sqlite_conn_id='db_sqlite',
sql='''
CREATE TABLE users (
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
country TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
email TEXT NOT NULL PRIMARY KEY
);
'''
)
is_api_available = HttpSensor(
task_id='is_api_available',
http_conn_id='user_api',
endpoint='api/'
)
|
- test
airflow tasks test user_processing is_api_available 2022-04-23
airflow example 데이터를 extract하는 task를 생성한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from datetime import datetime
import json
# default로 넣고 싶은 인자들
default_args = {
'start_date': datetime(2022, 4, 20)
}
with DAG(
'user_processing', # dag_id는 unique 해야한다: 'user_processing'
schedule_interval='@daily',
default_args=default_args,
catchup=False
) as dag:
# define the task/operator
creating_table = SqliteOperator(
task_id='creating_table', # 하나의 pipeline에서 unique 해야함
sqlite_conn_id='db_sqlite',
sql='''
CREATE TABLE users (
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
country TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
email TEXT NOT NULL PRIMARY KEY
);
'''
)
is_api_available = HttpSensor(
task_id='is_api_available',
http_conn_id='user_api',
endpoint='api/'
)
extracting_user = SimpleHttpOperator(
task_id='extracting_user',
http_conn_id='user_api',
endpoint='api/',
method='GET',
response_filter=lambda response: json.loads(response.text),
log_response=True
)
|
- test
airflow tasks test user_processing is_api_available 2022-04-23
test를 진행하면 그 결과가 나온다. 아래는 GET을 통해 가져온 데이터를 의미한다.
1
2
3
|
[2022-04-23 06:35:15,589] {http.py:140} INFO - Sending 'GET' to url: https://randomuser.me/api/
[2022-04-23 06:35:16,104] {http.py:115} INFO - {"results":[{"gender":"female","name":{"title":"Mrs","first":"Lonne","last":"Schilstra"},"location":{"street":{"number":4722,"name":"Jaspis"},"city":"Zutphen","state":"Flevoland","country":"Netherlands","postcode":64338,"coordinates":{"latitude":"40.3466","longitude":"58.3229"},"timezone":{"offset":"+4:30","description":"Kabul"}},"email":"lonne.schilstra@example.com","login":{"uuid":"897a7df1-9964-4baf-9b3c-001f9cc815ef","username":"smallbear142","password":"chrysler","salt":"LysRP72T","md5":"92f39f731719df04767870a0cdcfb43d","sha1":"2cedb9b8ae797b56ddf40e8425db36df1f4866ee","sha256":"a1fd36358cf8d94e243ccfba37d9478b4b129bff367aa2ff218a4c975851ef11"},"dob":{"date":"1949-10-26T18:21:22.653Z","age":73},"registered":{"date":"2019-06-05T08:17:56.847Z","age":3},"phone":"(875)-139-9718","cell":"(524)-611-0968","id":{"name":"BSN","value":"47403055"},"picture":{"large":"https://randomuser.me/api/portraits/women/55.jpg","medium":"https://randomuser.me/api/portraits/med/women/55.jpg","thumbnail":"https://randomuser.me/api/portraits/thumb/women/55.jpg"},"nat":"NL"}],"info":{"seed":"ff9863d9e80ca539","results":1,"page":1,"version":"1.3"}}
[2022-04-23 06:35:16,123] {taskinstance.py:1184} INFO - Marking task as SUCCESS. dag_id=user_processing, task_id=extracting_user, execution_date=20220423T000000, start_date=20220423T060222, end_date=20220423T063516
|
processing_user
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
import json
from pandas import json_normalize
# default로 넣고 싶은 인자들
default_args = {
'start_date': datetime(2022, 4, 20)
}
# extracting_user 를 통해 얻은 결과를 이용
# airflow tasks test user_processing extracting_user 2022-04-23
def _processing_user(ti):
# extracting_user task의 결과가 metastore에 저장이 되고 이 결과를 pull 한다
users = ti.xcom_pull(task_ids=['extracting_user'])
if not len(users) or 'results' not in users[0]:
raise ValueError('User is empty')
user = users[0]['results'][0]
# json_normalize로 dict를 pd.DataFrame으로 변환
processed_user = json_normalize({
'first_name': user['name']['first'],
'lastname': user['name']['last'],
'country': user['location']['country'],
'username': user['login']['username'],
'password': user['login']['password'],
'email': user['email']
})
processed_user.to_csv('/tmp/processed_user.csv', index=None, header=False)
with DAG(
'user_processing', # dag_id는 unique 해야한다: 'user_processing'
schedule_interval='@daily',
default_args=default_args,
catchup=False
) as dag:
# define the task/operator
creating_table = SqliteOperator(
task_id='creating_table', # 하나의 pipeline에서 unique 해야함
sqlite_conn_id='db_sqlite',
sql='''
CREATE TABLE users (
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
country TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
email TEXT NOT NULL PRIMARY KEY
);
'''
)
is_api_available = HttpSensor(
task_id='is_api_available',
http_conn_id='user_api',
endpoint='api/'
)
extracting_user = SimpleHttpOperator(
task_id='extracting_user',
http_conn_id='user_api',
endpoint='api/',
method='GET',
response_filter=lambda response: json.loads(response.text),
log_response=True
)
processing_user = PythonOperator(
task_id='processing_user',
python_callable=_processing_user
)
|
- test
airflow tasks test user_processing is_api_available 2022-04-23
test를 진행하면 processed_user.csv가 생성된다. cat /tmp/processed_user.csv
를 하면 아래와 같은 결과가 나온다.
1
|
Mercedes,Jimenez,Spain,goldenbutterfly721,darius,mercedes.jimenez@example.com
|
storing_user
이제 /home/airflow/airflow/airflow.db
에 user를 저장해보자. 처음에 sqlite3 airflow.db
로 sqlite에 접속하고 SELECT * FROM users;
하면 아무것도 없다가 나중에 storing_user
task를 실행하면 그 유저가 들어간 것을 확인 할 수 있다.
task 순서(dependency)정하기
>>
를 통해서 dependency를 알려준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
import json
from pandas import json_normalize
# default로 넣고 싶은 인자들
default_args = {
'start_date': datetime(2022, 4, 20)
}
# extracting_user 를 통해 얻은 결과를 이용
# airflow tasks test user_processing extracting_user 2022-04-23
def _processing_user(ti):
# extracting_user task의 결과가 metastore에 저장이 되고 이 결과를 pull 한다
users = ti.xcom_pull(task_ids=['extracting_user'])
if not len(users) or 'results' not in users[0]:
raise ValueError('User is empty')
user = users[0]['results'][0]
# json_normalize로 dict를 pd.DataFrame으로 변환
processed_user = json_normalize({
'first_name': user['name']['first'],
'lastname': user['name']['last'],
'country': user['location']['country'],
'username': user['login']['username'],
'password': user['login']['password'],
'email': user['email']
})
processed_user.to_csv('/tmp/processed_user.csv', index=None, header=False)
with DAG(
'user_processing', # dag_id는 unique 해야한다: 'user_processing'
schedule_interval='@daily',
default_args=default_args,
catchup=False
) as dag:
# define the task/operator
creating_table = SqliteOperator(
task_id='creating_table', # 하나의 pipeline에서 unique 해야함
sqlite_conn_id='db_sqlite',
sql='''
CREATE TABLE IF NOT EXISTS users (
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
country TEXT NOT NULL,
username TEXT NOT NULL,
password TEXT NOT NULL,
email TEXT NOT NULL PRIMARY KEY
);
'''
)
is_api_available = HttpSensor(
task_id='is_api_available',
http_conn_id='user_api',
endpoint='api/'
)
extracting_user = SimpleHttpOperator(
task_id='extracting_user',
http_conn_id='user_api',
endpoint='api/',
method='GET',
response_filter=lambda response: json.loads(response.text),
log_response=True
)
processing_user = PythonOperator(
task_id='processing_user',
python_callable=_processing_user
)
storing_user = BashOperator(
task_id='storing_user',
bash_command='echo -e ".separator ","\n.import /tmp/processed_user.csv" | sqlite3 /home/airflow/airflow/airflow.db'
)
creating_table >> is_api_available >> extracting_user >> processing_user >> storing_user
|
Reference