Databricks
Supported Databricks Runtime versions: 12.2, 13.3, and 14.3.
❗ Note: Databricks Serverless is not supported by this instrumentation. You can use the DBT agent instead.
To enable integration with definity on Databricks, follow these steps:
- Attach the Spark Agent JAR to your compute cluster.
- Configure jobs or tasks with definity parameters.
Cluster Configuration
1. Create an Init Script
Create a script that downloads the definity Spark agent and adds it to the cluster's CLASSPATH. Save this script in cloud storage (e.g., S3).
#!/bin/bash
# Download the definity Spark agent JAR into the cluster's jar directory.
# Replace [spark.version] and [agent.version] with the versions you are using.
JAR_DIR="/databricks/jars"
mkdir -p "$JAR_DIR"
DEFINITY_JAR_URL="https://user:[email protected]/java/definity-spark-agent-[spark.version]-[agent.version].jar"
curl -f -o "$JAR_DIR/definity-spark-agent.jar" "$DEFINITY_JAR_URL"
# Make the agent available on the cluster CLASSPATH
export CLASSPATH="$CLASSPATH:$JAR_DIR/definity-spark-agent.jar"
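The script then needs to be reachable from the cluster at startup. As one option, here is a minimal sketch of uploading it to S3 with boto3 (the bucket and key are placeholders, matching the path used in the next step):
import boto3
# Upload the init script to S3 so the cluster can read it at startup
# (bucket name and key below are placeholders)
s3 = boto3.client("s3")
s3.upload_file(
    Filename="definity_init.sh",
    Bucket="your-s3-bucket",
    Key="init-scripts-dir/definity_init.sh",
)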
2. Attach the Init Script to Your Compute Cluster
In the Databricks UI:
- Go to Cluster configuration → Advanced options → Init Scripts.
- Add your script with:
  - Source: S3
  - File path: s3://your-s3-bucket/init-scripts-dir/definity_init.sh
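If you manage the cluster through the Databricks Clusters API or infrastructure-as-code rather than the UI, the same attachment can be expressed in the cluster spec. A minimal sketch of the relevant fragment (bucket path and region are placeholders):
# init_scripts fragment of a Databricks cluster spec (placeholder path and region)
init_scripts_fragment = {
    "init_scripts": [
        {
            "s3": {
                "destination": "s3://your-s3-bucket/init-scripts-dir/definity_init.sh",
                "region": "us-east-1",
            }
        }
    ]
}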
3. Configure Spark Parameters
Navigate to Cluster configuration → Advanced options → Spark and add:
spark.plugins ai.definity.spark.plugin.DefinitySparkPlugin
spark.definity.server https://app.definity.run
spark.definity.api.token eyJhb...
Note: These settings affect the default Spark session created by the cluster. Definity will monitor this session automatically.
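Once the cluster restarts, you can sanity-check the configuration from any notebook attached to it; for example (values should match what you entered above):
# Read the settings back from the cluster's default Spark session
print(spark.conf.get("spark.plugins"))          # ai.definity.spark.plugin.DefinitySparkPlugin
print(spark.conf.get("spark.definity.server"))  # https://app.definity.run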
Job Configuration
Example: Airflow Notebook Job
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

run_notebook = DatabricksSubmitRunOperator(
    task_id="run_notebook",
    json={
        "notebook_task": {
            "notebook_path": "/Users/[email protected]/my_notebook",
            "base_parameters": {
                "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
                "spark.definity.pipeline.pit": "{{ ts }}",
                "spark.definity.task.name": "{{ ti.task_id }}",
            },
        },
        # Run on a cluster configured with the definity init script and Spark conf (placeholder id)
        "existing_cluster_id": "your-cluster-id",
        "run_name": "notebook-job",
    },
)
Example: Airflow Python Job
run_python = DatabricksSubmitRunOperator(
    task_id="run_python_script",
    json={
        "spark_python_task": {
            "python_file": "dbfs:/path/to/job.py",
            "parameters": [
                "spark.definity.pipeline.name={{ dag_run.dag_id }}",
                "spark.definity.pipeline.pit={{ ts }}",
                "spark.definity.task.name={{ ti.task_id }}",
            ],
        },
        # Run on a cluster configured with the definity init script and Spark conf (placeholder id)
        "existing_cluster_id": "your-cluster-id",
        "run_name": "python-job",
    },
)
Example: Manual Task Configuration
You can manually set task scope in your code.
When doing so, set the following Spark config at the cluster level to disable automatic session detection:
spark.definity.databricks.automaticSessions.enabled=false
Basic Example
# Set this property to define a new task scope
spark.conf.set("spark.definity.session", f"pipeline.name={my_pipeline},pipeline.pit={pit_date},task.name={my_task}")
Advanced Example
For multiple logical tasks in a single session, unset the property when each task ends (shown here in Scala):
try {
  // your job logic here
  ...
} finally {
  // Unset the session to signal task completion (a `finally` block also covers failures)
  spark.conf.unset("spark.definity.session")
}
Note: Unsetting the session manually is not required for Python script jobs or notebook jobs.
Example: Jobs API
definity parameters can be passed via the base_parameters or parameters fields, depending on the task type.
{
"tasks": [
{
"task_key": "task1",
"notebook_task": {
"notebook_path": "/Workspace/Users/user@org/task_notebook_1",
"source": "WORKSPACE",
"base_parameters": {
"spark.definity.pipeline.name": "my_pipeline",
"spark.definity.pipeline.pit": "2025-01-01 01:00:00",
"spark.definity.task.name": "task1"
}
},
"existing_cluster_id": "${DATABRICKS_CLUSTER}"
},
{
"task_key": "task2",
"notebook_task": {
"notebook_path": "/Workspace/Users/user@org/task_notebook_2",
"source": "WORKSPACE",
"base_parameters": {
"spark.definity.pipeline.name": "my_pipeline",
"spark.definity.pipeline.pit": "2025-01-01 01:00:00",
"spark.definity.task.name": "task2"
}
},
"existing_cluster_id": "${DATABRICKS_CLUSTER}"
},
{
"task_key": "python_task1",
"spark_python_task": {
"python_file": "s3://my-bucket/python_task.py",
"parameters": [
"yourArg1",
"yourArg2",
"spark.definity.task.name=python_task_1",
"spark.definity.pipeline.name=my_pipeline",
"spark.definity.pipeline.pit=2025-01-01 01:00:00"
]
},
"existing_cluster_id": "${DATABRICKS_CLUSTER}"
}
],
"format": "MULTI_TASK",
"queue": {
"enabled": true
}
}
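As a sketch of one way to register this spec, you can POST it to the Jobs API with any HTTP client after substituting the ${DATABRICKS_CLUSTER} placeholder. The workspace URL and token below are placeholders, and the spec is assumed to be saved as definity_job.json:
import json
import requests
# Placeholders: your workspace URL and a personal access token
DATABRICKS_HOST = "https://<your-workspace>.cloud.databricks.com"
DATABRICKS_TOKEN = "dapi..."
# Load the multi-task spec shown above (assumed saved as definity_job.json)
with open("definity_job.json") as f:
    job_spec = json.load(f)
resp = requests.post(
    f"{DATABRICKS_HOST}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {DATABRICKS_TOKEN}"},
    json=job_spec,
)
resp.raise_for_status()
print(resp.json())  # contains the new job_id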