# PySpark

This guide shows how to integrate Definity with standalone PySpark applications.
## Basic Setup

Add the Definity configuration when creating your SparkSession:
```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("demo_pyspark")
    .config("spark.jars", "definity-spark-agent-3.5_2.12-0.75.1.jar")
    .config("spark.plugins", "ai.definity.spark.plugin.DefinitySparkPlugin")
    .config("spark.definity.server", "https://app.definity.run")
    .config("spark.definity.env.name", "dev")
    .config("spark.definity.pipeline.name", "demo-pipeline")
    .config("spark.definity.pipeline.pit", "2023-04-01")
    .config("spark.definity.task.name", "demo-spark-task")
    .enableHiveSupport()
    .getOrCreate()
)
```
## Configuration Parameters

Replace the following with your actual values:

- `spark.jars` - Path to the Definity agent JAR (see Installation)
- `spark.definity.server` - Your Definity server URL
- `spark.definity.env.name` - Environment name (e.g., `dev`, `staging`, `prod`)
- `spark.definity.pipeline.name` - Name of your data pipeline
- `spark.definity.pipeline.pit` - Point-in-time for this run (e.g., a date or timestamp)
- `spark.definity.task.name` - Name of this specific task
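If you set these parameters in several jobs, it can help to collect them in one place and apply them in a loop. A minimal sketch, assuming a helper of your own (`definity_confs` and its defaults are illustrative, not part of the Definity API):

```python
def definity_confs(env, pipeline, pit, task,
                   server="https://app.definity.run",
                   jar="definity-spark-agent-3.5_2.12-0.75.1.jar"):
    """Build the Definity Spark settings as a plain dict."""
    return {
        "spark.jars": jar,
        "spark.plugins": "ai.definity.spark.plugin.DefinitySparkPlugin",
        "spark.definity.server": server,
        "spark.definity.env.name": env,
        "spark.definity.pipeline.name": pipeline,
        "spark.definity.pipeline.pit": pit,
        "spark.definity.task.name": task,
    }

confs = definity_confs("dev", "demo-pipeline", "2023-04-01", "demo-spark-task")
```

Each entry can then be applied to the builder with `builder = builder.config(key, value)` before calling `getOrCreate()`.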
## Authentication

For SaaS deployments, add your API token:

```python
.config("spark.definity.api.token", "your-token-here")
```

Alternatively, set the `DEFINITY_API_TOKEN` environment variable on the driver.
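A common pattern is to prefer an explicitly supplied token and fall back to the environment variable. A small sketch (the helper is illustrative, not part of the Definity API):

```python
import os

def resolve_api_token(explicit_token=None):
    """Prefer an explicit token; otherwise fall back to DEFINITY_API_TOKEN."""
    token = explicit_token or os.environ.get("DEFINITY_API_TOKEN")
    if token is None:
        raise RuntimeError("No Definity API token provided")
    return token
```

The resolved value can then be passed to `.config("spark.definity.api.token", token)`.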
## Using spark-submit

If you're using `spark-submit`, pass the configuration via command-line arguments:
```shell
spark-submit \
  --jars definity-spark-agent-3.5_2.12-0.75.1.jar \
  --conf spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin \
  --conf spark.definity.server=https://app.definity.run \
  --conf spark.definity.env.name=dev \
  --conf spark.definity.pipeline.name=demo-pipeline \
  --conf spark.definity.pipeline.pit=2023-04-01 \
  --conf spark.definity.task.name=demo-spark-task \
  your_script.py
```
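When launching from an orchestrator, the same command line can be assembled programmatically as an argument list. A sketch (the function and its parameters are illustrative):

```python
def build_spark_submit_cmd(script, confs, jars=None):
    """Assemble a spark-submit argument list from a dict of Spark confs."""
    cmd = ["spark-submit"]
    if jars:
        cmd += ["--jars", ",".join(jars)]
    for key, value in confs.items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(script)
    return cmd

cmd = build_spark_submit_cmd(
    "your_script.py",
    {
        "spark.plugins": "ai.definity.spark.plugin.DefinitySparkPlugin",
        "spark.definity.server": "https://app.definity.run",
        "spark.definity.env.name": "dev",
    },
    jars=["definity-spark-agent-3.5_2.12-0.75.1.jar"],
)
# The list form can be passed directly to subprocess.run(cmd, check=True),
# avoiding shell-quoting issues.
```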
## Dynamic Configuration

For parameterized pipelines, use Python variables:
```python
import sys
from datetime import datetime

from pyspark.sql import SparkSession

# Read parameters from the command line
pipeline_name = sys.argv[1]
execution_date = sys.argv[2] if len(sys.argv) > 2 else datetime.now().strftime("%Y-%m-%d")

spark = (
    SparkSession.builder
    .appName(f"{pipeline_name}_task")
    .config("spark.jars", "definity-spark-agent-3.5_2.12-0.75.1.jar")
    .config("spark.plugins", "ai.definity.spark.plugin.DefinitySparkPlugin")
    .config("spark.definity.server", "https://app.definity.run")
    .config("spark.definity.pipeline.name", pipeline_name)
    .config("spark.definity.pipeline.pit", execution_date)
    .config("spark.definity.task.name", f"{pipeline_name}_task")
    .getOrCreate()
)
```
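Reading `sys.argv` directly fails with an opaque `IndexError` when the pipeline name is omitted, and silently accepts malformed dates. A small validation helper makes failures explicit before the Spark job starts (the helper is illustrative, not part of Definity):

```python
from datetime import datetime

def parse_run_args(argv):
    """Validate [pipeline_name, optional YYYY-MM-DD date] and apply defaults."""
    if len(argv) < 2:
        raise SystemExit("usage: your_script.py <pipeline_name> [YYYY-MM-DD]")
    pipeline_name = argv[1]
    execution_date = argv[2] if len(argv) > 2 else datetime.now().strftime("%Y-%m-%d")
    # Reject malformed dates early, before building the SparkSession.
    datetime.strptime(execution_date, "%Y-%m-%d")
    return pipeline_name, execution_date
```

Calling `parse_run_args(sys.argv)` then replaces the two bare `sys.argv` reads above.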
## Next Steps
- See Configuration Reference for all available parameters
- Learn about Custom Metrics to track KPIs
- Explore Tracking Modes for advanced scenarios