Databricks

Currently supported versions: 12.2, 13.3, 14.3

To use definity, add the definity agent to Spark's classpath and configure the relevant definity properties:

Create init script

Create an init script that downloads definity's Spark agent and adds it to the classpath, then upload it to your cloud storage, e.g. S3.

Example:

definity_init.sh
#!/bin/bash
# Download the definity Spark agent into Databricks' jar directory
JAR_DIR="/databricks/jars"
mkdir -p "$JAR_DIR"

# Replace [spark.version] and [agent.version] with the versions you are using
DEFINITY_JAR_URL="https://user:[email protected]/java/definity-spark-agent-[spark.version]-[agent.version].jar"
curl -f -o "$JAR_DIR/definity-spark-agent.jar" "$DEFINITY_JAR_URL"

# Add the agent jar to Spark's classpath
export CLASSPATH="$CLASSPATH:$JAR_DIR/definity-spark-agent.jar"

Configure Cluster

Add definity init script

When creating a compute cluster in Databricks, add the definity init script to download the agent and add it to the classpath:
Cluster configuration → Advanced options → Init Scripts → Add script.

Example:

  • Source: s3
  • File path: s3://your-s3-bucket/init-scripts-dir/definity_init.sh
  • Region: ...
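If the cluster is created through the Databricks Clusters API rather than the UI, the same script can be attached via the init_scripts field. A minimal sketch, reusing the placeholder path above (the region is left for you to fill in):

"init_scripts": [
  {
    "s3": {
      "destination": "s3://your-s3-bucket/init-scripts-dir/definity_init.sh",
      "region": "..."
    }
  }
]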

Add static definity Spark configurations

Cluster configuration → Advanced options → Spark.

Example:

spark.extraListeners ai.definity.spark.AppListener
spark.definity.server https://app.definity.run
spark.definity.api.token eyJhb...
...
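If you define the cluster through the Databricks Clusters API instead of the UI, the same settings go under the spark_conf field. A minimal sketch using the example values above:

"spark_conf": {
  "spark.extraListeners": "ai.definity.spark.AppListener",
  "spark.definity.server": "https://app.definity.run",
  "spark.definity.api.token": "eyJhb...",
  ...
}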

Note: the cluster is created with a default Spark session, and the configurations above monitor that session. To monitor multiple tasks on the same cluster, see below.

Multi-Task Clusters

If the cluster is not isolated but shared across multiple tasks, a few "context properties" must be set for each task execution so that definity can attribute the work to that execution.

Multi-task notebook or Python jobs

  • In this case you can simply pass the definity configs under the "parameters" field of spark_python_task or the "base_parameters" field of notebook_task, as in the example below.
  • This behavior is controlled by spark.definity.databricks.automaticSessions.enabled=true (set it to false if you set the session scope manually, as described in the next section).
   ...
"tasks": [
{
"task_key": "task1",
"notebook_task": {
"notebook_path": "/Workspace/Users/user@org/task_notebook_1",
"source": "WORKSPACE",
"base_parameters": {
"spark.definity.pipeline.name": "my_pipeline",
"spark.definity.pipeline.pit": "2025-01-01 01:00:00",
"spark.definity.task.name": "task1"
}
},
"existing_cluster_id": "${DATABRICKS_CLUSTER}"
},
{
"task_key": "task2",
"notebook_task": {
"notebook_path": "/Workspace/Users/user@org/task_notebook_2",
"source": "WORKSPACE",
"base_parameters": {
"spark.definity.pipeline.name": "my_pipeline",
"spark.definity.pipeline.pit": "2025-01-01 01:00:00",
"spark.definity.task.name": "task2"
}
},
"existing_cluster_id": "${DATABRICKS_CLUSTER}"
},
{
"task_key": "python_task1",
"spark_python_task": {
"python_file": "s3://my-bucket/python_task.py",
"parameters": ["yourArg1", "yourArg2", ..., "spark.definity.task.name=python_task_1", "spark.definity.pipeline.name=my_pipeline", "spark.definity.pipeline.pit=2025-01-01 01:00:00"]
},
"existing_cluster_id": "${DATABRICKS_CLUSTER}"
},
],
"format": "MULTI_TASK",
"queue": {
"enabled": true
}
...

Manually set scope

For other cases, you can set the task scope explicitly using Spark configs, for example:

// 1. Set the definity context properties, such as task.name, pipeline.name, etc.
spark.conf.set("spark.definity.task.name", "task1")
spark.conf.set("spark.definity.pipeline.name", "pipe1")
spark.conf.set("spark.definity.pipeline.pit", "2024-12-08 01:00:00")

// 2. Then set the spark.definity.session property to signal that you are done with the settings
spark.conf.set("spark.definity.session", "")
...
try {
  // here comes your job's logic
  spark.sql("your job's first query").show()
  spark.sql("some other query").write.save("someOutputPath")
  ...
} finally {
  // 3. Clear the definity session property to signal that the job has finished
  // (in a finally block, so it is called on failures too)
  spark.conf.unset("spark.definity.session")
}
...
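If several jobs on the same cluster follow this pattern, it can be wrapped in a small helper. A minimal sketch, assuming the property names shown above; withDefinityScope is a hypothetical local helper, not part of the definity API:

import org.apache.spark.sql.SparkSession

// Hypothetical helper (not part of the definity API): sets the definity scope,
// runs the job body, and always clears the session property afterwards.
def withDefinityScope(spark: SparkSession,
                      pipelineName: String,
                      pipelinePit: String,
                      taskName: String)(body: => Unit): Unit = {
  // 1. Set the definity context properties for this task execution
  spark.conf.set("spark.definity.pipeline.name", pipelineName)
  spark.conf.set("spark.definity.pipeline.pit", pipelinePit)
  spark.conf.set("spark.definity.task.name", taskName)
  // 2. Signal that the settings are complete
  spark.conf.set("spark.definity.session", "")
  try {
    body
  } finally {
    // 3. Clear the session property, also on failures
    spark.conf.unset("spark.definity.session")
  }
}

// Usage:
withDefinityScope(spark, "pipe1", "2024-12-08 01:00:00", "task1") {
  spark.sql("your job's first query").show()
}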