Applying Custom Resources
Airflow can apply custom resources from within a cluster, such as triggering Spark job by applying a SparkApplication resource. The steps below outline this process. The DAG consists of modularized Python files and is provisioned using the git-sync feature.
Define an in-cluster Kubernetes connection
To start a Spark job, Airflow must communicate with Kubernetes, requiring an in-cluster connection. This can be created through the Webserver UI by enabling the "in cluster configuration" setting:
 
Alternatively, the connection can be defined using an environment variable in URI format:
AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"This can be supplied directly in the custom resource for all roles (Airflow expects configuration to be common across components):
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  image:
    productVersion: 2.10.5
  clusterConfig:
    loadExamples: false
    exposeConfig: false
    credentialsSecret: simple-airflow-credentials
  webservers:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
        replicas: 1
  schedulers:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
        replicas: 1
  celeryExecutors:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"
        replicas: 1
# in case of using kubernetesExecutors
#  kubernetesExecutors:
#    envOverrides:
#      AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D"Define a cluster role for Airflow to create SparkApplication resources
Airflow cannot create or access SparkApplication resources by default - a cluster role is required for this:
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: airflow-spark-clusterrole
rules:
- apiGroups:
  - spark.stackable.tech
  resources:
  - sparkapplications
  verbs:
  - create
  - getand a corresponding cluster role binding:
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: airflow-spark-clusterrole-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: airflow-spark-clusterrole
subjects:
- apiGroup: rbac.authorization.k8s.io
  kind: Group
  name: system:serviceaccountsDAG code
For the DAG itself, the job is a modularized DAG that starts a one-off Spark job to calculate the value of pi. The file structure, fetched to the root git-sync folder, looks like this:
dags |_ stackable |_ __init__.py |_ spark_kubernetes_operator.py |_ spark_kubernetes_sensor.py |_ pyspark_pi.py |_ pyspark_pi.yaml
The Spark job calculates the value of pi using one of the example scripts that comes bundled with Spark:
---
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: pyspark-pi
spec:
  sparkImage:
    productVersion: 3.5.2
  mode: cluster
  mainApplicationFile: local:///stackable/spark/examples/src/main/python/pi.py
  executor:
    replicas: 1This is called from within a DAG by using the connection that was defined earlier.
It is wrapped by the KubernetesHook that the Airflow Kubernetes provider makes available here.
There are two classes that are used to:
- 
start the job 
- 
monitor the status of the job 
The classes SparkKubernetesOperator and SparkKubernetesSensor are located in two different Python modules as they are typically used for all custom resources and thus are best decoupled from the DAG that calls them.
This also demonstrates that modularized DAGs can be used for Airflow jobs as long as all dependencies exist in or below the root folder pulled by git-sync.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING, Optional, Sequence
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
import json
if TYPE_CHECKING:
    from airflow.utils.context import Context
class SparkKubernetesOperator(BaseOperator):  (1)
    template_fields: Sequence[str] = ("application_file", "namespace")
    template_ext: Sequence[str] = (".yaml", ".yml", ".json")
    ui_color = "#f4a460"
    def __init__(
        self,
        *,
        application_file: str,
        namespace: Optional[str] = None,
        kubernetes_conn_id: str = "kubernetes_in_cluster",  (2)
        api_group: str = "spark.stackable.tech",
        api_version: str = "v1alpha1",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.application_file = application_file
        self.namespace = namespace
        self.kubernetes_conn_id = kubernetes_conn_id
        self.api_group = api_group
        self.api_version = api_version
        self.plural = "sparkapplications"
    def execute(self, context: "Context"):
        hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
        self.log.info("Creating SparkApplication...")
        self.log.info(json.dumps(self.application_file, indent=4))
        response = hook.create_custom_object(
            group=self.api_group,
            version=self.api_version,
            plural=self.plural,
            body=self.application_file,
            namespace=self.namespace,
        )
        return response#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional, Dict
from kubernetes import client
from airflow.exceptions import AirflowException
from airflow.sensors.base import BaseSensorOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
class SparkKubernetesSensor(BaseSensorOperator):  (3)
    template_fields = ("application_name", "namespace")
    # See https://github.com/stackabletech/spark-k8s-operator/pull/460/files#diff-d737837121132af6b60f50279a78464b05dcfd06c05d1d090f4198a5e962b5f6R371
    # Unknown is set immediately so it must be excluded from the failed states.
    FAILURE_STATES = "Failed"
    SUCCESS_STATES = "Succeeded"
    def __init__(
        self,
        *,
        application_name: str,
        attach_log: bool = False,
        namespace: Optional[str] = None,
        kubernetes_conn_id: str = "kubernetes_in_cluster",  (2)
        api_group: str = "spark.stackable.tech",
        api_version: str = "v1alpha1",
        poke_interval: float = 60,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.application_name = application_name
        self.attach_log = attach_log
        self.namespace = namespace
        self.kubernetes_conn_id = kubernetes_conn_id
        self.hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
        self.api_group = api_group
        self.api_version = api_version
        self.poke_interval = poke_interval
    def _log_driver(self, application_state: str, response: dict) -> None:
        if not self.attach_log:
            return
        status_info = response["status"]
        if "driverInfo" not in status_info:
            return
        driver_info = status_info["driverInfo"]
        if "podName" not in driver_info:
            return
        driver_pod_name = driver_info["podName"]
        namespace = response["metadata"]["namespace"]
        log_method = (
            self.log.error
            if application_state in self.FAILURE_STATES
            else self.log.info
        )
        try:
            log = ""
            for line in self.hook.get_pod_logs(driver_pod_name, namespace=namespace):
                log += line.decode()
            log_method(log)
        except client.rest.ApiException as e:
            self.log.warning(
                "Could not read logs for pod %s. It may have been disposed.\n"
                "Make sure timeToLiveSeconds is set on your SparkApplication spec.\n"
                "underlying exception: %s",
                driver_pod_name,
                e,
            )
    def poke(self, context: Dict) -> bool:
        self.log.info("Poking: %s", self.application_name)
        response = self.hook.get_custom_object(
            group=self.api_group,
            version=self.api_version,
            plural="sparkapplications",
            name=self.application_name,
            namespace=self.namespace,
        )
        try:
            application_state = response["status"]["phase"]
        except KeyError:
            self.log.debug(
                f"SparkApplication status could not be established: {response}"
            )
            return False
        if (
            self.attach_log
            and application_state in self.FAILURE_STATES + self.SUCCESS_STATES
        ):
            self._log_driver(application_state, response)
        if application_state in self.FAILURE_STATES:
            raise AirflowException(
                f"SparkApplication failed with state: {application_state}"
            )
        elif application_state in self.SUCCESS_STATES:
            self.log.info("SparkApplication ended successfully")
            return True
        else:
            self.log.info("SparkApplication is still in state: %s", application_state)
            return False#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating how to apply a Kubernetes Resource from Airflow running in-cluster"""
from datetime import datetime, timedelta, timezone
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.utils import yaml
import os
from stackable.spark_kubernetes_sensor import SparkKubernetesSensor
from stackable.spark_kubernetes_operator import SparkKubernetesOperator
with DAG(  (4)
    dag_id="sparkapp_dag",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
    tags=["example"],
    params={},
) as dag:
    def load_body_to_dict(body):
        try:
            body_dict = yaml.safe_load(body)
        except yaml.YAMLError as e:
            raise AirflowException(f"Exception when loading resource definition: {e}\n")
        return body_dict
    yaml_path = os.path.join(
        os.environ.get("AIRFLOW__CORE__DAGS_FOLDER", ""), "pyspark_pi.yaml"
    )
    with open(yaml_path, "r") as file:
        crd = file.read()
    with open("/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as file:
        ns = file.read()
    document = load_body_to_dict(crd)
    application_name = "pyspark-pi-" + datetime.now(timezone.utc).strftime(
        "%Y%m%d%H%M%S"
    )
    document.update({"metadata": {"name": application_name, "namespace": ns}})
    t1 = SparkKubernetesOperator(  (5)
        task_id="spark_pi_submit",
        namespace=ns,
        application_file=document,
        do_xcom_push=True,
        dag=dag,
    )
    t2 = SparkKubernetesSensor(  (6)
        task_id="spark_pi_monitor",
        namespace=ns,
        application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
        poke_interval=5,
        dag=dag,
    )
    t1 >> t2  (7)| 1 | the wrapper class used for calling the job via KubernetesHook | 
| 2 | the connection that created for in-cluster usage | 
| 3 | the wrapper class used for monitoring the job via KubernetesHook | 
| 4 | the start of the DAG code | 
| 5 | the initial task to invoke the job | 
| 6 | the subsequent task to monitor the job | 
| 7 | the jobs are chained together in the correct order | 
Once this DAG is mounted in the DAG folder it can be called and its progress viewed from within the Webserver UI:
 
Clicking on the "spark_pi_monitor" task and selecting the logs shows that the status of the job has been tracked by Airflow:
 
| If the KubernetesExecutoris employed the logs are only accessible via the SDP logging mechanism, described here. | 
| A full example of the above is used as an integration test here. | 
