Airflow
This guide shows how to integrate Definity with Apache Airflow to track Spark jobs orchestrated through Airflow DAGs.
Using Jinja Templating
Airflow's Jinja templating lets you inject DAG and task information into the Spark configuration at runtime. Definity uses these values to link each Spark run to the pipeline and task that launched it, so pipeline lineage is tracked automatically.
BashOperator with spark-submit
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="spark_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    spark_task = BashOperator(
        task_id="spark_task",
        bash_command="""
        spark-submit \
          --jars definity-spark-agent-3.5_2.12-0.75.1.jar \
          --conf spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin \
          --conf spark.definity.server=https://app.definity.run \
          --conf spark.definity.env.name=prod \
          --conf spark.definity.pipeline.name='{{ dag_run.dag_id }}' \
          --conf spark.definity.pipeline.pit='{{ ts }}' \
          --conf spark.definity.task.name='{{ ti.task_id }}' \
          /path/to/your_script.py
        """,
    )
SparkSubmitOperator
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

with DAG(
    dag_id="spark_submit_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    spark_task = SparkSubmitOperator(
        task_id="process_data",
        application="/path/to/your_script.py",
        jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
        conf={
            "spark.plugins": "ai.definity.spark.plugin.DefinitySparkPlugin",
            "spark.definity.server": "https://app.definity.run",
            "spark.definity.env.name": "prod",
            "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
            "spark.definity.pipeline.pit": "{{ ts }}",
            "spark.definity.task.name": "{{ ti.task_id }}",
        },
    )
Jinja Template Variables
Airflow provides these useful template variables for Definity tracking:
| Variable | Description | Example |
|---|---|---|
| `{{ dag_run.dag_id }}` | DAG identifier | `spark_etl_pipeline` |
| `{{ ts }}` | Execution timestamp (ISO 8601) | `2023-04-01T00:00:00+00:00` |
| `{{ ds }}` | Execution date (YYYY-MM-DD) | `2023-04-01` |
| `{{ ti.task_id }}` | Task identifier | `process_data` |
| `{{ run_id }}` | Unique run identifier | `scheduled__2023-04-01T00:00:00+00:00` |
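To see what these expressions resolve to, the standalone snippet below renders them with plain Jinja and example values taken from the table; inside Airflow, the scheduler supplies the real context when the task runs, so this is only an illustration.

# Standalone illustration (not an Airflow DAG): render the Jinja expressions
# from the table with example values to show what Definity receives.
from types import SimpleNamespace
from jinja2 import Template

context = {
    "dag_run": SimpleNamespace(dag_id="spark_etl_pipeline"),
    "ti": SimpleNamespace(task_id="process_data"),
    "ts": "2023-04-01T00:00:00+00:00",
    "ds": "2023-04-01",
}

for expr in ["{{ dag_run.dag_id }}", "{{ ti.task_id }}", "{{ ts }}", "{{ ds }}"]:
    print(f"{expr} -> {Template(expr).render(**context)}")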
Multi-Task DAG Example
Track multiple Spark tasks within a single pipeline:
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

with DAG(
    dag_id="multi_task_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    # Common Definity configuration
    definity_conf = {
        "spark.plugins": "ai.definity.spark.plugin.DefinitySparkPlugin",
        "spark.definity.server": "https://app.definity.run",
        "spark.definity.env.name": "prod",
        "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
        "spark.definity.pipeline.pit": "{{ ds }}",
    }

    extract_task = SparkSubmitOperator(
        task_id="extract_data",
        application="/path/to/extract.py",
        jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
        conf={
            **definity_conf,
            "spark.definity.task.name": "extract_data",
        },
    )

    transform_task = SparkSubmitOperator(
        task_id="transform_data",
        application="/path/to/transform.py",
        jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
        conf={
            **definity_conf,
            "spark.definity.task.name": "transform_data",
        },
    )

    load_task = SparkSubmitOperator(
        task_id="load_data",
        application="/path/to/load.py",
        jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
        conf={
            **definity_conf,
            "spark.definity.task.name": "load_data",
        },
    )

    extract_task >> transform_task >> load_task
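If a pipeline has many Spark tasks, repeating the jars and conf arguments for each operator gets verbose. One option is a small factory function; this is an illustrative refactor of the example above, and the helper name `definity_spark_task` is ours, not part of the integration:

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Shared Definity settings, same values as in the example above
definity_conf = {
    "spark.plugins": "ai.definity.spark.plugin.DefinitySparkPlugin",
    "spark.definity.server": "https://app.definity.run",
    "spark.definity.env.name": "prod",
    "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
    "spark.definity.pipeline.pit": "{{ ds }}",
}

def definity_spark_task(task_id: str, application: str) -> SparkSubmitOperator:
    """Build a SparkSubmitOperator with the shared Definity configuration."""
    return SparkSubmitOperator(
        task_id=task_id,
        application=application,
        jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
        # One Definity task per Airflow task, named after the task_id
        conf={**definity_conf, "spark.definity.task.name": task_id},
    )

with DAG(
    dag_id="multi_task_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    extract_task = definity_spark_task("extract_data", "/path/to/extract.py")
    transform_task = definity_spark_task("transform_data", "/path/to/transform.py")
    load_task = definity_spark_task("load_data", "/path/to/load.py")

    extract_task >> transform_task >> load_task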
Environment-Specific Configuration
Use Airflow Variables or Connections to manage environment-specific settings:
from airflow import DAG
from airflow.models import Variable
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

# Read environment-specific settings from Airflow Variables
DEFINITY_SERVER = Variable.get("definity_server_url")
DEFINITY_TOKEN = Variable.get("definity_api_token")
ENVIRONMENT = Variable.get("environment", default_var="dev")

with DAG(
    dag_id="configurable_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    spark_task = SparkSubmitOperator(
        task_id="spark_job",
        application="/path/to/script.py",
        jars="definity-spark-agent-3.5_2.12-0.75.1.jar",
        conf={
            "spark.plugins": "ai.definity.spark.plugin.DefinitySparkPlugin",
            "spark.definity.server": DEFINITY_SERVER,
            "spark.definity.api.token": DEFINITY_TOKEN,
            "spark.definity.env.name": ENVIRONMENT,
            "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
            "spark.definity.pipeline.pit": "{{ ts }}",
            "spark.definity.task.name": "{{ ti.task_id }}",
        },
    )
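Note that module-level Variable.get() calls run every time the scheduler parses the DAG file. An alternative is to defer the lookup to task runtime with Airflow's `var` template; the sketch below assumes the same Variable names as above, and relies on the conf dict being templated just like the pipeline and task names in the earlier examples:

# Runtime lookup via Jinja instead of parse-time Variable.get(); assumes
# Airflow Variables with the same names as in the example above exist.
conf = {
    "spark.plugins": "ai.definity.spark.plugin.DefinitySparkPlugin",
    "spark.definity.server": "{{ var.value.definity_server_url }}",
    "spark.definity.api.token": "{{ var.value.definity_api_token }}",
    "spark.definity.env.name": "{{ var.value.environment }}",  # Variable must exist
    "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
    "spark.definity.pipeline.pit": "{{ ts }}",
    "spark.definity.task.name": "{{ ti.task_id }}",
}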
Authentication
For SaaS deployments, provide your API token via:
- Spark Configuration (shown in examples above)
- Environment Variable - Set `DEFINITY_API_TOKEN` on Airflow workers (see the sketch below)
- Airflow Variable - Store securely in Airflow's metadata database
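A minimal sketch of the environment-variable option, assuming the token is stored in an Airflow Variable named `definity_api_token` (an illustrative name) and that spark-submit runs in client mode on the worker, so the exported variable is visible to the driver process:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="spark_dag_with_token",  # illustrative DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
) as dag:
    spark_task = BashOperator(
        task_id="spark_task",
        bash_command="""
        # Token read from an Airflow Variable (name is an assumption) and
        # exported for the spark-submit process launched on this worker.
        export DEFINITY_API_TOKEN='{{ var.value.definity_api_token }}'
        spark-submit \
          --jars definity-spark-agent-3.5_2.12-0.75.1.jar \
          --conf spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin \
          --conf spark.definity.server=https://app.definity.run \
          --conf spark.definity.env.name=prod \
          --conf spark.definity.pipeline.name='{{ dag_run.dag_id }}' \
          --conf spark.definity.pipeline.pit='{{ ts }}' \
          --conf spark.definity.task.name='{{ ti.task_id }}' \
          /path/to/your_script.py
        """,
    )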
Best Practices
- Use template variables for pipeline and task names to maintain consistency
- Use `{{ ds }}` or `{{ ts }}` for `pipeline.pit` to track execution dates
- Store sensitive data (tokens, URLs) in Airflow Variables or Secrets Backend
- Use connection pooling for shared Spark clusters to optimize resource usage
- Tag your tasks with `spark.definity.tags` for better organization (see the sketch below)
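A sketch of the tagging tip: `spark.definity.tags` is the setting to use, but the comma-separated value format shown here is an assumption; check the Configuration Reference for the exact syntax.

# Sketch of a tagged task configuration; the tag value format is assumed,
# not confirmed -- see the Configuration Reference for the exact syntax.
conf = {
    "spark.plugins": "ai.definity.spark.plugin.DefinitySparkPlugin",
    "spark.definity.server": "https://app.definity.run",
    "spark.definity.env.name": "prod",
    "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
    "spark.definity.pipeline.pit": "{{ ds }}",
    "spark.definity.task.name": "{{ ti.task_id }}",
    "spark.definity.tags": "team:data-eng,tier:critical",  # assumed format
}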
Databricks with Airflow
If you're using Databricks with Airflow, see the Databricks Integration guide for specific examples.
Next Steps
- See Configuration Reference for all available parameters
- Learn about Custom Metrics to track pipeline KPIs
- Explore Tracking Modes for shared cluster scenarios