Nếu như bạn muốn biết được air flow là gì thì hãy đọc bài viết của chúng mình nhé. Chúng mính sẽ luôn giúp bạn giải đáp được những thắc mắc từ đơn giản cho tới phức tạp trong cuộc sống này. Chính vì thế hãy luôn cạnh bên chúng mình bạn à, hãy ủng hộ chúng mình bằng cách đọc bài viết mà chúng mình đăng tải trên web để có được đáp án cho thắc mắc air flow là gì nhé bạn.
Air flow là gì
Nếu như muốn biết air flow là gì ấy thì bạn không nên bỏ qua bài viết này đâu bạn à. Bởi bài viết này sẽ cho bạn có được đáp án cho thắc mắc air flow là gì sau khi bạn đọc xong ấy. Vì thế mà đừng bỏ lỡ bài viết này bạn nhé. Bởi khi đọc bạn sẽ biết thêm được một điều thú vị, một điều hay ho trong cuộc sống này ấy. Như thế cuộc đời của bạn sẽ đẹp đẽ hơn nhiều bạn à.
dòng khí
-
- air flow meter
- lưu lượng kế dòng khí
- air flow rate
- tốc độ dòng khí
- air flow requirements
- nhu cầu dòng khí
- cooling air flow
- dòng khí lạnh
- refrigerating air flow
- dòng khí lạnh
dòng không khí
-
- air flow diffusion
- khuếch tán dòng không khí
- air flow direction
- hướng dòng không khí
- air flow mixing
- hòa trộn dòng không khí
- air flow sensor
- bộ cảm biến dòng không khí
- air flow sensor
- đầu cảm do dòng không khí
- air flow sensor
- đầu cảm đo dòng không khí
- cooled air flow
- dòng không khí lạnh
- discharge air flow
- dòng không khí cấp
- even air flow distribution
- phân bố đồng đều dòng không khí
- exhaust air flow
- dòng không khí thải
- forced air flow
- dòng không khí cưỡng bức
- horizontal air flow
- dòng không khí thổi ngang
- intake air flow
- dòng không khí hút
- leakage air flow
- dòng không khí rò lọt
- leakage air flow
- dòng không khí thẩm thấu
- multijet air flow
- dòng không khí nhiều tia
- outdoor intake air flow
- dòng không khí bên ngoài vào
- rate of air flow
- tốc độ dòng không khí
- return air flow
- dòng không khí hồi
- supply air flow
- dòng không khí cấp
- two-directional air flow
- dòng không khí hai hướng
luồng gió
-
- intake air flow
- luồng gió hút
- rate of air flow
- tốc độ luồng gió
- rate of coolant air flow
- tốc độ luồng gió làm lạnh
- rate of coolant air flow
- tốc độ luồng gió lạnh cần thiết
- refrigerating air flow
- luồng gió lạnh
luồng khí
-
- air flow meter
- cảm biến lưu lượng khí
- cooling air flow
- luồng khí lạnh
Dag airflow la gì
Cùng đọc bài viết này để có thể biết được câu trả lời cho thắc mắc dag airflow la gì bạn nhé. Chúng mình tin chắc rằng những thông tin trong bài viết này sẽ khiến cho bạn bất ngờ lắm bạn à. Chính vì thế hãy luôn đồng hành cùng chúng mình để có thể biết được đáp án cho những thắc mắc như kiểu dag airflow la gì nhé bạn.
Airflow cũng phân phối API Endpoint. Bạn hoàn toàn hoàn toàn có thể dùng curl để thử hoặc dùng mấy framework như Postman
ENDPOINT_URL="http://localhost:8080/" curl -X GET \ --user "airflow:airflow" \ "${ENDPOINT_URL}/api/v1/pools"
Sau khi bạn đăng nhập thông tin tài khoản airflow trên web http://localhost:8080
sẽ hiện ra một danh sách những DAG, chi tiết phần này mình sẽ nói phía dưới bởi DAG là khái niệm quan trọng của Airflow
P/s: Nếu bạn không thích WebUI chứa những file DAG mẫu mà nhà tăng trưởng cung ứng thì có thể thay đổi env AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
trong file docker-compose.yaml .
DAGs
1 DAG (Directed Acyclic Graph) sẽ link các tasks, kiến thiết xây dựng liên kết, tự động hóa chạy task, …
Dưới đấy là 1 ví dụ về DAG:
Hình phía trên gồm có 4 task: A, B, C, D có mối link với nhau và đuổi theo thứ tự mà người tiêu dùng quy định: a -> b, c -> d. Nếu task a oẳng thì sẽ dẫn đến task b, c, d oẳng theo. Bạn cũng hoàn toàn có thể tùy chỉnh thời hạn để những tasks này chạy vào thời điểm mong muốn. Bạn hoàn toàn có thể yên tâm một điều là DAG sẽ không còn chăm sóc đến code của bạn, mục tiêu của nó chỉ là workflow mà thôi.
Có 3 phương pháp để khai báo DAG trong code của bạn:
- Context manager
- Standard constructor
Context manager
Với cách này các các bạn sẽ bọc code của mình trong DAG bằng with
with DAG( "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False ) as dag: op = EmptyOperator(task_id="task")
Standard constructor
Với cách này, bạn sẽ khai báo một constructer gọi tới class DAG
my_dag = DAG("my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False) op = EmptyOperator(task_id="task", dag=my_dag)
Decorator
Nếu bạn quen với Python decorator thì có lẽ sẽ quen với DAG decorator.
@dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False) def generate_dag(): op = EmptyOperator(task_id="task") dag = generate_dag()
Sự độc lạ giữa Context manager và Decorator
Nếu để chọn cách nào trong 3 cách trên thì tôi sẽ chọn viết DAG bằng decorator. Tại sao ? Bởi vì cách viết của context manager và standard constructor phức tạp, dài dòng và mơ hồ. Chính vì thế những phiên bản của Airflow từ 2.x trở lên phân phối thêm cách viết bằng decorator cho những người tiêu dùng thuận tiện sử dụng. Tôi sẽ để 2 file ETL python ( Extract-Transform-Load ) được viết bởi 2 cách context manager và decorator phía dưới cho bạn dễ so sánh.
- Context manager
import json from textwrap import dedent import pendulum from airflow import DAG from airflow.operators.python import PythonOperator with DAG( 'tutorial_etl_dag', # These args will get passed on to each operator # You can override them on a per-task basis during operator initialization default_args={'retries': 2}, description='ETL DAG tutorial', schedule_interval=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: dag.doc_md = __doc__ def extract(**kwargs): ti = kwargs['ti'] data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' ti.xcom_push('order_data', data_string) def transform(**kwargs): ti = kwargs['ti'] extract_data_string = ti.xcom_pull(task_ids='extract', key='order_data') order_data = json.loads(extract_data_string) total_order_value = 0 for value in order_data.values(): total_order_value += value total_value = {"total_order_value": total_order_value} total_value_json_string = json.dumps(total_value) ti.xcom_push('total_order_value', total_value_json_string) def load(**kwargs): ti = kwargs['ti'] total_value_string = ti.xcom_pull(task_ids='transform', key='total_order_value') total_order_value = json.loads(total_value_string) print(total_order_value) extract_task = PythonOperator( task_id='extract', python_callable=extract, ) extract_task.doc_md = dedent( """\ #### Extract task A simple Extract task to get data ready for the rest of the data pipeline. In this case, getting data is simulated by reading from a hardcoded JSON string. This data is then put into xcom, so that it can be processed by the next task. """ ) transform_task = PythonOperator( task_id='transform', python_callable=transform, ) transform_task.doc_md = dedent( """\ #### Transform task A simple Transform task which takes in the collection of order data from xcom and computes the total order value. This computed value is then put into xcom, so that it can be processed by the next task. """ ) load_task = PythonOperator( task_id='load', python_callable=load, ) load_task.doc_md = dedent( """\ #### Load task A simple Load task which takes in the result of the Transform task, by reading it from xcom and instead of saving it to end user review, just prints it out. """ ) extract_task >> transform_task >> load_task
import json import pendulum from airflow.decorators import dag, task @dag( schedule_interval=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) def tutorial_taskflow_api_etl(): """ ### TaskFlow API Tutorial Documentation This is a simple ETL data pipeline example which demonstrates the use of the TaskFlow API using three simple tasks for Extract, Transform, and Load. Documentation that goes along with the Airflow TaskFlow API tutorial is located [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html) """ @task() def extract(): """ #### Extract task A simple Extract task to get data ready for the rest of the data pipeline. In this case, getting data is simulated by reading from a hardcoded JSON string. """ data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' order_data_dict = json.loads(data_string) return order_data_dict @task(multiple_outputs=True) def transform(order_data_dict: dict): """ #### Transform task A simple Transform task which takes in the collection of order data and computes the total order value. """ total_order_value = 0 for value in order_data_dict.values(): total_order_value += value return {"total_order_value": total_order_value} @task() def load(total_order_value: float): """ #### Load task A simple Load task which takes in the result of the Transform task and instead of saving it to end user review, just prints it out. """ print(f"Total order value is: {total_order_value:.2f}") order_data = extract() order_summary = transform(order_data) load(order_summary["total_order_value"]) tutorial_etl_dag = tutorial_taskflow_api_etl()
- Import modules: 2 cách viết khác nhau sẽ sở hữu được 2 cách import thư viện khác nhau
- Context manager:
from airflow import DAG from airflow.operators.python import PythonOperator
from airflow.decorators import dag, task
- Khởi tạo DAG:
- Context manager: bọc toàn bộ những tasks với DAG bằng keyword
with
- Decorator: khởi tạo
@dag
đầu file
Do các tham số của 2 cách viết giống nhau, nên tôi chỉ lấy ví dụ của một cách viết.
@dag( start_date=datetime.now(), schedule_interval=None, catchup=False, tags=['example'], ... )
start_date: thời hạn bắt đầu
schedule_interval: lịch trình chạy ( @once, @hourly, @daily, @weekly, @monthly, @yearly )
catchup: nếu như khách hàng có start_date
, end_date
( optional ) và schedule_interval
thì khi bạn để giá trị của catchup là True thì schedule_interval
sẽ không còn bị giới hạn và thực thi tasks tức thì.
- Khởi tạo tasks trong DAG: những tasks ở đấy là extract, transform, loading.
- Context manager: truyền những function ETL vào PythonOperator ( Operator có nghĩa vụ và trách nhiệm giải quyết và xử lý tasks trong phần kiến trúc đề cập bên trên ) và gán 1 biến bất kỳ
extract_task = PythonOperator( task_id='extract', python_callable=extract, )
- Decorator: đơn giản dùng như python decorator
@task() def extract(): ...
- Viết doc cho function.
- Context manager: từ những biến tượng trưng cho những task gọi hàm
doc_md
extract_task.doc_md = dedent( """\ #### Extract task A simple Extract task to get data ready for the rest of the data pipeline. In this case, getting data is simulated by reading from a hardcoded JSON string. This data is then put into xcom, so that it can be processed by the next task. """ )
- Decorator: không biến hóa gì, viết doc trong hàm như bình thường
- Xác định thứ tự xử lý của những tasks: Chi tiết phần này mình sẽ đề cập ở bên dưới
- Context manager:
extract_task >> transform_task >> load_task
- Decorator: cách viết này sẽ rõ ràng hơn cho những người đọc
order_data = extract() order_summary = transform(order_data) load(order_summary["total_order_value"])
Dagster là gì
Với câu hỏi dagster là gì này thì có nhiều nơi cung cấp cho bạn đáp án đúng không nào. Nhưng bạn có biết đâu là đáp án chuẩn xác, là đáp án đáng tin cậy không? Nếu như bạn muốn có câu trả lời ấy thì hãy đọc bài viết dưới đây nhé. Bởi bài viết này sẽ cho bạn biết câu trả lời chính xác của thắc mắc dagster là gì ấy.
- Kafka: mạng lưới mạng lưới hệ thống streaming được sử dụng nhiều nhất, thường nếu có kafka thì biết java sẽ là một plus point (không biết cũng không sao)
- Kinesis/ PubSub/…: Các hệ thống này thực ra cũng tựa như kafka như của cloud, nên chỉ việc có một kinh nghiệm tay nghề trong số này là được.
- Spark hoặc pySpark : Hệ thống xử lí tài liệu Bigdata -> đấy là requirement mà bạn phải quan tâm là có đi kèm theo với scala không, hay là chỉ python thôi (pySpark). Requirement này sẽ ảnh hưởng tác động tới công việc trong thực tiễn của ứng viên.
- AWS EMR: là một service của AWS, là vì công ty bạn đang dùng AWS chứ không hẳn là 1 requirement cứng(thường là vậy).
- ETL/ELT: đại khái là khái niệm DE nào thì cũng phải biết, không phải ngôn từ lập trình hay công cụ gì. (Nếu muốn tìm hiểu thêm hoàn toàn có thể xem tại đây)
- CI/CD: role này còn có nhu yếu kiến thức và kỹ năng tốt về software engineer hoặc có nhu yếu về kĩ năng dataops/ devops
- Data orchestration: Là một số ít công cụ như airflow, dagster, prefect,… biết một chiếc trong đống này đều phải có năng lực học những khác.
- Airflow: là một data orchestration tool (để riêng vì nó quá phổ biến)
- …. (sẽ update thêm khi rảnh rỗi)
Một vài term chết người (cân nhắc communicate kĩ với ứng viên)
- On-premise: công ty có server cứng , physical -> rất quan trọng cho ứng viên biết được bản thân họ có phải trực tiếp xử lí cái server cứng này sẽ không vì ảnh hưởng tới retention và hiệu quả việc làm của họ
- Cloud: ngược lại với on-premise là trên cloud -> Thường được prefer bởi mọi người
- Đang migration từ on-premise lên cloud: Cần làm rõ điều này với hiring manager để khớp kì vọng công việc. Không khéo vô được 2 ngày rồi nghỉ. (vì sao hỏi hiring manager nhé)
- Hadoop: làm rõ với hiring manager là Hadoop and map reduce hay là Hadoop filesystem (rất quan trọng).
Air flow pc
Hãy để cho câu trả lời cho thắc mắc air flow pc giúp cho bạn hiểu hơn về vấn đề này bạn à. Cuộc sống này ấy luôn tồn tại nhiều vấn đề mà bản thân bạn không biết đúng không nào. Chính vì thế hãy cho chúng mình một cơ hội giúp đỡ bạn, khiến cho bạn hiểu được air flow pc thông qua bài viết này nhé.
Trước khi chúng ta bắt đầu, bạn sẽ muốn xem xét giá treo quạt có sẵn của mình và quyết định cách tốt nhất có thể để lên kế hoạch cho luồng khí của bạn. Dưới đây là một số điều cần lưu ý.
Không khí nên chảy từ trước ra sau và từ dưới lên trên
Khi lắp quạt trường hợp, không khí chảy qua mặt mở về phía bên với lưới tản nhiệt bảo vệ, như vậy:
Vì vậy, mặt mở của quạt nên đối lập bên ngoài vỏ đối với quạt hút ở mặt trước hoặc mặt dưới, và mặt phải của mặt quạt đối với quạt ở phía sau hoặc phía trên.
Hầu hết những trường hợp được phong cách thiết kế với một luồng không khí định hướng nhất định – thường là từ trước ra sau và từ dưới lên trên. Điều đó nghĩa là bạn nên gắn quạt hút gió ở mặt trước của vỏ hoặc nhiều lúc (nếu bạn có thiết lập nhiều quạt hoặc khung gắn phía đằng trước bị chặn) ở phía dưới.
Quạt hút đi phía sau hoặc phía trên. Không gắn quạt hút dưới mặt đáy thùng máy; Tính từ lúc khi không khí nóng tăng lên, một quạt hút khí dưới đáy sẽ hoạt động chống lại vật lý bằng phương pháp trục xuất không khí mát hơn một chút thay vì không khí ấm hơn. Hướng xả khí nên đi từ trước ra sau và từ dưới lên trên. Quạt gắn bên hoàn toàn có thể là ống hút hoặc ống xả, tùy theo thiết lập.
Quản lý cáp của bạn và những vật cản khác
Nói chung, rất tốt là có càng ít chướng ngại vật càng tốt giữa quạt hút gió ở mặt trước của vỏ và quạt hút ở phía sau và phía trên của vỏ. Điều này tạo nên luồng không khí nhanh hơn và hiệu suất cao hơn, làm mát hiệu quả hơn những bộ phận của bạn. Cố gắng kết nối toàn bộ những thành phần dài, phẳng như ổ đĩa CD, ổ cứng và GPU theo chiều ngang – đấy là thông số kỹ thuật mặc định trên hầu hết những trường hợp PC.
Cáp, đặc biệt quan trọng là những đường ray đi kèm theo lớn từ một nguồn cung cấp cấp, hoàn toàn có thể đặc biệt rắc rối. Hầu hết những trường hợp lớn bao gồm một mạng lưới hệ thống những lỗ và hướng dẫn được cho phép người tiêu dùng luồn những dây cáp này thoát khỏi khu vực mở chính của vỏ, thường ở phía sau khay bo mạch chủ. Lấy càng nhiều dây cáp thoát khỏi đường càng tốt. Đây là một ví dụ thực sự hay về một trường hợp có quản trị cáp tốt tạo nên luồng không khí mở.
Càng và một ví dụ không hay. Trường hợp chứng khoán không cung ứng nhiều tùy chọn để tại vị cáp cung cấp điện không sử dụng thoát khỏi đường, nhưng bạn vẫn nên cố gắng nỗ lực giấu chúng đi loanh quanh đâu đó một cách tốt nhất hoàn toàn có thể có thể.
Hầu hết những trường hợp gồm có nhiều điểm kết nối cho quạt case – nhiều lúc nhiều điểm gắn kết hơn so với quạt được bao gồm. Nếu bao gồm những thiết bị chặn thông hơi, hãy sử dụng chúng: có vẻ rất mê hoặc để giữ cho chúng mở ra để sở hữu nhiều không khí nóng hơn để thoát ra ngoài, nhưng thay vào đây sẽ hiệu suất cao hơn khi đưa không khí qua quạt hút và đó chỉ là một nơi mà bụi có thể xâm nhập. Tương tự, hãy bảo vệ sử dụng tất cả những bộ đệm đi kèm theo với vỏ của bạn cho những khe PCIe chưa sử dụng, khay ổ đĩa 5.25 “, v.v..
Nhắm tiềm năng các điểm nóng
CPU của bạn có quạt tản nhiệt và quạt riêng, trong cả khi chúng ta không tự thêm một chiếc – đây là quạt duy nhất được gắn trực tiếp vào một trong những thành phần bo mạch chủ. Quạt này đang tỏa nhiệt trực tiếp từ CPU vào làn luồng khí chính của thùng máy. Lý tưởng nhất là bạn rất thích đặt một quạt hút càng gần CPU càng tốt để sớm trục xuất luồng khí nóng này. Quạt gắn bên (trục xuất hoặc vẽ trong không khí theo hướng vuông góc với bo mạch chủ) hoàn toàn hoàn toàn có thể hữu dụng ở đây, nhưng không hẳn trường hợp nào thì cũng hỗ trợ.
Nếu bạn có một bộ làm mát CPU hậu mãi lớn, nó có thể có một hoặc nhiều quạt của riêng nó. Cố gắng hướng đầu ra của những quạt này thẳng hàng với một trong những quạt hút của vỏ, gửi nhiệt trực tiếp từ CPU ra bên phía ngoài vỏ. Hầu hết những bộ làm mát CPU hoàn toàn có thể được gắn theo bất kể hướng hồng y nào để giúp đạt được điều đó (và để thuận tiện xóa những thành Phần Viền trong khác). Hãy nhớ rằng, fan hâm mộ trường hợp hút không khí ở phía mở và trục xuất không khí ở phía lưới tản nhiệt.
Cân bằng áp suất không khí của bạn
Hãy nghĩ về vỏ máy tính như một hộp kín, và không khí đi vào hoặc thoát khỏi mỗi quạt gần như là bằng nhau. (Nó không trọn vẹn kín và luồng khí nói chung không bằng nhau, nhưng tổng thể chúng ta đang nói chung chung ở đây.) Giả sử tất cả những quạt có cùng kích cỡ và tốc độ, thì bạn có một trong ba tùy chọn hoàn toàn có thể cho áp suất không khí bên trong vụ án:
- Áp suất không khí dương: Nhiều fan hâm mộ đang hút không khí vào vụ án hơn là thổi không khí thoát khỏi vụ án.
- Áp suất không khí âm: Nhiều quạt thổi khí thoát khỏi thùng hơn là hút không khí vào, gây ra hiệu ứng chân không nhẹ.
- Áp suất không khí bằng nhau: Cùng một lượng quạt thổi khí vào và ra, tạo nên áp suất không khí tương đương với phòng xung quanh.
Do cách những bộ phận bên trong tạo nên những khối trong luồng không khí, nên không ít không hề dành được áp suất không khí thực sự bằng nhau trong vỏ máy. Bạn muốn có tối thiểu một nguồn vào và một quạt hút ít nhất, vì vậy, giả sử bạn có nhiều hơn, tốt hơn, hút không khí nhiều hơn nữa cho áp suất dương hoặc thổi ra nhiều hơn cho áp suất âm?
Cả hai giải pháp đều có ưu điểm của chúng. Áp suất không khí âm sẽ tạo ra một môi trường tự nhiên mát hơn một chút ít (ít nhất là trên lý thuyết), vì những quạt đang thao tác chịu khó hơn để trục xuất không khí nóng. Nhưng điểm yếu kém là chân không nhẹ mà nó tạo ra bên trong vỏ máy có khuynh hướng hút không khí từ toàn bộ những khu vực không được bảo vệ: lỗ thông hơi, khe cắm PCIe không sử dụng trên bảng tinh chỉnh và điều khiển phía sau, thậm chí còn cả những đường nối sắt kẽm kim loại trong vỏ máy. Áp suất không khí dương cũng sẽ không còn mát, nhưng – kết hợp với những bộ lọc bụi (xem bên dưới) – sẽ hút ít bụi hơn vì những lỗ thông hơi và đường nối này sẽ đẩy hết không khí thay vì hút vào.
Ý kiến về áp lực đè nén tích cực so với xấu đi được trộn lẫn. Hầu hết mọi người lựa chọn lựa cách tiếp cận cân đối hơn, hơi nghiêng về áp suất không khí âm (để làm mát theo lý thuyết) hoặc áp suất không khí dương (để tích tụ bụi ít hơn) và có lẽ rằng chúng tôi muốn trình làng một chiếc gì đó ở giữa. Trong thực tế, các trường hợp PC không phải là một môi trường tự nhiên kín mà sự độc lạ hoàn toàn có thể là không đáng kể. Nếu bạn thấy quá nhiều bụi tích tụ, hãy vận động và di chuyển một trong số những quạt đầu ra của bạn đến vị trí đầu vào. Nếu bạn trọn vẹn chăm sóc đến nhiệt độ, hãy kiểm tra temps CPU và GPU bằng màn hình hiển thị ứng dụng và thử 1 số ít cấu hình khác nhau.
Apache airflow là gì
Hãy để cho câu hỏi apache airflow là gì kích thích sự tò mò của bản thân bạn nhé. Và bạn sẽ đọc bài viết dưới đây để có thể có được đáp án cho thắc mắc apache airflow là gì ấy. Như thế bạn đã biết thêm được một kiến thức hay cũng như bổ ích đúng không nào.
Airflow cũng phân phối API Endpoint. Bạn hoàn toàn có thể dùng curl để thử hoặc dùng mấy framework như Postman
ENDPOINT_URL="http://localhost:8080/" curl -X GET \ --user "airflow:airflow" \ "${ENDPOINT_URL}/api/v1/pools"
Sau khi chúng ta đăng nhập thông tin tài khoản airflow trên web http://localhost:8080
sẽ hiện ra một list những DAG, chi tiết cụ thể phần này mình sẽ nói phía dưới bởi DAG là khái niệm quan trọng của Airflow
P/s: Nếu bạn không thích WebUI chứa những file DAG mẫu mà nhà tăng trưởng cung cấp thì có thể biến hóa env AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
trong file docker-compose.yaml .
DAGs
1 DAG (Directed Acyclic Graph) sẽ link các tasks, kiến thiết xây dựng liên kết, tự động chạy task, …
Dưới đấy là 1 ví dụ về DAG:
Hình bên trên bao gồm 4 task: A, B, C, D có mối link với nhau và chạy theo thứ tự mà người tiêu dùng quy định: a -> b, c -> d. Nếu task a oẳng thì sẽ dẫn đến task b, c, d oẳng theo. Bạn cũng luôn có thể tùy chỉnh thời gian để các tasks này chạy vào thời điểm mong muốn. Bạn có thể yên tâm một điều là DAG sẽ không còn chăm sóc đến code của bạn, tiềm năng của nó chỉ là workflow mà thôi.
Có 3 phương pháp để khai báo DAG trong code của bạn:
- Context manager
- Standard constructor
Context manager
Với cách này các bạn sẽ bọc code của tớ trong DAG bằng with
with DAG( "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False ) as dag: op = EmptyOperator(task_id="task")
Standard constructor
Với cách này, bạn sẽ khai báo một constructer gọi tới class DAG
my_dag = DAG("my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False) op = EmptyOperator(task_id="task", dag=my_dag)
Decorator
Nếu bạn quen với Python decorator thì có lẽ rằng sẽ quen với DAG decorator.
@dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False) def generate_dag(): op = EmptyOperator(task_id="task") dag = generate_dag()
Sự khác biệt giữa Context manager và Decorator
Nếu để chọn cách nào trong 3 cách trên thì tôi sẽ chọn viết DAG bằng decorator. Tại sao ? Bởi vì cách viết của context manager và standard constructor phức tạp, dài dòng và mơ hồ. Chính cho nên vì thế những phiên bản của Airflow từ 2.x trở lên phân phối thêm cách viết bằng decorator cho người tiêu dùng thuận tiện sử dụng. Tôi sẽ để 2 file ETL python ( Extract-Transform-Load ) được viết bởi 2 cách context manager và decorator bên dưới cho bạn dễ so sánh.
- Context manager
import json from textwrap import dedent import pendulum from airflow import DAG from airflow.operators.python import PythonOperator with DAG( 'tutorial_etl_dag', # These args will get passed on to each operator # You can override them on a per-task basis during operator initialization default_args={'retries': 2}, description='ETL DAG tutorial', schedule_interval=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: dag.doc_md = __doc__ def extract(**kwargs): ti = kwargs['ti'] data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' ti.xcom_push('order_data', data_string) def transform(**kwargs): ti = kwargs['ti'] extract_data_string = ti.xcom_pull(task_ids='extract', key='order_data') order_data = json.loads(extract_data_string) total_order_value = 0 for value in order_data.values(): total_order_value += value total_value = {"total_order_value": total_order_value} total_value_json_string = json.dumps(total_value) ti.xcom_push('total_order_value', total_value_json_string) def load(**kwargs): ti = kwargs['ti'] total_value_string = ti.xcom_pull(task_ids='transform', key='total_order_value') total_order_value = json.loads(total_value_string) print(total_order_value) extract_task = PythonOperator( task_id='extract', python_callable=extract, ) extract_task.doc_md = dedent( """\ #### Extract task A simple Extract task to get data ready for the rest of the data pipeline. In this case, getting data is simulated by reading from a hardcoded JSON string. This data is then put into xcom, so that it can be processed by the next task. """ ) transform_task = PythonOperator( task_id='transform', python_callable=transform, ) transform_task.doc_md = dedent( """\ #### Transform task A simple Transform task which takes in the collection of order data from xcom and computes the total order value. This computed value is then put into xcom, so that it can be processed by the next task. """ ) load_task = PythonOperator( task_id='load', python_callable=load, ) load_task.doc_md = dedent( """\ #### Load task A simple Load task which takes in the result of the Transform task, by reading it from xcom and instead of saving it to end user review, just prints it out. """ ) extract_task >> transform_task >> load_task
import json import pendulum from airflow.decorators import dag, task @dag( schedule_interval=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) def tutorial_taskflow_api_etl(): """ ### TaskFlow API Tutorial Documentation This is a simple ETL data pipeline example which demonstrates the use of the TaskFlow API using three simple tasks for Extract, Transform, and Load. Documentation that goes along with the Airflow TaskFlow API tutorial is located [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html) """ @task() def extract(): """ #### Extract task A simple Extract task to get data ready for the rest of the data pipeline. In this case, getting data is simulated by reading from a hardcoded JSON string. """ data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' order_data_dict = json.loads(data_string) return order_data_dict @task(multiple_outputs=True) def transform(order_data_dict: dict): """ #### Transform task A simple Transform task which takes in the collection of order data and computes the total order value. """ total_order_value = 0 for value in order_data_dict.values(): total_order_value += value return {"total_order_value": total_order_value} @task() def load(total_order_value: float): """ #### Load task A simple Load task which takes in the result of the Transform task and instead of saving it to end user review, just prints it out. """ print(f"Total order value is: {total_order_value:.2f}") order_data = extract() order_summary = transform(order_data) load(order_summary["total_order_value"]) tutorial_etl_dag = tutorial_taskflow_api_etl()
- Import modules: 2 cách viết không giống nhau sẽ có 2 cách import thư viện khác nhau
- Context manager:
from airflow import DAG from airflow.operators.python import PythonOperator
from airflow.decorators import dag, task
- Khởi tạo DAG:
- Context manager: bọc tất cả những tasks với DAG bằng keyword
with
- Decorator: khởi tạo
@dag
đầu file
Do những tham số của 2 cách viết giống nhau, nên tôi chỉ lấy ví dụ của một cách viết.
@dag( start_date=datetime.now(), schedule_interval=None, catchup=False, tags=['example'], ... )
start_date: thời hạn bắt đầu
schedule_interval: lịch trình chạy ( @once, @hourly, @daily, @weekly, @monthly, @yearly )
catchup: nếu bạn có start_date
, end_date
( optional ) và schedule_interval
thì khi bạn để giá trị của catchup là True thì schedule_interval
sẽ không còn bị số lượng giới hạn và thực thi tasks tức thì.
- Khởi tạo tasks trong DAG: những tasks ở đây là extract, transform, loading.
- Context manager: truyền những function ETL vào PythonOperator ( Operator có nghĩa vụ và trách nhiệm xử lý tasks trong phần kiến trúc đề cập phía trên ) và gán 1 biến bất kỳ
extract_task = PythonOperator( task_id='extract', python_callable=extract, )
- Decorator: đơn thuần dùng như python decorator
@task() def extract(): ...
- Viết doc cho function.
- Context manager: từ các biến tượng trưng cho những task gọi hàm
doc_md
extract_task.doc_md = dedent( """\ #### Extract task A simple Extract task to get data ready for the rest of the data pipeline. In this case, getting data is simulated by reading from a hardcoded JSON string. This data is then put into xcom, so that it can be processed by the next task. """ )
- Decorator: không đổi khác gì, viết doc trong hàm như bình thường
- Xác định thứ tự xử lý của những tasks: Chi tiết phần này mình sẽ đề cập ở bên dưới
- Context manager:
extract_task >> transform_task >> load_task
- Decorator: cách viết này sẽ rõ ràng hơn cho những người đọc
order_data = extract() order_summary = transform(order_data) load(order_summary["total_order_value"])
Như vậy, mọi người đã được tham khảo những thông tin giải đáp cho câu hỏi air flow là gì. Với những nội dung có trong bài viết mong rằng sẽ có thể giúp ích được cho mọi người. Nếu như bạn còn có những thắc mắc nào chưa thể giải đáp hãy trao đối ngay với chúng tôi để được giải đáp một cách nhanh nhất và hiệu quả nhất bạn nhé.