PySpark

This guide shows how to integrate Definity with standalone PySpark applications.

Basic Setup

Add Definity configuration when creating your SparkSession:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("demo_pyspark")
    .config("spark.jars", "definity-spark-agent-3.5_2.12-0.75.1.jar")
    .config("spark.plugins", "ai.definity.spark.plugin.DefinitySparkPlugin")
    .config("spark.definity.server", "https://app.definity.run")
    .config("spark.definity.env.name", "dev")
    .config("spark.definity.pipeline.name", "demo-pipeline")
    .config("spark.definity.pipeline.pit", "2023-04-01")
    .config("spark.definity.task.name", "demo-spark-task")
    .enableHiveSupport()
    .getOrCreate()
)
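
Note that the enableHiveSupport() call is part of this example application rather than the Definity setup; include it only if your job uses the Hive metastore.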

Configuration Parameters

Replace the following with your actual values (a sketch showing how to source them from the environment follows this list):

  • spark.jars - Path to the Definity agent JAR (see Installation)
  • spark.definity.server - Your Definity server URL
  • spark.definity.env.name - Environment name (e.g., dev, staging, prod)
  • spark.definity.pipeline.name - Name of your data pipeline
  • spark.definity.pipeline.pit - Point-in-time for this run (e.g., date or timestamp)
  • spark.definity.task.name - Name of this specific task
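
In many deployments these values come from the runtime environment rather than being hard-coded. A minimal sketch, assuming you expose them through environment variables of your own choosing (the DEFINITY_* variable names below are illustrative, not defined by the agent; only the spark.definity.* keys come from this guide):

import os

from pyspark.sql import SparkSession

# Illustrative environment variable names; adjust to your deployment
definity_conf = {
    "spark.definity.server": os.environ.get("DEFINITY_SERVER", "https://app.definity.run"),
    "spark.definity.env.name": os.environ.get("DEFINITY_ENV", "dev"),
    "spark.definity.pipeline.name": os.environ["DEFINITY_PIPELINE_NAME"],
    "spark.definity.pipeline.pit": os.environ["DEFINITY_PIPELINE_PIT"],
    "spark.definity.task.name": os.environ["DEFINITY_TASK_NAME"],
}

builder = (
    SparkSession.builder
    .appName("demo_pyspark")
    .config("spark.jars", "definity-spark-agent-3.5_2.12-0.75.1.jar")
    .config("spark.plugins", "ai.definity.spark.plugin.DefinitySparkPlugin")
)
for key, value in definity_conf.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()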

Authentication

For SaaS deployments, add your API token:

.config("spark.definity.api.token", "your-token-here")

Alternatively, set the DEFINITY_API_TOKEN environment variable on the driver.
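
If you want to keep the token out of your source while still passing it explicitly, you can read the environment variable yourself and hand it to the same config key (a sketch; DEFINITY_API_TOKEN is the variable named above):

import os

from pyspark.sql import SparkSession

# Fails fast with a KeyError if the token is not set on the driver
token = os.environ["DEFINITY_API_TOKEN"]

spark = (
    SparkSession.builder
    .config("spark.definity.api.token", token)
    .getOrCreate()
)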

Using spark-submit

If you're using spark-submit, pass the configuration via command-line arguments:

spark-submit \
  --jars definity-spark-agent-3.5_2.12-0.75.1.jar \
  --conf spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin \
  --conf spark.definity.server=https://app.definity.run \
  --conf spark.definity.env.name=dev \
  --conf spark.definity.pipeline.name=demo-pipeline \
  --conf spark.definity.pipeline.pit=2023-04-01 \
  --conf spark.definity.task.name=demo-spark-task \
  your_script.py
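
If a scheduler or other Python process launches the job, the same flags can be assembled programmatically. A minimal sketch using only the standard library (the flag values mirror the example above):

import subprocess

# Mirrors the spark-submit invocation above; adjust paths and values to your deployment
cmd = [
    "spark-submit",
    "--jars", "definity-spark-agent-3.5_2.12-0.75.1.jar",
    "--conf", "spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin",
    "--conf", "spark.definity.server=https://app.definity.run",
    "--conf", "spark.definity.env.name=dev",
    "--conf", "spark.definity.pipeline.name=demo-pipeline",
    "--conf", "spark.definity.pipeline.pit=2023-04-01",
    "--conf", "spark.definity.task.name=demo-spark-task",
    "your_script.py",
]
subprocess.run(cmd, check=True)  # Raises CalledProcessError on a non-zero exit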

Dynamic Configuration

For parameterized pipelines, derive the Definity configuration from runtime values:

import sys
from datetime import datetime

from pyspark.sql import SparkSession

# Read the pipeline name and (optional) execution date from the command line
pipeline_name = sys.argv[1]
execution_date = sys.argv[2] if len(sys.argv) > 2 else datetime.now().strftime("%Y-%m-%d")

spark = (
    SparkSession.builder
    .appName(f"{pipeline_name}_task")
    .config("spark.jars", "definity-spark-agent-3.5_2.12-0.75.1.jar")
    .config("spark.plugins", "ai.definity.spark.plugin.DefinitySparkPlugin")
    .config("spark.definity.server", "https://app.definity.run")
    .config("spark.definity.pipeline.name", pipeline_name)
    .config("spark.definity.pipeline.pit", execution_date)
    .config("spark.definity.task.name", f"{pipeline_name}_task")
    .getOrCreate()
)
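
Launching the script with spark-submit (plus the --jars and --conf flags shown earlier) as your_script.py demo-pipeline 2023-04-01 sets pipeline_name to demo-pipeline and execution_date to 2023-04-01; when the second argument is omitted, the current date is used.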

Next Steps