Apresentaremos os novos recursos do KubernetesExecutor 2.0. Alerta de spoiler !!! O processo é mais rápido, flexível e fácil de entender.
Juntamente com o Airflow 2.0, temos o prazer de apresentar um KubernetesExecutor completamente redesenhado. Essa nova arquitetura é mais rápida, flexível e fácil de entender do que o KubernetesExecutor 1.10. Como primeira etapa, gostaríamos de apresentar a você os novos recursos do KubernetesExecutor 2.0!
O que é KubernetesExecutor?
Em 2018, apresentamos o KubernetesExecutor com base nas ideias de escalonamento automático e flexibilidade. O Airflow ainda não tinha um conceito claro para escalonamento automático de Celery Workers (embora nosso trabalho recente com KEDA a esse respeito tenha sido muito bem-sucedido), então queríamos criar um sistema que pudesse atender às necessidades do usuário. Como resultado dessa pesquisa, foi criado um sistema que usa a API Kubernetes para executar uma tarefa de pod por fluxo de ar. Um efeito colateral valioso desse sistema baseado na API do Kubernetes é que ele abriu a capacidade de os usuários adicionarem complementos e restrições exclusivos para cada tarefa.
Usando a API Kubernetes e o KubernetesExecutor, os usuários do Airflow podem determinar se certas tarefas têm acesso a determinados segredos ou que uma tarefa só pode ser executada em um nó que existe na União Europeia (o que pode ser útil para o gerenciamento de dados). Os usuários também podem especificar quantos recursos uma tarefa está ocupando, o que pode variar muito dependendo do que a tarefa está fazendo (por exemplo, o acesso a GPUs é necessário para executar um script do TensorFlow). Com esta API, o KubernetesExecutor permite que os engenheiros de dados tenham um controle muito mais preciso sobre como o Airflow executa suas tarefas do que usariam apenas as filas existentes do Celery.
, KubernetesExecutor . pod , , Celery ( , ). , CeleryExecutor , . , CeleryExecutor, KubernetesExecutor Airflow, Airflow 2.0 , CeleryKubernetesExecutor, !
KubernetesExecutor
podtemplate
Airflow 1.10.12 pod_template_file
. Kubernetes KubernetesExecutor. , Airflow API Kubernetes .
pod_template_files
Airflow. pod_template_file
, , , CeleryExecutor .
pod pod_template_files
, 2.0 , , pod Kubernetes, . pod , Celery. — KubernetesExecutor.
Execitor_config
Airflow 2.0 executor_config
, . , Python , API Kubernetes. executor_config
podOverride
. , .
, executeor_config
- Airflow 2.0, . , .
podmutationhook
1.10.12, pod_mutation_hook
Kubernetes V1Pod Airflow pod Kubernetes API , Airflow pod. pod, KubernetesExecutor, pod, KubernetesPodOperator.
KubernetesExecutor. , pod_template_file
pod, Kubernetes pod_override
pod_mutation_hook
pod. , .
, KubernetesExecutor.
, , , . Pod , . .
.
. pod, . V1pod, .
Airflow DevOps, .
, DAG, , executor_config
podOverride. , Kubernetes DAG, , KubernetesPodOperator . KubernetesPodOperator Docker , . , executeor_config
, Kubernetes API podOverride , , , , . . , .
, , , , Python pod, . executeor_config
podOverride , PythonOperator API TaskFlow. DAG :
from airflow.decorators import dag, task from datetime import datetime import os import json import requests from kubernetes.client import models as k8s new_config ={ "pod_override": k8s.V1Pod( metadata=k8s.V1ObjectMeta(labels={"purpose": "pod-override-example"}), spec=k8s.V1PodSpec( containers=[ k8s.V1Container( name="base", env=[ k8s.V1EnvVar(name="STATE", value="wa") ], ) ] ) ) } default_args = { 'start_date': datetime(2021, 1, 1) } @dag('k8s_executor_example', schedule_interval='@daily', default_args=default_args, catchup=False) def taskflow(): @task(executor_config=new_config) def get_testing_increase(): """ Gets totalTestResultsIncrease field from Covid API for given state and returns value """ url = 'https://covidtracking.com/api/v1/states/' res = requests.get(url+'{0}/current.json'.format(os.environ['STATE'])) return{'testing_increase': json.loads(res.text)['totalTestResultsIncrease']} get_testing_increase() dag = taskflow()
new_config
, pod Kubernetes API. DAG , API Covid . , podOverride. Airflow Kubernetes.
KubernetesExecutor
KubernetesExecutor, . , — .
YAML. DAG, DAG git DAG Kubernetes Volume.
, airflow.cfg YAML . YAML .
A melhor parte desses três novos recursos é que eles estão disponíveis no Airflow 1.10.13. Você pode iniciar o processo de migração imediatamente e aproveitar os benefícios e a aceleração desse design mais simples. Aguardamos seus comentários e sinta-se à vontade para nos contatar com qualquer dúvida, solicitação de recursos ou documentação!