Novo KubernetesExecutor 2.0 no Airflow 2.0

Apresentaremos os novos recursos do KubernetesExecutor 2.0. Alerta de spoiler !!! O processo é mais rápido, flexível e fácil de entender.







Juntamente com o Airflow 2.0, temos o prazer de apresentar um KubernetesExecutor completamente redesenhado. Essa nova arquitetura é mais rápida, flexível e fácil de entender do que o KubernetesExecutor 1.10. Como primeira etapa, gostaríamos de apresentar a você os novos recursos do KubernetesExecutor 2.0!







O que é KubernetesExecutor?



Em 2018, apresentamos o KubernetesExecutor com base nas ideias de escalonamento automático e flexibilidade. O Airflow ainda não tinha um conceito claro para escalonamento automático de Celery Workers (embora nosso trabalho recente com KEDA a esse respeito tenha sido muito bem-sucedido), então queríamos criar um sistema que pudesse atender às necessidades do usuário. Como resultado dessa pesquisa, foi criado um sistema que usa a API Kubernetes para executar uma tarefa de pod por fluxo de ar. Um efeito colateral valioso desse sistema baseado na API do Kubernetes é que ele abriu a capacidade de os usuários adicionarem complementos e restrições exclusivos para cada tarefa.







Usando a API Kubernetes e o KubernetesExecutor, os usuários do Airflow podem determinar se certas tarefas têm acesso a determinados segredos ou que uma tarefa só pode ser executada em um nó que existe na União Europeia (o que pode ser útil para o gerenciamento de dados). Os usuários também podem especificar quantos recursos uma tarefa está ocupando, o que pode variar muito dependendo do que a tarefa está fazendo (por exemplo, o acesso a GPUs é necessário para executar um script do TensorFlow). Com esta API, o KubernetesExecutor permite que os engenheiros de dados tenham um controle muito mais preciso sobre como o Airflow executa suas tarefas do que usariam apenas as filas existentes do Celery.







, KubernetesExecutor . pod , , Celery ( , ). , CeleryExecutor , . , CeleryExecutor, KubernetesExecutor Airflow, Airflow 2.0 , CeleryKubernetesExecutor, !







KubernetesExecutor



podtemplate



Airflow 1.10.12 pod_template_file



. Kubernetes KubernetesExecutor. , Airflow API Kubernetes .







pod_template_files



Airflow. pod_template_file



, , , CeleryExecutor .







pod pod_template_files



, 2.0 , , pod Kubernetes, . pod , Celery. — KubernetesExecutor.







Execitor_config



Airflow 2.0 executor_config



, . , Python , API Kubernetes. executor_config



podOverride



. , .







, executeor_config



- Airflow 2.0, . , .







podmutationhook



1.10.12, pod_mutation_hook



Kubernetes V1Pod Airflow pod Kubernetes API , Airflow pod. pod, KubernetesExecutor, pod, KubernetesPodOperator.









KubernetesExecutor. , pod_template_file



pod, Kubernetes pod_override



pod_mutation_hook



pod. , .







, KubernetesExecutor.













, , , . Pod , . .







.













. pod, . V1pod, .









Airflow DevOps, .







, DAG, , executor_config



podOverride. , Kubernetes DAG, , KubernetesPodOperator . KubernetesPodOperator Docker , . , executeor_config



, Kubernetes API podOverride , , , , . . , .







, , , , Python pod, . executeor_config



podOverride , PythonOperator API TaskFlow. DAG :







from airflow.decorators import dag, task
from datetime import datetime

import os
import json
import requests
from kubernetes.client import models as k8s

new_config ={ "pod_override": k8s.V1Pod(
                metadata=k8s.V1ObjectMeta(labels={"purpose": "pod-override-example"}),
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            env=[
                                k8s.V1EnvVar(name="STATE", value="wa")
                                ],
                            )
                        ]
                    )
                )
            }

default_args = {
    'start_date': datetime(2021, 1, 1)
}

@dag('k8s_executor_example', schedule_interval='@daily', default_args=default_args, catchup=False)
def taskflow():

    @task(executor_config=new_config)
    def get_testing_increase():
        """
        Gets totalTestResultsIncrease field from Covid API for given state and returns value
        """
        url = 'https://covidtracking.com/api/v1/states/'
        res = requests.get(url+'{0}/current.json'.format(os.environ['STATE']))
        return{'testing_increase': json.loads(res.text)['totalTestResultsIncrease']}

    get_testing_increase()

dag = taskflow()
      
      





new_config



, pod Kubernetes API. DAG , API Covid . , podOverride. Airflow Kubernetes.







KubernetesExecutor



KubernetesExecutor, . , — .







YAML. DAG, DAG git DAG Kubernetes Volume.







, airflow.cfg YAML . YAML .







A melhor parte desses três novos recursos é que eles estão disponíveis no Airflow 1.10.13. Você pode iniciar o processo de migração imediatamente e aproveitar os benefícios e a aceleração desse design mais simples. Aguardamos seus comentários e sinta-se à vontade para nos contatar com qualquer dúvida, solicitação de recursos ou documentação!








All Articles