# Amazon EMR

## Compatibility Matrix
| EMR Release | Spark Version | Scala Version | Definity Agent |
|---|---|---|---|
| 7.0 - 7.10 | 3.5.x | 2.12 | 3.5_2.12-latest |
| 6.12 - 6.15 | 3.4.x | 2.12 | 3.4_2.12-latest |
| 6.8 - 6.11 | 3.3.x | 2.12 | 3.3_2.12-latest |
| 6.6 - 6.7 | 3.2.x | 2.12 | 3.2_2.12-latest |
| 6.3 - 6.5 | 3.1.x | 2.12 | 3.1_2.12-latest |
| 6.0 - 6.2 | 2.4.x | 2.12 | 2.4_2.12-latest |
## Configuration Methods
There are two ways to configure the Definity agent on EMR:
- Bootstrap Action (Recommended) - Automatically configures the agent on cluster startup
- Job Submission - For serverless batches or when configuring the agent per job
## Method 1: Bootstrap Action (Init Script)
Use a bootstrap action to automatically configure the Definity agent when the EMR cluster starts. The script will:
- Automatically detect your Spark and Scala versions
- Download the appropriate Definity Spark agent
- Configure the Definity plugin with default settings
- Let the cluster continue to start normally if configuration fails
For a quick evaluation, skip to Step 3 — just set your API token in the script.
### 1. Store Your API Token Securely
Store your Definity API token in AWS Secrets Manager:
```bash
aws secretsmanager create-secret \
  --name definity/api-token \
  --secret-string "<YOUR_DEFINITY_API_TOKEN>"
```
Then replace the `DEFINITY_API_TOKEN` line in the init script with:
```bash
DEFINITY_API_TOKEN=$(aws secretsmanager get-secret-value \
  --secret-id definity/api-token \
  --query SecretString --output text)
```
Make sure the EMR cluster's IAM role has `secretsmanager:GetSecretValue` permission for the secret.
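For reference, a minimal policy statement granting that permission could look like the following sketch. The region, account ID, and the `-*` suffix (Secrets Manager appends a random suffix to secret ARNs) are placeholders to adapt to your account:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "secretsmanager:GetSecretValue",
      "Resource": "arn:aws:secretsmanager:us-east-1:123456789012:secret:definity/api-token-*"
    }
  ]
}
```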
### 2. Upload the Agent JARs
Download the agent JARs for the Spark/Scala versions you use (see Compatibility Matrix) and upload them to a location accessible from your cluster. The init script auto-detects the Spark and Scala version at startup and fetches the matching JAR.
Supported storage options:
| Storage | `ARTIFACT_BASE_PATH` example | Notes |
|---|---|---|
| S3 | "s3://your-bucket/definity" | Cluster needs an instance profile or IAM role with S3 access |
| HTTP/HTTPS | "https://your-artifactory.com/repo/libs-release" | Artifactory, Nexus, or any HTTP server |
The expected JAR filename convention is `definity-spark-agent-{spark}_{scala}-{version}.jar` (e.g. `definity-spark-agent-3.5_2.12-latest.jar`).
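As a quick sanity check, the naming convention can be reproduced with a few shell variables (the version values below are examples; substitute the row from the compatibility matrix that matches your cluster):

```shell
# Example values - pick the versions that match your EMR release
SPARK_VERSION="3.5"
SCALA_VERSION="2.12"
AGENT_VERSION="latest"

# Build the JAR name exactly as the init script expects to find it
JAR_NAME="definity-spark-agent-${SPARK_VERSION}_${SCALA_VERSION}-${AGENT_VERSION}.jar"
echo "$JAR_NAME"   # definity-spark-agent-3.5_2.12-latest.jar

# Then upload it under your chosen base path, e.g. (bucket name is a placeholder):
# aws s3 cp "$JAR_NAME" "s3://your-bucket/definity/${JAR_NAME}"
```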
### 3. Create an Init Script
Create an init script to automatically download and configure the Definity Spark agent. Set `ARTIFACT_BASE_PATH` to match your setup from the previous step:
**definity_init.sh**

```bash
#!/bin/bash
# ============================================================================
# Definity Agent Configuration for AWS EMR & Google Cloud Dataproc
# ============================================================================
# This script automatically detects your Spark and Scala versions and
# installs the appropriate Definity Spark Agent.
#
# IMPORTANT: Do not hardcode your API token below. Instead, fetch it from
# a secrets manager at runtime (see Step 1 in the documentation):
#
#   AWS : aws secretsmanager get-secret-value --secret-id definity/api-token ...
#   GCP : gcloud secrets versions access latest --secret=definity-api-token
#
# If installation fails, the cluster will start normally without the agent.
# ============================================================================

# ============================================================================
# CONFIGURATION
# ============================================================================
# Base path to the agent JARs.
# The script auto-detects Spark/Scala and appends the JAR filename, e.g.:
#   {base_path}/definity-spark-agent-3.5_2.12-latest.jar
#
# IMPORTANT: For production use, upload the agent JAR to your own artifact
# repository (Artifactory, Nexus, S3, etc.) and update this URL. The
# definity.run URL shown here is for demonstration purposes only. For example:
#   S3         : "s3://your-bucket/definity"
#   GCS        : "gs://your-bucket/definity"
#   HTTP/HTTPS : "https://your-artifactory.company.com/repository/libs-release"
ARTIFACT_BASE_PATH="https://user:[email protected]/java"

# Version of the Definity agent (e.g. "0.80.2")
DEFINITY_AGENT_VERSION="latest"

# Definity API token: replace this with a secrets manager call (see above)
DEFINITY_API_TOKEN="<YOUR_API_TOKEN>"
# ============================================================================

echo "==============================================================="
echo "Definity Agent configuration"
echo "==============================================================="

# ============================================================================
# VERSION DETECTION
# ============================================================================
SPARK_VERSION=""
SCALA_VERSION=""
echo "Detecting Spark and Scala versions..."

# Method 1: RELEASE file
if [ -f /usr/lib/spark/RELEASE ]; then
  FULL_SPARK_VERSION=$(cat /usr/lib/spark/RELEASE)
  SPARK_VERSION=$(echo "$FULL_SPARK_VERSION" | grep -oE '[0-9]+\.[0-9]+' | head -n 1)
  SCALA_VERSION=$(echo "$FULL_SPARK_VERSION" | grep -oE 'scala-[0-9]+\.[0-9]+|_[0-9]+\.[0-9]+' | sed 's/scala-//;s/_//' | head -n 1)
fi

# Method 2: EMR release label (fallback for EMR 6.3 and lower, or if the
# Scala version is missing)
if { [ -z "$SPARK_VERSION" ] || [ -z "$SCALA_VERSION" ]; } && [ -f /emr/instance-controller/lib/info/extraInstanceData.json ]; then
  EMR_RELEASE=$(jq -r '.releaseLabel' /emr/instance-controller/lib/info/extraInstanceData.json 2>/dev/null || echo "")
  if [ -n "$EMR_RELEASE" ]; then
    case "$EMR_RELEASE" in
      emr-7.*)                     SPARK_VERSION="3.5"; SCALA_VERSION="2.12" ;;
      emr-6.1[2-5].*)              SPARK_VERSION="3.4"; SCALA_VERSION="2.12" ;;
      emr-6.[8-9].*|emr-6.1[01].*) SPARK_VERSION="3.3"; SCALA_VERSION="2.12" ;;
      emr-6.[67].*)                SPARK_VERSION="3.2"; SCALA_VERSION="2.12" ;;
      emr-6.[3-5].*)               SPARK_VERSION="3.1"; SCALA_VERSION="2.12" ;;
      emr-6.0.*)                   SPARK_VERSION="2.4"; SCALA_VERSION="2.12" ;;
    esac
  fi
fi

if [ -z "$SPARK_VERSION" ] || [ -z "$SCALA_VERSION" ]; then
  echo "Could not detect Spark or Scala version"
  echo "Cluster will start without Definity agent"
  exit 0
fi
echo "Detected: Spark $SPARK_VERSION, Scala $SCALA_VERSION"

# ============================================================================
# DOWNLOAD
# ============================================================================
FULL_AGENT_VERSION="${SPARK_VERSION}_${SCALA_VERSION}-${DEFINITY_AGENT_VERSION}"
AGENT_JAR_NAME="definity-spark-agent-${FULL_AGENT_VERSION}.jar"
AGENT_JAR_SRC="${ARTIFACT_BASE_PATH}/${AGENT_JAR_NAME}"
JAR_TEMP_PATH="/tmp/definity-spark-agent.jar"

echo "Fetching Definity Spark Agent ${FULL_AGENT_VERSION} from ${AGENT_JAR_SRC} ..."
if [[ "$ARTIFACT_BASE_PATH" == s3://* ]]; then
  aws s3 cp "$AGENT_JAR_SRC" "$JAR_TEMP_PATH"
elif [[ "$ARTIFACT_BASE_PATH" == gs://* ]]; then
  gsutil cp "$AGENT_JAR_SRC" "$JAR_TEMP_PATH"
else
  curl -f --connect-timeout 30 --max-time 120 -o "$JAR_TEMP_PATH" "$AGENT_JAR_SRC"
fi
if [ $? -ne 0 ]; then
  echo "Failed to fetch agent JAR from: $AGENT_JAR_SRC"
  echo "Cluster will start without Definity agent"
  exit 0
fi
echo "Agent JAR downloaded"

if [ -d /usr/lib/spark/jars ]; then
  sudo cp "$JAR_TEMP_PATH" /usr/lib/spark/jars/definity-spark-agent.jar
  echo "Agent copied to /usr/lib/spark/jars/"
fi

# ============================================================================
# PLUGIN REGISTRATION
# ============================================================================
cat > /tmp/definity_config.sh <<'SCRIPT_END'
#!/bin/bash
set -eu

TIMEOUT=300
START=$(date +%s)
LAST_LOG=0
JAR_TEMP_PATH="/tmp/definity-spark-agent.jar"
DEFINITY_API_TOKEN="${DEFINITY_API_TOKEN}"

check_timeout() {
  local CONTEXT="${1:-unknown}"
  local ELAPSED=$(($(date +%s) - START))
  if [ $ELAPSED -ge $TIMEOUT ]; then
    echo "ERROR: Timeout after ${TIMEOUT}s while waiting for: ${CONTEXT}"
    exit 1
  fi
  if [ $((ELAPSED - LAST_LOG)) -ge 30 ] && [ $ELAPSED -gt 0 ]; then
    echo "Still waiting for ${CONTEXT} (${ELAPSED}s elapsed)..."
    LAST_LOG=$ELAPSED
  fi
  return 0
}

echo ""
echo "Background configuration started"

if [ ! -d /usr/lib/spark/jars ]; then
  echo "Waiting for Spark jars directory..."
  while [ ! -d /usr/lib/spark/jars ]; do
    check_timeout "Spark jars directory"
    sleep 5
  done
  echo "Spark jars directory found"
fi

if [ ! -f /usr/lib/spark/jars/definity-spark-agent.jar ]; then
  if [ -f "$JAR_TEMP_PATH" ]; then
    sudo cp "$JAR_TEMP_PATH" /usr/lib/spark/jars/definity-spark-agent.jar
    echo "Agent JAR copied to /usr/lib/spark/jars/"
  else
    echo "ERROR: JAR not found at $JAR_TEMP_PATH"
    exit 1
  fi
fi

if [ ! -f /etc/spark/conf/spark-defaults.conf ]; then
  echo "Waiting for spark-defaults.conf..."
  while [ ! -f /etc/spark/conf/spark-defaults.conf ]; do
    check_timeout "spark-defaults.conf"
    sleep 5
  done
  echo "spark-defaults.conf found"
fi

cat >> /etc/spark/conf/spark-defaults.conf <<DEFINITY_CONF
spark.plugins ai.definity.spark.plugin.DefinitySparkPlugin
spark.extraListeners ai.definity.spark.AppListener
spark.executor.plugins ai.definity.spark.plugin.executor.DefinityExecutorPlugin
spark.definity.server https://app.definity.run
spark.definity.api.token ${DEFINITY_API_TOKEN}
DEFINITY_CONF
echo "Definity properties added to spark-defaults.conf"

echo ""
echo "==============================================================="
echo "Definity Spark Agent configured successfully"
echo "==============================================================="
SCRIPT_END

chmod +x /tmp/definity_config.sh
export DEFINITY_API_TOKEN
nohup sudo -E /tmp/definity_config.sh &
exit 0
```
> **Note:** Do not hardcode the API token in the init script. Follow Step 1 above to store the token in a secrets manager and fetch it at runtime.
### 4. Upload the Init Script to S3

```bash
aws s3 cp definity_init.sh s3://your-bucket/scripts/definity_init.sh
```
### 5. Create Cluster with Bootstrap Action

```bash
aws emr create-cluster \
  --name "EMR Cluster with Definity" \
  --release-label emr-7.0.0 \
  --applications Name=Spark \
  --bootstrap-actions Path=s3://your-bucket/scripts/definity_init.sh \
  --ec2-attributes KeyName=your-key \
  --instance-type m5.xlarge \
  --instance-count 3
```
### 6. Configure Additional Settings [Optional]
You can extend the `spark-defaults.conf` section in the init script to include additional configuration parameters.
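For example, to tag every job on the cluster with a fixed environment name, you could append one more property inside the `DEFINITY_CONF` heredoc. The `spark.definity.env.name` key also appears in the job-submission examples below; the value `production` here is purely illustrative:

```
spark.definity.env.name production
```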
## Method 2: Job Submission
Alternatively, you can specify the Definity agent JAR and configuration parameters directly when submitting each job. This approach gives you more control over individual job configurations but requires specifying the agent settings for every submission.
### Cluster Job Submission
Submit a job to an existing EMR cluster:
```bash
aws emr add-steps \
  --cluster-id $CLUSTER_ID \
  --steps '[{
    "Name": "Example PySpark Job",
    "Type": "Spark",
    "ActionOnFailure": "CONTINUE",
    "Args": [
      "--deploy-mode", "cluster",
      "--master", "yarn",
      "--jars", "s3://path/to/definity-spark-agent-X-X.jar",
      "--conf", "spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin",
      "--conf", "spark.definity.server=https://app.definity.run",
      "--conf", "spark.definity.api.token='$DEFINITY_API_TOKEN'",
      "--conf", "spark.definity.env.name=demo",
      "--conf", "spark.definity.pipeline.name=example_pipeline",
      "--conf", "spark.definity.pipeline.pit=2024-12-10",
      "--conf", "spark.definity.task.name=example_task",
      "--conf", "spark.executor.memory=3g",
      "--conf", "spark.executor.cores=3",
      "s3://path/to/task.py"
    ]
  }]'
```
### EMR Serverless Batch Submission

Submit a batch job to an EMR Serverless application:

```bash
aws emr-serverless start-job-run \
  --application-id <your-application-id> \
  --execution-role-arn <your-execution-role-arn> \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://your-bucket/your-spark-job.jar",
      "entryPointArguments": ["arg1", "arg2"],
      "sparkSubmitParameters": "--class com.example.spark.MySparkJob --jars s3://your-bucket/definity-spark-agent-X-X.jar --conf spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin --conf spark.definity.server=https://app.definity.run --conf spark.definity.api.token=$DEFINITY_API_TOKEN --conf spark.definity.env.name=demo --conf spark.definity.pipeline.name=example_pipeline"
    }
  }'
```