Spark Agent
Spark Agent Jars
The latest version is 0.42.1.
Download the relevant version:
# latest
wget https://user:[email protected]/java/definity-spark-agent-[spark.version]-latest.jar
# or specific version
wget https://user:[email protected]/java/definity-spark-agent-[spark.version]-[agent.version].jar
- For Spark 2.3: spark-agent
- For Spark 2.4: spark-agent
- For Spark 3.1: spark-agent
- For Spark 3.2: spark-agent, spark-iceberg 1.2
- For Spark 3.3: spark-agent, spark-iceberg 1.2
- For Spark 3.4: spark-agent
- For Spark 3.5: spark-agent
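For example, assuming Spark 3.3 and the placeholder credentials above, the latest-agent download resolves to:
# Spark 3.3, latest agent version (credentials and host are placeholders, as above)
wget https://user:[email protected]/java/definity-spark-agent-3.3-latest.jar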
Configuration Options
Basic
Name | Details |
---|---|
spark.jars | Specifies definity-spark-agent-X.X.jar and, optionally, definity-spark-iceberg-1.2-X.X.jar. Mandatory. |
spark.extraListeners | Must include ai.definity.spark.AppListener. Mandatory. |
spark.definity.server | The definity server URL, e.g., https://app.definity.run. Mandatory. |
spark.definity.enabled | Enables or disables the agent, with options true, false, or opt-in (default: true). With opt-in, users can toggle this on the pipeline settings page. |
spark.definity.api.token | Integration token, required for SaaS usage. Mandatory. |
spark.definity.env.name | Environment name; defaults to default. |
spark.definity.pipeline.name | Pipeline name; defaults to spark.app.name. |
spark.definity.pipeline.pit | Point-in-time grouping for tasks; defaults to the current time. |
spark.definity.pipeline.run.id | Used for grouping tasks in the same run. |
spark.definity.task.name | Logical name for the task, which should be stable across runs; defaults to spark.app.name. |
spark.definity.task.id | User-defined task ID shown in the UI and notifications (e.g., YARN run ID); defaults to spark.app.name. |
spark.definity.tags | Comma-separated tags; supports key:value format (e.g., team:team-A). |
spark.definity.email.to | Comma-separated list of notification recipient emails. |
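Putting the mandatory settings together, a minimal spark-submit could look like the following sketch; the jar name, token value, and my_job.py are placeholders:
spark-submit \
  --jars definity-spark-agent-X.X.jar \
  --conf spark.extraListeners=ai.definity.spark.AppListener \
  --conf spark.definity.server=https://app.definity.run \
  --conf spark.definity.api.token=<your-token> \
  my_job.py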
Advanced
Name | Details |
---|---|
spark.definity.task.heartbeat.interval | Interval in seconds for sending heartbeats to the server; defaults to 60. |
spark.definity.server.request.retry.count | Number of retries for server request errors; defaults to 1. |
spark.definity.ignoredTables | Comma-separated list of tables to ignore. Names can be full (e.g., db_a.table_a) or partial (e.g., table_a), which applies across all databases. |
spark.definity.files.sanitizedNamePattern | Regular expression used to extract time partitions from file names. Defaults to ^.*?(?=/\d+/|/[^/]*=[^/]*/). Set empty to disable. |
spark.definity.delta.enabled | Enables Delta instrumentation; defaults to true. Set to false to opt out. |
spark.definity.inputs.maxPerQuery | Maximum number of allowed inputs per query; defaults to 100. |
spark.definity.default.session.enabled | Enables the default session for apps with multiple concurrent SparkSessions; defaults to true. Set to false to disable. |
spark.definity.default.session.rotationSeconds | Maximum duration in seconds for the default session before rotation; defaults to 3600. |
spark.definity.metrics.injection.enabled | Enables in-flight data distribution metrics; defaults to false. |
spark.definity.debug | Enables debug logs; defaults to false. |
spark.definity.databricks.automaticSessions.enabled | Enables auto-detection of tasks in Databricks multi-task workflows; defaults to true. |
spark.definity.events.enabled | Enables reporting of events; defaults to true. |
spark.definity.events.maxPerTaskRun | Maximum number of events to report in one task run; defaults to 5000. |
spark.definity.slowPlanning.thresholdSeconds | Threshold in seconds above which execution planning is considered slow and an event is triggered; defaults to 60. |
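As an illustration only, a run that skips a hypothetical staging table and tightens event reporting could add flags such as these (table names and values are illustrative, not recommendations):
  --conf spark.definity.ignoredTables=staging_db.tmp_table,scratch_table \
  --conf spark.definity.events.maxPerTaskRun=1000 \
  --conf spark.definity.slowPlanning.thresholdSeconds=120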
Metrics Calculation
Name | Details |
---|---|
spark.definity.num.threads | Number of threads for metrics calculation; defaults to 2. |
spark.definity.metrics.timeout | Timeout for metrics calculation, in seconds; defaults to 180. |
spark.definity.metrics.histogram.maxNumValues | Maximum number of values for histogram distribution; defaults to 10. |
spark.definity.metrics.executorsMetrics.enabled | Whether to extract metrics from Spark's ExecutorMetricsUpdate event; defaults to true. |
spark.definity.metrics.timeSeries.intervalSeconds | Time-series metrics bucket size in seconds; defaults to 60. |
spark.definity.driver.containerMemory | Total container memory for the driver, in bytes (for client mode). |
spark.definity.driver.heapMemory | Total heap memory for the driver, in bytes (for client mode). |
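For example, a client-mode driver with a 4 GiB container and a 3 GiB heap (illustrative numbers) could report its memory explicitly:
  --conf spark.definity.driver.containerMemory=4294967296 \
  --conf spark.definity.driver.heapMemory=3221225472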
Custom Metrics
It is possible to report custom metrics to definity based on custom SQL logic. Collected query results are tracked and reported as custom user metrics when the result dataset contains two columns:
- definity_metric_name - String
- definity_metric_value - Numeric
Example:
spark.sql(
    "select 'my_new_metric' as definity_metric_name, 1.5 as definity_metric_value"
).collect()
Output Diversion
Useful for CI shadow-run flows.
Name | Details |
---|---|
spark.definity.output.table.suffix | Suffix to add to all output table names. |
spark.definity.output.database.suffix | Suffix to add to all output tables' database names. |
spark.definity.output.database.baseLocation | Base location for all created output databases. |
spark.definity.output.file.baseLocation | Base location for output files. Either a full base location path, to divert all files to a single location regardless of their original location, or a partial path, to keep each file in its own bucket but under a different base directory (e.g., gs://my-tests-bucket or my-tests-base-dir). |
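A hypothetical CI shadow-run configuration that keeps outputs separate from production (the suffix and bucket names are illustrative):
  --conf spark.definity.output.table.suffix=_shadow \
  --conf spark.definity.output.database.suffix=_shadow \
  --conf spark.definity.output.file.baseLocation=gs://my-tests-bucket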
Skew events
Skew events are calculated in the executors and use Spark's plugin mechanism.
Name | Details |
---|---|
spark.plugins | Must include ai.definity.spark.plugin.DefinitySparkPlugin to enable the definity Spark plugin. |
spark.definity.plugin.executor.driverPollingIntervalMs | Interval in milliseconds between consecutive polling requests from executor to driver when using the definity plugin; defaults to 20000. |
spark.definity.skewDetection.minTaskSkewTimeSeconds | Minimum difference in seconds between a suspected skewed task's duration and the average task duration in its stage; defaults to 60. |
spark.definity.skewDetection.minTaskSkewFactor | Minimum ratio between a suspected skewed task's duration and the average task duration in its stage; defaults to 2. |
spark.definity.skewDetection.samplingRatio | Sampling ratio of task rows (e.g., 0.01 equals 1% sampling); defaults to 0.01. |
spark.definity.skewDetection.maxSampledRowsPerTask | Maximum number of sampled rows per task; defaults to 1000. |
spark.definity.skewDetection.maxReportedKeysPerTask | Maximum number of reported keys per task; defaults to 10. |
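To enable skew detection, the plugin has to be registered in addition to the listener; a sketch of the relevant flags (the threshold values shown are the documented defaults and could be omitted):
  --conf spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin \
  --conf spark.definity.skewDetection.minTaskSkewTimeSeconds=60 \
  --conf spark.definity.skewDetection.minTaskSkewFactor=2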
Examples
PySpark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("demo_pyspark")
    .config("spark.jars", "definity-spark-agent-X.X.jar")
    .config("spark.extraListeners", "ai.definity.spark.AppListener")
    .config("spark.definity.server", "https://app.definity.run")
    .config("spark.definity.env.name", "dev")
    .config("spark.definity.pipeline.name", "demo-pipeline")
    .config("spark.definity.pipeline.pit", "2023-04-01")
    .config("spark.definity.task.name", "demo-spark-task")
    .enableHiveSupport()
    .getOrCreate()
)
Airflow Integration
When submitting Spark jobs through Airflow, some of the definity parameters can be set to match Airflow's own run context.
With a Jinja template:
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="spark_dag",
) as dag:
    op1 = BashOperator(
        task_id="spark_task",
        bash_command="spark-submit ... \
            --jars ...,definity-spark-agent-X.X.jar \
            --conf spark.extraListeners=ai.definity.spark.AppListener \
            --conf spark.definity.server=https://app.definity.run \
            --conf spark.definity.env.name=dev \
            --conf spark.definity.pipeline.name='{{ dag_run.dag_id }}' \
            --conf spark.definity.pipeline.pit='{{ dag_run.execution_date }}' \
            --conf spark.definity.task.name='{{ ti.task_id }}' \
            ...",
    )
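If tasks should also be grouped by Airflow's run, the same templating approach applies to the run-grouping parameter described above; a minimal sketch, assuming Airflow's standard dag_run.run_id macro, adds one more flag to the bash_command:
            --conf spark.definity.pipeline.run.id='{{ dag_run.run_id }}' \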