
Spark Agent

Spark Agent Jars

Latest version is 0.42.1

Download the relevant version:

# latest
wget https://user:[email protected]/java/definity-spark-agent-[spark.version]-latest.jar
# or specific version
wget https://user:[email protected]/java/definity-spark-agent-[spark.version]-[agent.version].jar

Configuration Options

Basic

  • spark.jars - Specifies definity-spark-agent-X.X.jar and optionally definity-spark-iceberg-1.2-X.X.jar. Mandatory.
  • spark.extraListeners - Requires ai.definity.spark.AppListener. Mandatory.
  • spark.definity.server - Specifies the definity server URL, e.g., https://app.definity.run. Mandatory.
  • spark.definity.enabled - Enables or disables functionality; options are true, false, or opt-in (default: true). For opt-in, users can toggle this in the pipeline settings page.
  • spark.definity.api.token - Integration token required for SaaS usage. Mandatory.
  • spark.definity.env.name - Environment name; defaults to default.
  • spark.definity.pipeline.name - Pipeline name; defaults to spark.app.name.
  • spark.definity.pipeline.pit - Point-in-Time grouping for tasks; defaults to the current time.
  • spark.definity.pipeline.run.id - Used for grouping tasks in the same run.
  • spark.definity.task.name - Logical name for the task; should be stable across runs. Defaults to spark.app.name.
  • spark.definity.task.id - User-defined task ID to show in the UI and notifications (e.g., YARN run ID); defaults to spark.app.name.
  • spark.definity.tags - Comma-separated tags; supports key:value format (e.g., team:team-A).
  • spark.definity.email.to - Comma-separated list of notification recipient emails.
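
As a rough sketch, a PySpark session configured with only the mandatory settings listed above might look like this (the app name, jar file name, and token are placeholder values):

from pyspark.sql import SparkSession

# Minimal sketch with only the mandatory settings listed above;
# the jar file name and the token are placeholders for your own values.
spark = (
    SparkSession.builder.appName("my_app")
    .config("spark.jars", "definity-spark-agent-X.X.jar")
    .config("spark.extraListeners", "ai.definity.spark.AppListener")
    .config("spark.definity.server", "https://app.definity.run")
    .config("spark.definity.api.token", "<your-integration-token>")
    .getOrCreate()
)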

Advanced

  • spark.definity.task.heartbeat.interval - Interval in seconds for sending heartbeats to the server; defaults to 60.
  • spark.definity.server.request.retry.count - Number of retries for server request errors; defaults to 1.
  • spark.definity.ignoredTables - Comma-separated list of tables to ignore. Names can be full (e.g., db_a.table_a) or partial (e.g., table_a), which applies to all databases.
  • spark.definity.files.sanitizedNamePattern - Regular expression to extract time partitions from file names. Defaults to ^.*?(?=/\d+/|/[^/]*=[^/]*/). Set empty to disable (see the illustration after this list).
  • spark.definity.delta.enabled - Enables Delta instrumentation; defaults to true. Set to false to opt out.
  • spark.definity.inputs.maxPerQuery - Maximum number of allowed inputs per query; defaults to 100.
  • spark.definity.default.session.enabled - Enables the default session for apps running multiple concurrent SparkSessions; defaults to true. Set to false to disable.
  • spark.definity.default.session.rotationSeconds - Maximum duration in seconds for the default session before rotation; defaults to 3600.
  • spark.definity.metrics.injection.enabled - Enables in-flight data distribution metrics; defaults to false.
  • spark.definity.debug - Enables debug logs; defaults to false.
  • spark.definity.databricks.automaticSessions.enabled - Enables auto-detection of tasks in Databricks multi-task workflows; defaults to false.
  • spark.definity.events.enabled - Enables reporting of events; defaults to true.
  • spark.definity.events.maxPerTaskRun - Maximum number of events to report in one task run; defaults to 5000.
  • spark.definity.slowPlanning.thresholdSeconds - Threshold in seconds above which execution planning is considered too slow and an event is triggered; defaults to 60.
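
To illustrate the default spark.definity.files.sanitizedNamePattern, here is a small sketch assuming the pattern is applied as a standard regex match to derive the sanitized prefix (the file path is made up):

import re

# Illustration only: the default pattern matches everything before the first
# path segment that looks like a time partition (all digits, or key=value).
pattern = re.compile(r"^.*?(?=/\d+/|/[^/]*=[^/]*/)")
path = "s3://bucket/events/date=2023-04-01/part-0000.parquet"  # made-up path
match = pattern.match(path)
print(match.group(0) if match else path)  # s3://bucket/events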

Metrics Calculation

  • spark.definity.num.threads - Number of threads for metrics calculation; defaults to 2.
  • spark.definity.metrics.timeout - Timeout for metrics calculation, in seconds; defaults to 180.
  • spark.definity.metrics.histogram.maxNumValues - Maximum number of values for histogram distribution; defaults to 10.
  • spark.definity.metrics.executorsMetrics.enabled - Specifies whether to extract metrics from Spark's ExecutorMetricsUpdate event; defaults to true.
  • spark.definity.metrics.timeSeries.intervalSeconds - Time-series metrics bucket size in seconds; defaults to 60.
  • spark.definity.driver.containerMemory - Total container memory for the driver, in bytes (for client mode).
  • spark.definity.driver.heapMemory - Total heap memory for the driver, in bytes (for client mode).

Custom Metrics

It is possible to report custom metrics to definity based on custom SQL logic. We track collected query results and report them as custom user metrics if the result dataset contains two columns:

  • definity_metric_name - String
  • definity_metric_value - Numeric

Example:

spark.sql(
    "select 'my_new_metric' as definity_metric_name, 1.5 as definity_metric_value"
).collect()

Output Diversion

Useful for CI shadow-run flows.

  • spark.definity.output.table.suffix - Suffix to add to all output tables.
  • spark.definity.output.database.suffix - Suffix to add to all output tables' database names.
  • spark.definity.output.database.baseLocation - Base location for all created output databases.
  • spark.definity.output.file.baseLocation - Base location for output files. Either a full base location path, to divert all files to a single location regardless of their original location, or a partial path to keep each file in its own bucket but under a different base directory (e.g., gs://my-tests-bucket or my-tests-base-dir).
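
A sketch of how this might be combined for a CI shadow run, on top of the basic agent settings shown earlier (the suffix and bucket name are made-up values):

from pyspark.sql import SparkSession

# Made-up suffix and bucket: write to "<table>_ci_shadow" tables and divert
# output files under a dedicated test bucket.
spark = (
    SparkSession.builder.appName("demo_pipeline_ci")
    .config("spark.definity.output.table.suffix", "_ci_shadow")
    .config("spark.definity.output.file.baseLocation", "gs://my-tests-bucket")
    .getOrCreate()
)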

Skew events

Skew events are calculated in the executors and use Spark's plugin mechanism.

  • spark.plugins - Defines ai.definity.spark.plugin.DefinitySparkPlugin to use the definity Spark plugin.
  • spark.definity.plugin.executor.driverPollingIntervalMs - Interval in milliseconds between consecutive polling requests from executor to driver when using the definity plugin; defaults to 20000.
  • spark.definity.skewDetection.minTaskSkewTimeSeconds - Minimum difference in seconds between a suspected skewed task's duration and the average task duration in its stage; defaults to 60.
  • spark.definity.skewDetection.minTaskSkewFactor - Minimum ratio between a suspected skewed task's duration and the average task duration in its stage; defaults to 2.
  • spark.definity.skewDetection.samplingRatio - Sampling ratio of task rows (e.g., 0.01 equals 1% sampling); defaults to 0.01.
  • spark.definity.skewDetection.maxSampledRowsPerTask - Maximum number of sampled rows per task; defaults to 1000.
  • spark.definity.skewDetection.maxReportedKeysPerTask - Maximum number of reported keys per task; defaults to 10.
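
A sketch of registering the plugin (in addition to the agent jar and listener configured as above), leaving the detection thresholds at their defaults:

from pyspark.sql import SparkSession

# Register the definity Spark plugin so skew events can be collected in the
# executors; the other agent settings are assumed to be configured as above.
spark = (
    SparkSession.builder.appName("demo_with_skew_events")
    .config("spark.plugins", "ai.definity.spark.plugin.DefinitySparkPlugin")
    .getOrCreate()
)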

Examples

PySpark

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("demo_pyspark")
    .config("spark.jars", "definity-spark-agent-X.X.jar")
    .config("spark.extraListeners", "ai.definity.spark.AppListener")
    .config("spark.definity.server", "https://app.definity.run")
    .config("spark.definity.env.name", "dev")
    .config("spark.definity.pipeline.name", "demo-pipeline")
    .config("spark.definity.pipeline.pit", "2023-04-01")
    .config("spark.definity.task.name", "demo-spark-task")
    .enableHiveSupport()
    .getOrCreate()
)

Airflow Integration

When submitting Spark jobs through Airflow, some of the definity parameters can be set to match the corresponding Airflow run metadata.

With Jinja templates:

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="spark_dag",
) as dag:
    op1 = BashOperator(
        task_id="spark_task",
        bash_command="spark-submit ... \
            --jars ...,definity-spark-agent-X.X.jar \
            --conf spark.extraListeners=ai.definity.spark.AppListener \
            --conf spark.definity.server=https://app.definity.run \
            --conf spark.definity.env.name=dev \
            --conf spark.definity.pipeline.name='{{ dag_run.dag_id }}' \
            --conf spark.definity.pipeline.pit='{{ dag_run.execution_date }}' \
            --conf spark.definity.task.name='{{ ti.task_id }}' \
            ...",
    )
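
If the Spark provider's SparkSubmitOperator is used instead of spark-submit in a BashOperator, the same values can be passed through its conf dictionary. A sketch, assuming a provider version that templates conf (the application path and jar name are placeholders):

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="spark_dag",
) as dag:
    # Sketch only: the application path and jar name are placeholders, and the
    # Jinja values in conf assume a provider version where conf is templated.
    op1 = SparkSubmitOperator(
        task_id="spark_task",
        application="/path/to/job.py",
        jars="definity-spark-agent-X.X.jar",
        conf={
            "spark.extraListeners": "ai.definity.spark.AppListener",
            "spark.definity.server": "https://app.definity.run",
            "spark.definity.env.name": "dev",
            "spark.definity.pipeline.name": "{{ dag_run.dag_id }}",
            "spark.definity.pipeline.pit": "{{ dag_run.execution_date }}",
            "spark.definity.task.name": "{{ ti.task_id }}",
        },
    )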