Running Spark on Mesos
- How it Works
- Installing Mesos
- Connecting Spark to Mesos
- Mesos Run Modes
- Mesos Docker Support
- Running Alongside Hadoop
- Dynamic Resource Allocation with Mesos
- Troubleshooting and Debugging
Spark can run on hardware clusters managed by Apache Mesos.
The advantages of deploying Spark with Mesos include:
- dynamic partitioning between Spark and other frameworks
- scalable partitioning between multiple instances of Spark
How it Works
In a standalone cluster deployment, the cluster manager in the below diagram is a Spark master instance. When using Mesos, the Mesos master replaces the Spark master as the cluster manager.
Now when a driver creates a job and starts issuing tasks for scheduling, Mesos determines what machines handle what tasks. Because it takes into account other frameworks when scheduling these many short-lived tasks, multiple frameworks can coexist on the same cluster without resorting to a static partitioning of resources.
To get started, follow the steps below to install Mesos and deploy Spark jobs via Mesos.
Spark 2.3.0-SNAPSHOT is designed for use with Mesos 1.0.0 or newer and does not require any special patches of Mesos. File and environment-based secrets support requires Mesos 1.3.0 or newer.
If you already have a Mesos cluster running, you can skip this Mesos installation step.
Otherwise, installing Mesos for Spark is no different than installing Mesos for use by other frameworks. You can install Mesos either from source or using prebuilt packages.
To install Apache Mesos from source, follow these steps:
- Download a Mesos release from a mirror
- Follow the Mesos Getting Started page for compiling and installing Mesos
Note: If you want to run Mesos without installing it into the default paths on your system
(e.g., if you lack administrative privileges to install it), pass the
--prefix option to
configure to tell it where to install. For example, pass
--prefix=/home/me/mesos. By default the prefix is
The Apache Mesos project only publishes source releases, not binary packages. But other third party projects publish binary releases that may be helpful in setting Mesos up.
One of those is Mesosphere. To install Mesos using the binary releases provided by Mesosphere:
- Download Mesos installation package from downloads page
- Follow their instructions for installation and configuration
The Mesosphere installation documents suggest setting up ZooKeeper to handle Mesos master failover, but Mesos can be run without ZooKeeper using a single master as well.
To verify that the Mesos cluster is ready for Spark, navigate to the Mesos master webui at port
:5050 Confirm that all expected machines are present in the slaves tab.
Connecting Spark to Mesos
To use Mesos from Spark, you need a Spark binary package available in a place accessible by Mesos, and a Spark driver program configured to connect to Mesos.
Alternatively, you can also install Spark in the same location in all the Mesos slaves, and configure
spark.mesos.executor.home (defaults to SPARK_HOME) to point to that location.
Uploading Spark Package
When Mesos runs a task on a Mesos slave for the first time, that slave must have a Spark binary
package for running the Spark Mesos executor backend.
The Spark package can be hosted at any Hadoop-accessible URI, including HTTP via
Amazon Simple Storage Service via
s3n://, or HDFS via
To use a precompiled package:
- Download a Spark binary package from the Spark download page
- Upload to hdfs/http/s3
To host on HDFS, use the Hadoop fs put command:
hadoop fs -put spark-2.3.0-SNAPSHOT.tar.gz
Or if you are using a custom-compiled version of Spark, you will need to create a package using
dev/make-distribution.sh script included in a Spark source tarball/checkout.
- Download and build Spark using the instructions here
- Create a binary package using
- Upload archive to http/s3/hdfs
Using a Mesos Master URL
The Master URLs for Mesos are in the form
mesos://host:5050 for a single-master Mesos
mesos://zk://host1:2181,host2:2181,host3:2181/mesos for a multi-master Mesos cluster using ZooKeeper.
In client mode, a Spark Mesos framework is launched directly on the client machine and waits for the driver output.
The driver needs some configuration in
spark-env.sh to interact properly with Mesos:
spark-env.shset some environment variables:
export MESOS_NATIVE_JAVA_LIBRARY=<path to libmesos.so>. This path is typically
<prefix>/lib/libmesos.sowhere the prefix is
/usr/localby default. See Mesos installation instructions above. On Mac OS X, the library is called
export SPARK_EXECUTOR_URI=<URL of spark-2.3.0-SNAPSHOT.tar.gz uploaded above>.
- Also set
<URL of spark-2.3.0-SNAPSHOT.tar.gz>.
Now when starting a Spark application against the cluster, pass a
URL as the master when creating a
SparkContext. For example:
val conf = new SparkConf() .setMaster("mesos://HOST:5050") .setAppName("My app") .set("spark.executor.uri", "<path to spark-2.3.0-SNAPSHOT.tar.gz uploaded above>") val sc = new SparkContext(conf)
When running a shell, the
spark.executor.uri parameter is inherited from
it does not need to be redundantly passed in as a system property.
./bin/spark-shell --master mesos://host:5050
Spark on Mesos also supports cluster mode, where the driver is launched in the cluster and the client can find the results of the driver from the Mesos Web UI.
To use cluster mode, you must start the
MesosClusterDispatcher in your cluster via the
passing in the Mesos master URL (e.g: mesos://host:5050). This starts the
MesosClusterDispatcher as a daemon running on the host.
By setting the Mesos proxy config property (requires mesos version >= 1.4),
--conf spark.mesos.proxy.baseURL=http://localhost:5050 when launching the dispacther, the mesos sandbox URI for each driver is added to the mesos dispatcher UI.
If you like to run the
MesosClusterDispatcher with Marathon, you need to run the
MesosClusterDispatcher in the foreground (i.e:
bin/spark-class org.apache.spark.deploy.mesos.MesosClusterDispatcher). Note that the
MesosClusterDispatcher not yet supports multiple instances for HA.
MesosClusterDispatcher also supports writing recovery state into Zookeeper. This will allow the
MesosClusterDispatcher to be able to recover all submitted and running containers on relaunch. In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spark-env by configuring
spark.deploy.recoveryMode and related spark.deploy.zookeeper.* configurations.
For more information about these configurations please refer to the configurations doc.
You can also specify any additional jars required by the
MesosClusterDispatcher in the classpath by setting the environment variable SPARK_DAEMON_CLASSPATH in spark-env.
From the client, you can submit a job to Mesos cluster by running
spark-submit and specifying the master URL
to the URL of the
MesosClusterDispatcher (e.g: mesos://dispatcher:7077). You can view driver statuses on the
Spark cluster Web UI.
./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master mesos://220.127.116.11:7077 \ --deploy-mode cluster \ --supervise \ --executor-memory 20G \ --total-executor-cores 100 \ http://path/to/examples.jar \ 1000
Note that jars or python files that are passed to spark-submit should be URIs reachable by Mesos slaves, as the Spark driver doesn’t automatically upload local jars.
Mesos Run Modes
Spark can run over Mesos in two modes: “coarse-grained” (default) and “fine-grained” (deprecated).
In “coarse-grained” mode, each Spark executor runs as a single Mesos task. Spark executors are sized according to the following configuration variables:
- Executor memory:
- Executor cores:
- Number of executors:
Please see the Spark Configuration page for details and default values.
Executors are brought up eagerly when the application starts, until
spark.cores.max is reached. If you don’t set
Spark application will consume all resources offered to it by Mesos,
so we of course urge you to set this variable in any sort of
multi-tenant cluster, including one which runs multiple concurrent
The scheduler will start executors round-robin on the offers Mesos gives it, but there are no spread guarantees, as Mesos does not provide such guarantees on the offer stream.
In this mode spark executors will honor port allocation if such is
provided from the user. Specifically if the user defines
spark.blockManager.port in Spark configuration,
the mesos scheduler will check the available offers for a valid port
range containing the port numbers. If no such range is available it will
not launch any task. If no restriction is imposed on port numbers by the
user, ephemeral ports are used as usual. This port honouring implementation
implies one task per host if the user defines a port. In the future network
isolation shall be supported.
The benefit of coarse-grained mode is much lower startup overhead, but at the cost of reserving Mesos resources for the complete duration of the application. To configure your job to dynamically adjust to its resource requirements, look into Dynamic Allocation.
In “fine-grained” mode, each Spark task inside the Spark executor runs as a separate Mesos task. This allows multiple instances of Spark (and other frameworks) to share cores at a very fine granularity, where each application gets more or fewer cores as it ramps up and down, but it comes with an additional overhead in launching each task. This mode may be inappropriate for low-latency requirements like interactive queries or serving web requests.
Note that while Spark tasks in fine-grained will relinquish cores as they terminate, they will not relinquish memory, as the JVM does not give memory back to the Operating System. Neither will executors terminate when they’re idle.
To run in fine-grained mode, set the
spark.mesos.coarse property to false in your
You may also make use of
spark.mesos.constraints to set
attribute-based constraints on Mesos resource offers. By default, all
resource offers will be accepted.
For example, Let’s say
spark.mesos.constraints is set to
os:centos7;us-east-1:false, then the resource offers will
be checked to see if they meet both these constraints and only then will be accepted to start new executors.
To constrain where driver tasks are run, use
Mesos Docker Support
Spark can make use of a Mesos Docker containerizer by setting the property
in your SparkConf.
The Docker image used must have an appropriate version of Spark already part of the image, or you can have Mesos download Spark via the usual methods.
Requires Mesos version 0.20.1 or later.
Note that by default Mesos agents will not pull the image if it already exists on the agent. If you use mutable image
tags you can set
true in order to force the agent to always pull the
image before running the executor. Force pulling images is only available in Mesos version 0.22 and above.
Running Alongside Hadoop
You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
separate service on the machines. To access Hadoop data from Spark, a full
hdfs:// URL is required
hdfs://<namenode>:9000/path, but you can find the right URL on your Hadoop Namenode web
In addition, it is possible to also run Hadoop MapReduce on Mesos for better resource isolation and sharing between the two. In this case, Mesos will act as a unified scheduler that assigns cores to either Hadoop or Spark, as opposed to having them share resources via the Linux scheduler on each node. Please refer to Hadoop on Mesos.
In either case, HDFS runs separately from Hadoop MapReduce, without being scheduled through Mesos.
Dynamic Resource Allocation with Mesos
Mesos supports dynamic allocation only with coarse-grained mode, which can resize the number of executors based on statistics of the application. For general information, see Dynamic Resource Allocation.
The External Shuffle Service to use is the Mesos Shuffle Service. It provides shuffle data cleanup functionality
on top of the Shuffle Service since Mesos doesn’t yet support notifying another framework’s
termination. To launch it, run
$SPARK_HOME/sbin/start-mesos-shuffle-service.sh on all slave nodes, with
spark.shuffle.service.enabled set to
This can also be achieved through Marathon, using a unique host constraint, and the following command:
See the configuration page for information on Spark configurations. The following configs are specific for Spark on Mesos.
If set to
||Set the extra number of cores for an executor to advertise. This does not result in more cores allocated. It instead means that an executor will "pretend" it has more cores, so that the driver will send it more tasks. Use this to increase parallelism. This setting is only used for Mesos coarse-grained mode.|
||(Fine-grained mode only) Number of cores to give each Mesos executor. This does not include the cores used to run the Spark tasks. In other words, even if no Spark task is being run, each Mesos executor will occupy the number of cores configured here. The value can be a floating point number.|
Set the name of the docker image that the Spark executors will run in. The selected
image must have Spark installed, as well as a compatible version of the Mesos library.
The installed path of Spark in the image can be specified with
Force Mesos agents to pull the image specified in
Set the list of custom parameters which will be passed into the
Set the list of volumes which will be mounted into the Docker image, which was set using
||(none)||Set the Mesos labels to add to each task. Labels are free-form key-value pairs. Key-value pairs should be separated by a colon, and commas used to list more than one. If your label includes a colon or comma, you can escape it with a backslash. Ex. key:value,key2:a\:b.|
Set the directory in which Spark is installed on the executors in Mesos. By default, the
executors will simply use the driver's Spark home directory, which may not be visible to
them. Note that this is only relevant if a Spark binary package is not specified through
||executor memory * 0.10, with minimum of 384||
The amount of additional memory, specified in MB, to be allocated per executor. By default,
the overhead will be larger of either 384 or 10% of
||(none)||A comma-separated list of URIs to be downloaded to the sandbox when driver or executor is launched by Mesos. This applies to both coarse-grained and fine-grained mode.|
||(none)||Set the principal with which Spark framework will use to authenticate with Mesos.|
||(none)||Set the secret with which Spark framework will use to authenticate with Mesos. Used, for example, when authenticating with the registry.|
||Set the role of this Spark framework for Mesos. Roles are used in Mesos for reservations and resource weight sharing.|
Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. This setting
applies only to executors. Refer to Mesos
Attributes & Resources for more information on attributes.
||This only affects docker containers, and must be one of "docker" or "mesos". Mesos supports two types of containerizers for docker: the "docker" containerizer, and the preferred "mesos" containerizer. Read more here: http://mesos.apache.org/documentation/latest/container-image/|
||Set the Spark Mesos driver webui_url for interacting with the framework. If unset it will point to Spark's internal web UI.|
Mesos labels to add to the driver. See
A secret is specified by its contents and destination. These properties specify a secret's contents. To specify a secret's destination, see the cell below.
You can specify a secret's contents either (1) by value or (2) by reference.
(1) To specify a secret by value, set the
(2) To specify a secret that has been placed in a secret store
by reference, specify its name within the secret store
by setting the
Note: To use a secret store, make sure one has been integrated with Mesos via a custom SecretResolver module.
To specify multiple secrets, provide a comma-separated list:
A secret is specified by its contents and destination. These properties specify a secret's destination. To specify a secret's contents, see the cell above.
You can specify a secret's destination in the driver or executors as either (1) an environment variable or (2) as a file.
(1) To make an environment-based secret, set the
(2) To make a file-based secret, set the
Paths are relative to the container's work directory. Absolute paths must already exist. Note: File-based secrets require a custom SecretResolver module.
To specify env vars or file names corresponding to multiple secrets, provide a comma-separated list:
||This only affects drivers submitted in cluster mode. Add the environment variable specified by EnvironmentVariableName to the driver process. The user can specify multiple of these to set multiple environment variables.|
||Set the Spark Mesos dispatcher webui_url for interacting with the framework. If unset it will point to Spark's internal web UI.|
||Set default properties for drivers submitted through the dispatcher. For example, spark.mesos.dispatcher.driverProperty.spark.executor.memory=32g results in the executors for all drivers submitted in cluster mode to run in 32g containers.|
||Set the URL of the history server. The dispatcher will then link each driver to its entry in the history server.|
||Set the maximum number GPU resources to acquire for this job. Note that executors will still launch when no GPU resources are found since this configuration is just a upper limit and not a guaranteed amount.|
||Attach containers to the given named network. If this job is launched in cluster mode, also launch the driver in the given named network. See the Mesos CNI docs for more details.|
Pass network labels to CNI plugins. This is a comma-separated list
of key-value pairs, where each key-value pair has the format key:value.
key1:val1,key2:val2See the Mesos CNI docs for more details.
||If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos Fetcher Cache|
||The amount of time (in seconds) that the master will wait for the driver to reconnect, after being temporarily disconnected, before it tears down the driver framework by killing all its executors. The default value is zero, meaning no timeout: if the driver disconnects, the master immediately tears down the framework.|
||Time to consider unused resources refused, serves as a fallback of `spark.mesos.rejectOfferDurationForUnmetConstraints`, `spark.mesos.rejectOfferDurationForReachedMaxCores`|
||Time to consider unused resources refused with unmet constraints|
Time to consider unused resources refused when maximum number of cores
Troubleshooting and Debugging
A few places to look during debugging:
- Mesos master on port
- Slaves should appear in the slaves tab
- Spark applications should appear in the frameworks tab
- Tasks should appear in the details of a framework
- Check the stdout and stderr of the sandbox of failed tasks
- Mesos logs
- Master and slave logs are both in
- Master and slave logs are both in
And common pitfalls:
- Spark assembly not reachable/accessible
- Slaves must be able to download the Spark binary package from the
s3n://URL you gave
- Slaves must be able to download the Spark binary package from the
- Firewall blocking communications
- Check for messages about failed connections
- Temporarily disable firewalls for debugging and then poke appropriate holes