Airflow

This guide shows how to integrate Definity with Apache Airflow to track Spark jobs orchestrated through Airflow DAGs.

Using Jinja Templating

Airflow provides Jinja templating to dynamically inject DAG and task information into Spark configurations. This allows automatic tracking of pipeline lineage.

BashOperator with spark-submit

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="spark_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:

    spark_task = BashOperator(
        task_id="spark_task",
        bash_command="""
        spark-submit \
          --jars definity-spark-agent-3.5_2.12-0.75.1.jar \
          --conf spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin \
          --conf spark.definity.server=https://app.definity.run \
          --conf spark.definity.env.name=prod \
          --conf spark.definity.pipeline.name='{{ dag_run.dag_id }}' \
          --conf spark.definity.pipeline.pit='{{ ts }}' \
          --conf spark.definity.task.name='{{ ti.task_id }}' \
          /path/to/your_script.py
        """,
    )

SparkSubmitOperator

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

with DAG(
    dag_id="spark_submit_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:

    spark_task = SparkSubmitOperator(
        task_id="process_data",
        application="/path/to/your_script.py",
        jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
        conf={
            "spark.plugins": "ai.definity.spark.plugin.DefinitySparkPlugin",
            "spark.definity.server": "https://app.definity.run",
            "spark.definity.env.name": "prod",
            "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
            "spark.definity.pipeline.pit": "{{ ts }}",
            "spark.definity.task.name": "{{ ti.task_id }}",
        },
    )

Jinja Template Variables

Airflow provides these useful template variables for Definity tracking:

Variable                Description                        Example
{{ dag_run.dag_id }}    DAG identifier                     spark_etl_pipeline
{{ ts }}                Execution timestamp (ISO 8601)     2023-04-01T00:00:00+00:00
{{ ds }}                Execution date (YYYY-MM-DD)        2023-04-01
{{ ti.task_id }}        Task identifier                    process_data
{{ run_id }}            Unique run identifier              scheduled__2023-04-01T00:00:00+00:00
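
For example, in the SparkSubmitOperator example above, a scheduled daily run for the 2023-04-01 data interval would render the templated settings to concrete values roughly like this (the exact timestamp depends on your schedule and run type):

# Approximate rendered values for the "spark_submit_dag" example above,
# assuming a scheduled daily run for the 2023-04-01 data interval.
rendered_conf = {
    "spark.definity.pipeline.name": "spark_submit_dag",           # from {{ dag_run.dag_id }}
    "spark.definity.pipeline.pit": "2023-04-01T00:00:00+00:00",   # from {{ ts }}
    "spark.definity.task.name": "process_data",                   # from {{ ti.task_id }}
}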

Multi-Task DAG Example

Track multiple Spark tasks within a single pipeline:

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

with DAG(
    dag_id="multi_task_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:

    # Common Definity configuration shared by all tasks in this pipeline
    definity_conf = {
        "spark.plugins": "ai.definity.spark.plugin.DefinitySparkPlugin",
        "spark.definity.server": "https://app.definity.run",
        "spark.definity.env.name": "prod",
        "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
        "spark.definity.pipeline.pit": "{{ ds }}",
    }

    extract_task = SparkSubmitOperator(
        task_id="extract_data",
        application="/path/to/extract.py",
        jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
        conf={
            **definity_conf,
            "spark.definity.task.name": "extract_data",
        },
    )

    transform_task = SparkSubmitOperator(
        task_id="transform_data",
        application="/path/to/transform.py",
        jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
        conf={
            **definity_conf,
            "spark.definity.task.name": "transform_data",
        },
    )

    load_task = SparkSubmitOperator(
        task_id="load_data",
        application="/path/to/load.py",
        jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
        conf={
            **definity_conf,
            "spark.definity.task.name": "load_data",
        },
    )

    extract_task >> transform_task >> load_task

Environment-Specific Configuration

Use Airflow Variables or Connections to manage environment-specific settings:

from airflow import DAG
from airflow.models import Variable
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

# Read environment-specific settings from Airflow Variables
DEFINITY_SERVER = Variable.get("definity_server_url")
DEFINITY_TOKEN = Variable.get("definity_api_token")
ENVIRONMENT = Variable.get("environment", default_var="dev")

with DAG(
    dag_id="configurable_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:

    spark_task = SparkSubmitOperator(
        task_id="spark_job",
        application="/path/to/script.py",
        jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
        conf={
            "spark.plugins": "ai.definity.spark.plugin.DefinitySparkPlugin",
            "spark.definity.server": DEFINITY_SERVER,
            "spark.definity.api.token": DEFINITY_TOKEN,
            "spark.definity.env.name": ENVIRONMENT,
            "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
            "spark.definity.pipeline.pit": "{{ ts }}",
            "spark.definity.task.name": "{{ ti.task_id }}",
        },
    )

Authentication

For SaaS deployments, provide your API token via:

  1. Spark Configuration (shown in examples above)
  2. Environment Variable - Set DEFINITY_API_TOKEN on Airflow workers
  3. Airflow Variable - Store securely in Airflow's metadata database or a Secrets Backend (see the sketch after this list)
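
As a minimal sketch of option 3, the token can be resolved from an Airflow Variable at render time with Airflow's built-in {{ var.value.* }} template, so it never appears in DAG code. This assumes a Variable named definity_api_token has been created (the name is illustrative):

# Inside a DAG definition, as in the examples above.
spark_task = SparkSubmitOperator(
    task_id="spark_job",
    application="/path/to/script.py",
    jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
    conf={
        "spark.plugins": "ai.definity.spark.plugin.DefinitySparkPlugin",
        "spark.definity.server": "https://app.definity.run",
        # Rendered from the Airflow Variable "definity_api_token" at runtime
        # (assumes that Variable exists, ideally backed by a Secrets Backend).
        "spark.definity.api.token": "{{ var.value.definity_api_token }}",
    },
)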

Best Practices

  • Use template variables for pipeline and task names to maintain consistency
  • Use {{ ds }} or {{ ts }} for pipeline.pit to track execution dates
  • Store sensitive data (tokens, URLs) in Airflow Variables or Secrets Backend
  • Use Airflow pools when submitting to shared Spark clusters to limit concurrent jobs and optimize resource usage
  • Tag your tasks with spark.definity.tags for better organization (see the sketch after this list)
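
A minimal sketch of task tagging, extending the transform_task from the multi-task example above (it reuses that example's definity_conf). The tag values and their comma-separated format are assumptions; check the Definity configuration reference for the exact syntax your agent version expects:

transform_task = SparkSubmitOperator(
    task_id="transform_data",
    application="/path/to/transform.py",
    jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
    conf={
        **definity_conf,
        "spark.definity.task.name": "transform_data",
        # Hypothetical tag values; the spark.definity.tags key comes from the
        # best practices above, but the value format shown is an assumption.
        "spark.definity.tags": "team:data-eng,domain:sales",
    },
)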

Databricks with Airflow

If you're using Databricks with Airflow, see the Databricks Integration guide for specific examples.

Next Steps