Airflow - một số ghi chép
Một số ghi chép, tips & tricks của mình trong quá trình sử dụng Apache Airflow.
-
Viết các functions (tasks) luôn cho mọi kết quả giống nhau với các input giống nhau (stateless).
- Tránh sử dụng global variables, random values, hardware timers.
-
Một số tính năng nên biết
depends_on_past
sử dụng khi viết DAGs để chắc chắn mọi task instance trước đó đều success.LatestOnlyOperator
để skip một số bước phía sau nếu một số task bị trễ.BranchPythonOperator
cho phép rẽ nhánh workflow tùy vào điều kiện được định nghĩa.
-
Sử dụng
airflow test <dag-id> <task-id> ...
để test task instance trên local khi code. -
Sử dụng Docker Compose để thiết lập môi trường local cho dễ.
-
Để test DAG với scheduler, hãy set
schedule_interval=@once
, chạy thử, để chạy lại thì chỉ cần clear DagRuns trên UI hoặc bằng lệnhairflow clear
-
Khi DAG đã được chạy, airflow chứa các task instance trong DB. Nếu bạn thay đổi
start_date
hoặc interval, scheduler có thể sẽ gặp lỗi. Nên đổi têndag_id
nếu muốn thay đổistart_date
hoặc interval. -
Sử dụng Bitshift thay vì
set_upstream()
andset_downstream()
để code dễ nhìn hơn, ví dụop1 >> op2 # tương đương: op1.set_downstream(op2) op1 >> op2 >> op3 << op4 # tương đương: # op1.set_downstream(op2) # op2.set_downstream(op3) # op3.set_upstream(op4) op1 >> [op2, op3] >> op4 # tương đương # op1 >> op2 # op1 >> op3 # op2 >> op4 # op3 >> op4 # hoặc tương đương # op1.set_downstream([op2, op3]) # op2.set_downstream(op4) # op3.set_downstream(op4)
-
Sử dụng
Variables
để lưu trữ params của DAGs (Admin
->Variables
)from airflow.models import Variable foo = Variable.get("foo") bar = Variable.get("bar", deserialize_json=True) baz = Variable.get("baz", default_var=None)
hoặc sử dụng variable trong jinja template:
echo {{ var.value.<variable_name> }}
-
Sử dụng default arguments để tránh lặp lại các tham số
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'params': { 'foo': 'baz' } } with DAG(dag_id='airflow', default_args=default_args): op1 = BigQueryOperator(task_id='query_1', sql='SELECT 1') op2 = BigQueryOperator(task_id='query_2', sql='SELECT 2') op1 >> op2
-
Lưu password, token trong
Connections
from airflow.hooks.base_hook import BaseHook aws_token = BaseHook.get_connection('aws_token').password
-
Có thể generate DAG một cách tự động, ví dụ
def create_dag(id): dag = DAG(f'dag_job_{id}', default_args) op1 = BigQueryOperator(task_id='query_1', sql='SELECT 1', dag=dag) ... return dag for i in range(100): globals()[f'dag_job_{id}'] = create_dag(id)