Kami akan memperkenalkan Anda pada fitur-fitur baru KubernetesExecutor 2.0. Peringatan spoiler !!! Prosesnya lebih cepat, lebih fleksibel, dan lebih mudah dipahami.
Bersama dengan Airflow 2.0, kami dengan senang hati mempersembahkan KubernetesExecutor yang didesain ulang sepenuhnya. Arsitektur baru ini lebih cepat, lebih fleksibel, dan lebih mudah dipahami daripada KubernetesExecutor 1.10. Sebagai langkah pertama, kami ingin memperkenalkan Anda pada fitur-fitur baru KubernetesExecutor 2.0!
Apa itu KubernetesExecutor?
Pada tahun 2018, kami memperkenalkan KubernetesExecutor berdasarkan ide penskalaan otomatis dan fleksibilitas. Aliran udara belum memiliki konsep yang jelas untuk penskalaan otomatis Celery Workers (meskipun pekerjaan terbaru kami dengan KEDA dalam hal ini sangat berhasil), jadi kami ingin membuat sistem yang dapat memenuhi kebutuhan pengguna. Sebagai hasil dari penelitian ini, sebuah sistem dibuat yang menggunakan Kubernetes API untuk menjalankan tugas pod per aliran udara. Efek samping yang berharga dari sistem berbasis API Kubernetes ini adalah ia membuka kemampuan bagi pengguna untuk menambahkan add-on dan batasan unik untuk setiap tugas.
Dengan menggunakan Kubernetes API dan KubernetesExecutor, pengguna Airflow dapat menentukan bahwa tugas tertentu memiliki akses ke rahasia tertentu, atau bahwa tugas hanya dapat dilakukan pada node yang ada di Uni Eropa (yang dapat berguna untuk pengelolaan data). Pengguna juga dapat menentukan berapa banyak resource yang digunakan tugas, yang bisa sangat bervariasi tergantung pada apa yang dilakukan tugas (misalnya, akses ke GPU diperlukan untuk menjalankan skrip TensorFlow). Dengan API ini, KubernetesExecutor memungkinkan teknisi data memiliki kontrol yang jauh lebih baik atas bagaimana Airflow melakukan tugasnya daripada hanya menggunakan antrean Celery yang ada.
, KubernetesExecutor . pod , , Celery ( , ). , CeleryExecutor , . , CeleryExecutor, KubernetesExecutor Airflow, Airflow 2.0 , CeleryKubernetesExecutor, !
KubernetesExecutor
podtemplate
Airflow 1.10.12 pod_template_file
. Kubernetes KubernetesExecutor. , Airflow API Kubernetes .
pod_template_files
Airflow. pod_template_file
, , , CeleryExecutor .
pod pod_template_files
, 2.0 , , pod Kubernetes, . pod , Celery. — KubernetesExecutor.
Execitor_config
Airflow 2.0 executor_config
, . , Python , API Kubernetes. executor_config
podOverride
. , .
, executeor_config
- Airflow 2.0, . , .
podmutationhook
1.10.12, pod_mutation_hook
Kubernetes V1Pod Airflow pod Kubernetes API , Airflow pod. pod, KubernetesExecutor, pod, KubernetesPodOperator.
KubernetesExecutor. , pod_template_file
pod, Kubernetes pod_override
pod_mutation_hook
pod. , .
, KubernetesExecutor.

, , , . Pod , . .
.

. pod, . V1pod, .
Airflow DevOps, .
, DAG, , executor_config
podOverride. , Kubernetes DAG, , KubernetesPodOperator . KubernetesPodOperator Docker , . , executeor_config
, Kubernetes API podOverride , , , , . . , .
, , , , Python pod, . executeor_config
podOverride , PythonOperator API TaskFlow. DAG :
from airflow.decorators import dag, task from datetime import datetime import os import json import requests from kubernetes.client import models as k8s new_config ={ "pod_override": k8s.V1Pod( metadata=k8s.V1ObjectMeta(labels={"purpose": "pod-override-example"}), spec=k8s.V1PodSpec( containers=[ k8s.V1Container( name="base", env=[ k8s.V1EnvVar(name="STATE", value="wa") ], ) ] ) ) } default_args = { 'start_date': datetime(2021, 1, 1) } @dag('k8s_executor_example', schedule_interval='@daily', default_args=default_args, catchup=False) def taskflow(): @task(executor_config=new_config) def get_testing_increase(): """ Gets totalTestResultsIncrease field from Covid API for given state and returns value """ url = 'https://covidtracking.com/api/v1/states/' res = requests.get(url+'{0}/current.json'.format(os.environ['STATE'])) return{'testing_increase': json.loads(res.text)['totalTestResultsIncrease']} get_testing_increase() dag = taskflow()
new_config
, pod Kubernetes API. DAG , API Covid . , podOverride. Airflow Kubernetes.
KubernetesExecutor
KubernetesExecutor, . , — .
YAML. DAG, DAG git DAG Kubernetes Volume.
, airflow.cfg YAML . YAML .
Bagian terbaik dari ketiga fitur baru ini adalah semuanya tersedia di Airflow 1.10.13. Anda dapat langsung memulai proses migrasi dan menikmati manfaat serta percepatan desain yang lebih sederhana ini. Kami menantikan tanggapan Anda dan jangan ragu untuk menghubungi kami jika ada pertanyaan, permintaan fitur, atau dokumentasi!