Node Decommission
This section shows how to use an Apache Spark feature that migrates the shuffle data and cached RDD blocks present on a terminating executor to peer executors before the Spot node is decommissioned. As a result, your job does not need to recompute the shuffle and RDD blocks that would otherwise be lost with the terminating executor, which keeps the delay in job completion to a minimum.
This feature is supported on EMR releases 6.3.0 and later.
How does it work?
When spark.decommission.enabled is true, Spark will try its best to shut down the executor gracefully. spark.storage.decommission.enabled enables migrating the data stored on that executor. With decommissioning enabled, Spark will try to migrate all cached RDD blocks (controlled by spark.storage.decommission.rddBlocks.enabled) and shuffle blocks (controlled by spark.storage.decommission.shuffleBlocks.enabled) from the decommissioning executor to the remaining executors. The relevant Spark configurations for using node decommissioning in your jobs are listed in the table below; a minimal spark-submit sketch follows the table.
| Configuration | Description | Default Value |
|---|---|---|
| spark.decommission.enabled | Whether to enable decommissioning | false |
| spark.storage.decommission.enabled | Whether to decommission the block manager when decommissioning an executor | false |
| spark.storage.decommission.rddBlocks.enabled | Whether to transfer RDD blocks during block manager decommissioning | false |
| spark.storage.decommission.shuffleBlocks.enabled | Whether to transfer shuffle blocks during block manager decommissioning. Requires a migratable shuffle resolver (like sort-based shuffle) | false |
| spark.storage.decommission.maxReplicationFailuresPerBlock | Maximum number of failures which can be handled for migrating shuffle blocks when the block manager is decommissioning and trying to move its existing blocks | 3 |
| spark.storage.decommission.shuffleBlocks.maxThreads | Maximum number of threads to use in migrating shuffle files | 8 |
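These are standard open-source Spark properties, so the same flags can be attached to any Spark job. Below is a minimal, illustrative spark-submit sketch; the application script name is a placeholder and only the decommission-related flags are shown:
spark-submit \
  --conf spark.decommission.enabled=true \
  --conf spark.storage.decommission.enabled=true \
  --conf spark.storage.decommission.rddBlocks.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.maxThreads=8 \
  --conf spark.storage.decommission.maxReplicationFailuresPerBlock=3 \
  <your-application>.py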
This feature can currently be enabled through a temporary workaround on EMR 6.3.0+ releases. To enable it, the permissions on Spark's decom.sh script must be modified using a custom image. Once the underlying issue is fixed, this page will be updated.
Dockerfile for custom image:
# Start from the EMR on EKS Spark release image for your Region and release
FROM <release account id>.dkr.ecr.<aws region>.amazonaws.com/spark/<release>
USER root
WORKDIR /home/hadoop
# Workaround: give the hadoop user ownership of Spark's decommission script
RUN chown hadoop:hadoop /usr/bin/decom.sh
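For reference, a sketch of building this image and pushing it to your own ECR repository; the repository name emr-custom-spark and the tag are assumptions for illustration:
# Log in to the EMR release account registry (to pull the base image) and to your own registry (to push)
aws ecr get-login-password --region <aws region> | docker login --username AWS --password-stdin <release account id>.dkr.ecr.<aws region>.amazonaws.com
aws ecr get-login-password --region <aws region> | docker login --username AWS --password-stdin <account_id>.dkr.ecr.<aws region>.amazonaws.com

# Build from the Dockerfile above, then push the image referenced later by spark.kubernetes.container.image
docker build -t <account_id>.dkr.ecr.<aws region>.amazonaws.com/emr-custom-spark:latest .
docker push <account_id>.dkr.ecr.<aws region>.amazonaws.com/emr-custom-spark:latest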
Setting decommission timeout:
Each executor has to be decommissioned within a time limit controlled by the pod's terminationGracePeriodSeconds configuration. The default value is 30 seconds, but it can be modified using a custom pod template. A pod template for this modification would look like:
apiVersion: v1
kind: Pod
spec:
  terminationGracePeriodSeconds: <seconds>
Note: the terminationGracePeriodSeconds timeout should be less than the Spot instance interruption timeout, with a buffer of around 5 seconds set aside for triggering the node termination.
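To have executors pick up this pod template, one option is Spark's executor pod template property, which EMR on EKS can read from S3. A sketch of the property as it could be added to the spark-defaults classification of the request below (the S3 path and file name are placeholders):
"spark.kubernetes.executor.podTemplateFile": "s3://<s3 prefix>/executor-pod-template.yaml"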
Request:
cat >spark-python-with-node-decommissioning.json << EOF
{
  "name": "my-job-run-with-node-decommissioning",
  "virtualClusterId": "<virtual-cluster-id>",
  "executionRoleArn": "<execution-role-arn>",
  "releaseLabel": "emr-6.3.0-latest",
  "jobDriver": {
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://<s3 prefix>/trip-count.py",
      "sparkSubmitParameters": "--conf spark.driver.cores=5 --conf spark.executor.memory=20G --conf spark.driver.memory=15G --conf spark.executor.cores=6"
    }
  },
  "configurationOverrides": {
    "applicationConfiguration": [
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.kubernetes.container.image": "<account_id>.dkr.ecr.<region>.amazonaws.com/<custom_image_repo>",
          "spark.executor.instances": "5",
          "spark.decommission.enabled": "true",
          "spark.storage.decommission.rddBlocks.enabled": "true",
          "spark.storage.decommission.shuffleBlocks.enabled": "true",
          "spark.storage.decommission.enabled": "true"
        }
      }
    ],
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "<log group>",
        "logStreamNamePrefix": "<log-group-prefix>"
      },
      "s3MonitoringConfiguration": {
        "logUri": "<S3 URI>"
      }
    }
  }
}
EOF
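The job can then be submitted with the StartJobRun API, for example through the AWS CLI using the file written above:
aws emr-containers start-job-run --cli-input-json file://spark-python-with-node-decommissioning.json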
Observed Behavior:
When an executor begins decommissioning, its shuffle data is migrated to peer executors instead of the shuffle blocks being recalculated. If sending shuffle blocks to an executor fails, spark.storage.decommission.maxReplicationFailuresPerBlock determines how many times the migration is retried. The driver's stderr log will contain lines such as Updating map output for <shuffle_id> to BlockManagerId(<executor_id>, <ip_address>, <port>, <topology_info>), denoting the details of each migrated shuffle block.
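One way to check for these lines, assuming the driver's stderr is archived (gzipped) under the logUri configured in s3MonitoringConfiguration, is to stream it from S3 and grep; the intermediate path component below is a placeholder for your job run's actual log layout:
aws s3 cp s3://<S3 URI>/<path-to-driver-pod-logs>/stderr.gz - | gunzip | grep "Updating map output for"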