Databricks

Supported Databricks Runtime versions: 12.2, 13.3, and 14.3.

Note: Databricks Serverless is not supported by this instrumentation. You can use the DBT agent instead.

To enable integration with definity on Databricks, follow these steps:

  1. Attach the Spark Agent JAR to your compute cluster.
  2. Configure jobs or tasks with definity parameters.

Cluster Configuration

1. Create an Init Script

Create a script to download and add the definity Spark agent to the cluster’s CLASSPATH. Save this script in cloud storage (e.g., S3).

definity_init.sh
#!/bin/bash
JAR_DIR="/databricks/jars"
mkdir -p "$JAR_DIR"

# Download the definity Spark agent JAR (placeholder credentials and versions shown).
DEFINITY_JAR_URL="https://user:[email protected]/java/definity-spark-agent-[spark.version]-[agent.version].jar"
curl -o "$JAR_DIR/definity-spark-agent.jar" "$DEFINITY_JAR_URL"

# Make the agent visible on the cluster's CLASSPATH.
export CLASSPATH="$CLASSPATH:$JAR_DIR/definity-spark-agent.jar"
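
If you keep the script in S3 as in this guide, one way to upload it is with boto3. This is only a sketch: the local filename, bucket, and key simply mirror the example path used in the next step, and the AWS CLI or console works just as well.

# Sketch: upload definity_init.sh to the S3 path referenced in the init-script configuration below.
import boto3

s3 = boto3.client("s3")
s3.upload_file(
    Filename="definity_init.sh",              # local copy of the script above
    Bucket="your-s3-bucket",                  # example bucket from this guide
    Key="init-scripts-dir/definity_init.sh",  # example key from this guide
)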

2. Attach the Init Script to Your Compute Cluster

In the Databricks UI:

  1. Go to Cluster configuration > Advanced options > Init Scripts.
  2. Add your script with:
    • Source: s3
    • File path: s3://your-s3-bucket/init-scripts-dir/definity_init.sh

3. Configure Spark Parameters

Navigate to Cluster configuration > Advanced options > Spark and add:

spark.plugins               ai.definity.spark.plugin.DefinitySparkPlugin
spark.definity.server       https://app.definity.run
spark.definity.api.token    eyJhb...

Note: These settings affect the default Spark session created by the cluster. Definity will monitor this session automatically.
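
To confirm the default session picked up these settings, you can read them back from a notebook attached to the cluster. This is just a quick check using the standard Spark conf API; spark is the session object Databricks provides in notebooks.

# Quick check from a notebook: the cluster's default session should expose the plugin settings.
print(spark.conf.get("spark.plugins"))          # expect ai.definity.spark.plugin.DefinitySparkPlugin
print(spark.conf.get("spark.definity.server"))  # expect https://app.definity.run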


Job Configuration

Example: Airflow Notebook Job

from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

run_notebook = DatabricksSubmitRunOperator(
    task_id="run_notebook",
    json={
        "notebook_task": {
            "notebook_path": "/Users/[email protected]/my_notebook",
            "base_parameters": {
                "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
                "spark.definity.pipeline.pit": "{{ ts }}",
                "spark.definity.task.name": "{{ ti.task_id }}",
            },
        },
        "name": "notebook-job",
    },
)

Example: Airflow Python Job

run_python = DatabricksSubmitRunOperator(
    task_id="run_python_script",
    json={
        "spark_python_task": {
            "python_file": "dbfs:/path/to/job.py",
            "parameters": [
                "spark.definity.pipeline.name={{ dag_run.dag_id }}",
                "spark.definity.pipeline.pit={{ ts }}",
                "spark.definity.task.name={{ ti.task_id }}",
            ],
        },
        "name": "python-job",
    },
)

Example: Manual Task Configuration

You can set the task scope manually in your code.

When doing so, set the following Spark config at the cluster level to disable automatic session detection:

spark.definity.databricks.automaticSessions.enabled=false

Basic Example

# Set this property to define a new task scope
spark.conf.set("spark.definity.session", f"pipeline.name={my_pipeline},pipeline.pit={pit_date},task.name={my_task}")

Advanced Example

For multiple logical tasks in a single session, unset the property when the task ends:

try {
// your job logic here
...
} finally {
// Unset the session to signal task completion (recommended in a `finally` block to catch failures)
spark.conf.unset("spark.definity.session")
}

Note: Unsetting the property is not required for Python script jobs and notebook jobs.
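
If you do drive several logical tasks from a single Python notebook or script, the same set/unset pattern can be sketched as follows; run_task and the task names are illustrative.

# Sketch: two logical tasks sharing one Spark session, each with its own definity scope.
for task_name in ["load_raw", "build_aggregates"]:  # illustrative task names
    spark.conf.set(
        "spark.definity.session",
        f"pipeline.name=my_pipeline,pipeline.pit=2025-01-01 01:00:00,task.name={task_name}",
    )
    try:
        run_task(task_name)  # your job logic (hypothetical helper)
    finally:
        # Unset to mark the end of this logical task, even if it fails.
        spark.conf.unset("spark.definity.session")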


Example: Jobs API

definity parameters can be passed via the base_parameters or parameters fields depending on the task type.

{
  "tasks": [
    {
      "task_key": "task1",
      "notebook_task": {
        "notebook_path": "/Workspace/Users/user@org/task_notebook_1",
        "source": "WORKSPACE",
        "base_parameters": {
          "spark.definity.pipeline.name": "my_pipeline",
          "spark.definity.pipeline.pit": "2025-01-01 01:00:00",
          "spark.definity.task.name": "task1"
        }
      },
      "existing_cluster_id": "${DATABRICKS_CLUSTER}"
    },
    {
      "task_key": "task2",
      "notebook_task": {
        "notebook_path": "/Workspace/Users/user@org/task_notebook_2",
        "source": "WORKSPACE",
        "base_parameters": {
          "spark.definity.pipeline.name": "my_pipeline",
          "spark.definity.pipeline.pit": "2025-01-01 01:00:00",
          "spark.definity.task.name": "task2"
        }
      },
      "existing_cluster_id": "${DATABRICKS_CLUSTER}"
    },
    {
      "task_key": "python_task1",
      "spark_python_task": {
        "python_file": "s3://my-bucket/python_task.py",
        "parameters": [
          "yourArg1",
          "yourArg2",
          "spark.definity.task.name=python_task_1",
          "spark.definity.pipeline.name=my_pipeline",
          "spark.definity.pipeline.pit=2025-01-01 01:00:00"
        ]
      },
      "existing_cluster_id": "${DATABRICKS_CLUSTER}"
    }
  ],
  "format": "MULTI_TASK",
  "queue": {
    "enabled": true
  }
}
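
As a sketch of how such a payload could be submitted, the snippet below posts it to the Jobs 2.1 create endpoint with the requests library. The job.json file and the DATABRICKS_HOST and DATABRICKS_TOKEN environment variables are assumptions; the Databricks CLI or SDK can be used instead.

import json
import os

import requests

# Sketch: send the payload above (saved locally as job.json) to the Databricks Jobs API.
with open("job.json") as f:
    payload = json.load(f)

resp = requests.post(
    f"https://{os.environ['DATABRICKS_HOST']}/api/2.1/jobs/create",  # workspace host, e.g. dbc-xxxx.cloud.databricks.com
    headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
    json=payload,
)
resp.raise_for_status()
print(resp.json())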