KubernetesExecutor 2.0 baru di Airflow 2.0

Kami akan memperkenalkan Anda pada fitur-fitur baru KubernetesExecutor 2.0. Peringatan spoiler !!! Prosesnya lebih cepat, lebih fleksibel, dan lebih mudah dipahami.







Bersama dengan Airflow 2.0, kami dengan senang hati mempersembahkan KubernetesExecutor yang didesain ulang sepenuhnya. Arsitektur baru ini lebih cepat, lebih fleksibel, dan lebih mudah dipahami daripada KubernetesExecutor 1.10. Sebagai langkah pertama, kami ingin memperkenalkan Anda pada fitur-fitur baru KubernetesExecutor 2.0!







Apa itu KubernetesExecutor?



Pada tahun 2018, kami memperkenalkan KubernetesExecutor berdasarkan ide penskalaan otomatis dan fleksibilitas. Aliran udara belum memiliki konsep yang jelas untuk penskalaan otomatis Celery Workers (meskipun pekerjaan terbaru kami dengan KEDA dalam hal ini sangat berhasil), jadi kami ingin membuat sistem yang dapat memenuhi kebutuhan pengguna. Sebagai hasil dari penelitian ini, sebuah sistem dibuat yang menggunakan Kubernetes API untuk menjalankan tugas pod per aliran udara. Efek samping yang berharga dari sistem berbasis API Kubernetes ini adalah ia membuka kemampuan bagi pengguna untuk menambahkan add-on dan batasan unik untuk setiap tugas.







Dengan menggunakan Kubernetes API dan KubernetesExecutor, pengguna Airflow dapat menentukan bahwa tugas tertentu memiliki akses ke rahasia tertentu, atau bahwa tugas hanya dapat dilakukan pada node yang ada di Uni Eropa (yang dapat berguna untuk pengelolaan data). Pengguna juga dapat menentukan berapa banyak resource yang digunakan tugas, yang bisa sangat bervariasi tergantung pada apa yang dilakukan tugas (misalnya, akses ke GPU diperlukan untuk menjalankan skrip TensorFlow). Dengan API ini, KubernetesExecutor memungkinkan teknisi data memiliki kontrol yang jauh lebih baik atas bagaimana Airflow melakukan tugasnya daripada hanya menggunakan antrean Celery yang ada.







, KubernetesExecutor . pod , , Celery ( , ). , CeleryExecutor , . , CeleryExecutor, KubernetesExecutor Airflow, Airflow 2.0 , CeleryKubernetesExecutor, !







KubernetesExecutor



podtemplate



Airflow 1.10.12 pod_template_file



. Kubernetes KubernetesExecutor. , Airflow API Kubernetes .







pod_template_files



Airflow. pod_template_file



, , , CeleryExecutor .







pod pod_template_files



, 2.0 , , pod Kubernetes, . pod , Celery. — KubernetesExecutor.







Execitor_config



Airflow 2.0 executor_config



, . , Python , API Kubernetes. executor_config



podOverride



. , .







, executeor_config



- Airflow 2.0, . , .







podmutationhook



1.10.12, pod_mutation_hook



Kubernetes V1Pod Airflow pod Kubernetes API , Airflow pod. pod, KubernetesExecutor, pod, KubernetesPodOperator.









KubernetesExecutor. , pod_template_file



pod, Kubernetes pod_override



pod_mutation_hook



pod. , .







, KubernetesExecutor.













, , , . Pod , . .







.













. pod, . V1pod, .









Airflow DevOps, .







, DAG, , executor_config



podOverride. , Kubernetes DAG, , KubernetesPodOperator . KubernetesPodOperator Docker , . , executeor_config



, Kubernetes API podOverride , , , , . . , .







, , , , Python pod, . executeor_config



podOverride , PythonOperator API TaskFlow. DAG :







from airflow.decorators import dag, task
from datetime import datetime

import os
import json
import requests
from kubernetes.client import models as k8s

new_config ={ "pod_override": k8s.V1Pod(
                metadata=k8s.V1ObjectMeta(labels={"purpose": "pod-override-example"}),
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            env=[
                                k8s.V1EnvVar(name="STATE", value="wa")
                                ],
                            )
                        ]
                    )
                )
            }

default_args = {
    'start_date': datetime(2021, 1, 1)
}

@dag('k8s_executor_example', schedule_interval='@daily', default_args=default_args, catchup=False)
def taskflow():

    @task(executor_config=new_config)
    def get_testing_increase():
        """
        Gets totalTestResultsIncrease field from Covid API for given state and returns value
        """
        url = 'https://covidtracking.com/api/v1/states/'
        res = requests.get(url+'{0}/current.json'.format(os.environ['STATE']))
        return{'testing_increase': json.loads(res.text)['totalTestResultsIncrease']}

    get_testing_increase()

dag = taskflow()
      
      





new_config



, pod Kubernetes API. DAG , API Covid . , podOverride. Airflow Kubernetes.







KubernetesExecutor



KubernetesExecutor, . , — .







YAML. DAG, DAG git DAG Kubernetes Volume.







, airflow.cfg YAML . YAML .







Bagian terbaik dari ketiga fitur baru ini adalah semuanya tersedia di Airflow 1.10.13. Anda dapat langsung memulai proses migrasi dan menikmati manfaat serta percepatan desain yang lebih sederhana ini. Kami menantikan tanggapan Anda dan jangan ragu untuk menghubungi kami jika ada pertanyaan, permintaan fitur, atau dokumentasi!








All Articles