
Amazon EMR

Compatibility Matrix

| EMR Release | Spark Version | Scala Version | Definity Agent |
|-------------|---------------|---------------|-----------------|
| 7.0 - 7.10  | 3.5.x         | 2.12          | 3.5_2.12-latest |
| 6.12 - 6.15 | 3.4.x         | 2.12          | 3.4_2.12-latest |
| 6.8 - 6.11  | 3.3.x         | 2.12          | 3.3_2.12-latest |
| 6.6 - 6.7   | 3.2.x         | 2.12          | 3.2_2.12-latest |
| 6.3 - 6.5   | 3.1.x         | 2.12          | 3.1_2.12-latest |
| 6.0 - 6.2   | 2.4.x         | 2.12          | 2.4_2.12-latest |

Configuration Methods

There are two ways to configure the Definity agent on EMR:

  1. Bootstrap Action (Recommended) - Automatically configures the agent on cluster startup
  2. Job Submission - For serverless batches or when configuring the agent per job

Method 1: Bootstrap Action (Init Script)

Use a bootstrap action to automatically configure the Definity agent when the EMR cluster starts. The script will:

  • Automatically detect your Spark and Scala versions
  • Download the appropriate Definity Spark agent
  • Configure the Definity plugin with default settings
  • Let the cluster start normally even if configuration fails
Quick Evaluation

For a quick evaluation, skip to Step 3 — just set your API token in the script.

1. Store Your API Token Securely

Store your Definity API token in AWS Secrets Manager:

aws secretsmanager create-secret \
  --name definity/api-token \
  --secret-string "<YOUR_DEFINITY_API_TOKEN>"

Then replace the DEFINITY_API_TOKEN line in the init script with:

DEFINITY_API_TOKEN=$(aws secretsmanager get-secret-value \
  --secret-id definity/api-token \
  --query SecretString --output text)
Tip: Make sure the EMR cluster's IAM role has secretsmanager:GetSecretValue permission for the secret.
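One way to grant that permission is an inline policy on the cluster's EC2 instance profile role; a hypothetical sketch (the role name, policy name, region, account ID, and secret ARN suffix below are all placeholders to adapt to your account):

```shell
# Hypothetical example: allow the instance profile role to read the secret.
# EMR_EC2_DefaultRole, region, account ID, and the ARN suffix are placeholders.
aws iam put-role-policy \
  --role-name EMR_EC2_DefaultRole \
  --policy-name definity-secret-read \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [{
      "Effect": "Allow",
      "Action": "secretsmanager:GetSecretValue",
      "Resource": "arn:aws:secretsmanager:us-east-1:123456789012:secret:definity/api-token-*"
    }]
  }'
```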

2. Upload the Agent JARs

Download the agent JARs for the Spark/Scala versions you use (see Compatibility Matrix) and upload them to a location accessible from your cluster. The init script auto-detects the Spark and Scala version at startup and fetches the matching JAR.

Supported storage options:

| Storage    | ARTIFACT_BASE_PATH example                       | Notes                                                        |
|------------|--------------------------------------------------|--------------------------------------------------------------|
| S3         | "s3://your-bucket/definity"                      | Cluster needs an instance profile or IAM role with S3 access |
| HTTP/HTTPS | "https://your-artifactory.com/repo/libs-release" | Artifactory, Nexus, or any HTTP server                       |

The expected JAR filename convention is: definity-spark-agent-{spark}_{scala}-{version}.jar (e.g. definity-spark-agent-3.5_2.12-latest.jar).
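For example, the filename the script requests for Spark 3.5 / Scala 2.12 can be assembled like this (the version values below are illustrative):

```shell
# Assemble the expected agent JAR name from detected versions (illustrative values)
SPARK_VERSION="3.5"
SCALA_VERSION="2.12"
DEFINITY_AGENT_VERSION="latest"
AGENT_JAR_NAME="definity-spark-agent-${SPARK_VERSION}_${SCALA_VERSION}-${DEFINITY_AGENT_VERSION}.jar"
echo "$AGENT_JAR_NAME"   # definity-spark-agent-3.5_2.12-latest.jar
```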

3. Create an Init Script

Create an init script to automatically download and configure the Definity Spark agent. Set ARTIFACT_BASE_PATH to match your setup from the previous step:

definity_init.sh
#!/bin/bash

# ============================================================================
# Definity Agent Configuration for AWS EMR & Google Cloud Dataproc
# ============================================================================
# This script automatically detects your Spark and Scala versions and
# installs the appropriate Definity Spark Agent.
#
# IMPORTANT: Do not hardcode your API token below. Instead, fetch it from
# a secrets manager at runtime (see Step 1 in the documentation):
#
# AWS : aws secretsmanager get-secret-value --secret-id definity/api-token ...
# GCP : gcloud secrets versions access latest --secret=definity-api-token
#
# If installation fails, the cluster will start normally without the agent.
# ============================================================================

# ============================================================================
# CONFIGURATION
# ============================================================================

# Base path to the agent JARs.
# The script auto-detects Spark/Scala and appends the JAR filename, e.g.:
# {base_path}/definity-spark-agent-3.5_2.12-latest.jar
#
# IMPORTANT: The definity.run URL shown here is for demonstration purposes only.
# For production use, upload the agent JAR to your own artifact repository
# (Artifactory, Nexus, S3, etc.) and update this URL. For example:
# S3 : "s3://your-bucket/definity"
# GCS : "gs://your-bucket/definity"
# HTTP/HTTPS : "https://your-artifactory.company.com/repository/libs-release"
ARTIFACT_BASE_PATH="https://user:[email protected]/java"

# Version of the Definity agent (e.g. "0.80.2")
DEFINITY_AGENT_VERSION="latest"

# Definity API token — replace this with a secrets manager call (see above)
DEFINITY_API_TOKEN="<YOUR_API_TOKEN>"

# ============================================================================

echo "==============================================================="
echo "Definity Agent configuration"
echo "==============================================================="

# ============================================================================
# VERSION DETECTION
# ============================================================================

SPARK_VERSION=""
SCALA_VERSION=""

echo "Detecting Spark and Scala versions..."

# Method 1: RELEASE file
if [ -f /usr/lib/spark/RELEASE ]; then
  FULL_SPARK_VERSION=$(cat /usr/lib/spark/RELEASE)
  SPARK_VERSION=$(echo "$FULL_SPARK_VERSION" | grep -oE '[0-9]+\.[0-9]+' | head -n 1)
  SCALA_VERSION=$(echo "$FULL_SPARK_VERSION" | grep -oE 'scala-[0-9]+\.[0-9]+|_[0-9]+\.[0-9]+' | sed 's/scala-//;s/_//' | head -n 1)
fi

# Method 2: EMR version (fallback for EMR 6.3 and lower, or if Scala version missing)
if { [ -z "$SPARK_VERSION" ] || [ -z "$SCALA_VERSION" ]; } && [ -f /emr/instance-controller/lib/info/extraInstanceData.json ]; then
  EMR_RELEASE=$(jq -r '.releaseLabel' /emr/instance-controller/lib/info/extraInstanceData.json 2>/dev/null || echo "")

  if [ -n "$EMR_RELEASE" ]; then
    case "$EMR_RELEASE" in
      emr-7.*) SPARK_VERSION="3.5"; SCALA_VERSION="2.12" ;;
      emr-6.1[2-5].*) SPARK_VERSION="3.4"; SCALA_VERSION="2.12" ;;
      emr-6.[8-9].*|emr-6.1[01].*) SPARK_VERSION="3.3"; SCALA_VERSION="2.12" ;;
      emr-6.[67].*) SPARK_VERSION="3.2"; SCALA_VERSION="2.12" ;;
      emr-6.[3-5].*) SPARK_VERSION="3.1"; SCALA_VERSION="2.12" ;;
      # 6.0 - 6.2; safe as the last pattern, since 6.10+ match the branches above
      emr-6.[0-2].*) SPARK_VERSION="2.4"; SCALA_VERSION="2.12" ;;
    esac
  fi
fi

if [ -z "$SPARK_VERSION" ] || [ -z "$SCALA_VERSION" ]; then
  echo "Could not detect Spark or Scala version"
  echo "Cluster will start without Definity agent"
  exit 0
fi

echo "Detected: Spark $SPARK_VERSION, Scala $SCALA_VERSION"

# ============================================================================
# DOWNLOAD
# ============================================================================

FULL_AGENT_VERSION="${SPARK_VERSION}_${SCALA_VERSION}-${DEFINITY_AGENT_VERSION}"
AGENT_JAR_NAME="definity-spark-agent-${FULL_AGENT_VERSION}.jar"
AGENT_JAR_SRC="${ARTIFACT_BASE_PATH}/${AGENT_JAR_NAME}"
JAR_TEMP_PATH="/tmp/definity-spark-agent.jar"

echo "Fetching Definity Spark Agent ${FULL_AGENT_VERSION} from ${AGENT_JAR_SRC} ..."

if [[ "$ARTIFACT_BASE_PATH" == s3://* ]]; then
  aws s3 cp "$AGENT_JAR_SRC" "$JAR_TEMP_PATH"
elif [[ "$ARTIFACT_BASE_PATH" == gs://* ]]; then
  gsutil cp "$AGENT_JAR_SRC" "$JAR_TEMP_PATH"
else
  curl -f --connect-timeout 30 --max-time 120 -o "$JAR_TEMP_PATH" "$AGENT_JAR_SRC"
fi

if [ $? -ne 0 ]; then
  echo "Failed to fetch agent JAR from: $AGENT_JAR_SRC"
  echo "Cluster will start without Definity agent"
  exit 0
fi

echo "Agent JAR downloaded"

if [ -d /usr/lib/spark/jars ]; then
  sudo cp "$JAR_TEMP_PATH" /usr/lib/spark/jars/definity-spark-agent.jar
  echo "Agent copied to /usr/lib/spark/jars/"
fi

# ============================================================================
# PLUGIN REGISTRATION
# ============================================================================

cat > /tmp/definity_config.sh <<'SCRIPT_END'
#!/bin/bash
set -eu

TIMEOUT=300
START=$(date +%s)
LAST_LOG=0
JAR_TEMP_PATH="/tmp/definity-spark-agent.jar"

# Inherited from the parent script's environment (exported below, preserved by sudo -E)
DEFINITY_API_TOKEN="${DEFINITY_API_TOKEN}"

check_timeout() {
  local CONTEXT="${1:-unknown}"
  local ELAPSED=$(($(date +%s) - START))

  if [ $ELAPSED -ge $TIMEOUT ]; then
    echo "ERROR: Timeout after ${TIMEOUT}s while waiting for: ${CONTEXT}"
    exit 1
  fi

  if [ $((ELAPSED - LAST_LOG)) -ge 30 ] && [ $ELAPSED -gt 0 ]; then
    echo "Still waiting for ${CONTEXT} (${ELAPSED}s elapsed)..."
    LAST_LOG=$ELAPSED
  fi
  return 0
}

echo ""
echo "Background configuration started"

if [ ! -d /usr/lib/spark/jars ]; then
  echo "Waiting for Spark jars directory..."
  while [ ! -d /usr/lib/spark/jars ]; do
    check_timeout "Spark jars directory"
    sleep 5
  done
  echo "Spark jars directory found"
fi

if [ ! -f /usr/lib/spark/jars/definity-spark-agent.jar ]; then
  if [ -f "$JAR_TEMP_PATH" ]; then
    sudo cp "$JAR_TEMP_PATH" /usr/lib/spark/jars/definity-spark-agent.jar
    echo "Agent JAR copied to /usr/lib/spark/jars/"
  else
    echo "ERROR: JAR not found at $JAR_TEMP_PATH"
    exit 1
  fi
fi

if [ ! -f /etc/spark/conf/spark-defaults.conf ]; then
  echo "Waiting for spark-defaults.conf..."
  while [ ! -f /etc/spark/conf/spark-defaults.conf ]; do
    check_timeout "spark-defaults.conf"
    sleep 5
  done
  echo "spark-defaults.conf found"
fi

cat >> /etc/spark/conf/spark-defaults.conf <<DEFINITY_CONF

spark.plugins ai.definity.spark.plugin.DefinitySparkPlugin
spark.extraListeners ai.definity.spark.AppListener
spark.executor.plugins ai.definity.spark.plugin.executor.DefinityExecutorPlugin
spark.definity.server https://app.definity.run
spark.definity.api.token ${DEFINITY_API_TOKEN}

DEFINITY_CONF

echo "Definity properties added to spark-defaults.conf"

echo ""
echo "==============================================================="
echo "Definity Spark Agent configured successfully"
echo "==============================================================="

SCRIPT_END

chmod +x /tmp/definity_config.sh

export DEFINITY_API_TOKEN

nohup sudo -E /tmp/definity_config.sh &

exit 0
Token Security

Do not hardcode the API token in the init script. Follow Step 1 above to store the token in a secrets manager and fetch it at runtime.
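The version-detection pipeline from the script can be exercised locally before rolling it out. A minimal sketch, assuming a sample RELEASE line (the exact format of /usr/lib/spark/RELEASE varies by EMR release; the string below is illustrative):

```shell
# Parse Spark/Scala versions from a sample RELEASE line (sample content assumed)
FULL_SPARK_VERSION="Spark 3.5.0 built for Hadoop 3.3.6, scala-2.12"
SPARK_VERSION=$(echo "$FULL_SPARK_VERSION" | grep -oE '[0-9]+\.[0-9]+' | head -n 1)
SCALA_VERSION=$(echo "$FULL_SPARK_VERSION" | grep -oE 'scala-[0-9]+\.[0-9]+' | sed 's/scala-//' | head -n 1)
echo "Spark $SPARK_VERSION, Scala $SCALA_VERSION"   # Spark 3.5, Scala 2.12
```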

4. Upload the Init Script to S3

aws s3 cp definity_init.sh s3://your-bucket/scripts/definity_init.sh

5. Create Cluster with Bootstrap Action

aws emr create-cluster \
  --name "EMR Cluster with Definity" \
  --release-label emr-7.0.0 \
  --applications Name=Spark \
  --bootstrap-actions Path=s3://your-bucket/scripts/definity_init.sh \
  --ec2-attributes KeyName=your-key \
  --instance-type m5.xlarge \
  --instance-count 3
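To confirm the cluster came up, you can poll its state (the cluster ID below is a placeholder; the init script is written to exit 0 on failure, so a missing agent will not block startup):

```shell
# A healthy cluster reaches WAITING or RUNNING; bootstrap-action output is
# available in the cluster's S3 log URI (cluster ID here is a placeholder)
aws emr describe-cluster \
  --cluster-id j-XXXXXXXXXXXXX \
  --query 'Cluster.Status.State' --output text
```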

6. Configure Additional Settings [Optional]

You can extend the spark-defaults.conf section in the init script to include additional configuration parameters.
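For instance, the heredoc could also set the environment and pipeline identifiers used in the job-submission examples on this page. A minimal sketch, writing to a temporary file for illustration (the values are placeholders):

```shell
# Append extra Definity settings the same way the init script appends the core
# ones; the temp file stands in for /etc/spark/conf/spark-defaults.conf
CONF_FILE=$(mktemp)
cat >> "$CONF_FILE" <<'DEFINITY_CONF'
spark.definity.env.name demo
spark.definity.pipeline.name example_pipeline
DEFINITY_CONF
cat "$CONF_FILE"
```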


Method 2: Job Submission

Alternatively, you can specify the Definity agent JAR and configuration parameters directly when submitting each job. This approach gives you more control over individual job configurations but requires specifying the agent settings for every submission.

Cluster Job Submission

Submit a job to an existing EMR cluster:

aws emr add-steps \
  --cluster-id $CLUSTER_ID \
  --steps '[{
    "Name": "Example PySpark Job",
    "Type": "Spark",
    "ActionOnFailure": "CONTINUE",
    "Args": [
      "--deploy-mode", "cluster",
      "--master", "yarn",
      "--jars", "s3://path/to/definity-spark-agent-X-X.jar",
      "--conf", "spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin",
      "--conf", "spark.definity.server=https://app.definity.run",
      "--conf", "spark.definity.api.token='$DEFINITY_API_TOKEN'",
      "--conf", "spark.definity.env.name=demo",
      "--conf", "spark.definity.pipeline.name=example_pipeline",
      "--conf", "spark.definity.pipeline.pit=2024-12-10",
      "--conf", "spark.definity.task.name=example_task",
      "--conf", "spark.executor.memory=3g",
      "--conf", "spark.executor.cores=3",
      "s3://path/to/task.py"
    ]
  }]'

EMR Serverless Batch Submission

aws emr-serverless start-job-run \
  --application-id <your-application-id> \
  --execution-role-arn <your-execution-role-arn> \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://your-bucket/your-spark-job.jar",
      "entryPointArguments": ["arg1", "arg2"],
      "sparkSubmitParameters": "--class com.example.spark.MySparkJob --jars s3://your-bucket/definity-spark-agent-X-X.jar --conf spark.plugins=ai.definity.spark.plugin.DefinitySparkPlugin --conf spark.definity.server=https://app.definity.run --conf spark.definity.api.token='$DEFINITY_API_TOKEN' --conf spark.definity.env.name=demo --conf spark.definity.pipeline.name=example_pipeline"
    }
  }'
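After submission, the run's state can be checked with the job-run ID returned by start-job-run (the IDs below are placeholders):

```shell
# Poll the serverless job run; it progresses through states such as
# SUBMITTED, RUNNING, and SUCCESS/FAILED (IDs here are placeholders)
aws emr-serverless get-job-run \
  --application-id <your-application-id> \
  --job-run-id <your-job-run-id> \
  --query 'jobRun.state' --output text
```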