Hands-On Deep Learning with Apache Spark

Build and deploy distributed deep learning applications on Apache Spark

Product type Paperback
Published in Jan 2019
Publisher Packt
ISBN-13 9781788994613
Length 322 pages
Edition 1st Edition
Author Guglielmo Iozzia

Cluster mode using different managers

The following diagram shows how Spark applications run on a cluster. Each application is an independent set of processes coordinated by the SparkContext object in the driver program. SparkContext connects to a cluster manager, which is responsible for allocating resources across applications. Once the SparkContext is connected, Spark acquires executors on the cluster nodes.

Executors are processes that execute computations and store data for a given Spark application. SparkContext sends the application code (which could be a JAR file for Scala or .py files for Python) to the executors. Finally, it sends the tasks to run to the executors:

Figure 1.12

To isolate applications from each other, every Spark application receives its own executor processes. They stay alive for the whole duration of the application and run tasks in multiple threads. The downside is that data can't be shared directly across different Spark applications: to share it, the data needs to be persisted to an external storage system.

Spark supports different cluster managers, but it is agnostic to the underlying type.

The driver program, at execution time, must be network addressable from the worker nodes because it has to listen for and accept incoming connections from its executors. Because it schedules tasks on the cluster, it should be executed close to the worker nodes, on the same local area network (if possible).

The following are the cluster managers that are currently supported in Spark:

  • Standalone: A simple cluster manager that makes it easy to set up a cluster. It is included with Spark.
  • Apache Mesos: An open source project that's used to manage computer clusters, and was developed at the University of California, Berkeley.
  • Hadoop YARN: The resource manager available in Hadoop starting from release 2.
  • Kubernetes: An open source platform for providing a container-centric infrastructure. Kubernetes support was introduced in Spark 2.3 as an experimental feature, so in the 2.x releases covered here it's probably not ready for production yet.
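
Each of these cluster managers is selected through the master URL that is passed to Spark. As a minimal sketch (the function name cluster_manager_for is hypothetical, and only the URL forms used in this section are covered, so local masters such as local[*] are deliberately left out), the mapping can be expressed as follows:

```python
def cluster_manager_for(master_url: str) -> str:
    """Return the cluster manager implied by a Spark master URL.

    Illustrative helper, not part of Spark: it only knows the URL
    forms discussed in this section.
    """
    # In YARN mode the master value is the literal string "yarn".
    if master_url == "yarn":
        return "Hadoop YARN"

    # The other managers are identified by the URL scheme.
    prefixes = {
        "spark://": "Standalone",
        "mesos://": "Apache Mesos",
        "k8s://": "Kubernetes",
    }
    for prefix, manager in prefixes.items():
        if master_url.startswith(prefix):
            return manager

    raise ValueError(f"Unrecognized master URL: {master_url!r}")


print(cluster_manager_for("spark://master-host:7077"))  # Standalone
print(cluster_manager_for("yarn"))                      # Hadoop YARN
```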

Standalone mode

For standalone mode, you only need to place a compiled version of Spark on each node of the cluster. All the cluster nodes must be able to resolve the hostnames of the other cluster members and must be routable to one another. The Spark master URL can be configured in the $SPARK_HOME/conf/spark-defaults.conf file on all of the nodes:

spark.master                     spark://<master_hostname_or_IP>:7077

Then, the hostname or IP address of the Spark master node needs to be specified in the $SPARK_HOME/conf/spark-env.sh file on all of the nodes, as follows:

SPARK_MASTER_HOST=<master_hostname_or_IP>

It is now possible to start a standalone master server by executing the following script:

$SPARK_HOME/sbin/start-master.sh

Once the master has started, a web UI will be available at the http://<master_hostname_or_IP>:8080 URL. From there, it is possible to obtain the master URL that's to be used when starting the workers. One or more workers can now be started by executing the following script:

$SPARK_HOME/sbin/start-slave.sh <master-spark-URL>

Once started, each worker exposes its own web UI at http://<worker_hostname_or_IP>:8081.

The list of workers, along with other information about their number of CPUs and memory, can be found in the master's web UI.
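
To keep the addresses mentioned so far in one place, here is a small sketch that derives them from the hostnames. The function name standalone_endpoints and the hostnames are hypothetical, and the ports (7077, 8080, and 8081) are the ones quoted above; a cluster that has been configured with different ports would need different values:

```python
def standalone_endpoints(master_host, worker_hosts):
    """Derive the URLs of a standalone cluster from its hostnames.

    Illustrative helper: it assumes the ports quoted in the text
    (7077 for the master, 8080 and 8081 for the web UIs).
    """
    return {
        # URL to pass to start-slave.sh, spark-shell or spark-submit
        "master_url": f"spark://{master_host}:7077",
        # Web UI of the master
        "master_ui": f"http://{master_host}:8080",
        # Web UI of each worker
        "worker_uis": [f"http://{host}:8081" for host in worker_hosts],
    }


endpoints = standalone_endpoints("master-host", ["worker1", "worker2"])
print(endpoints["master_url"])  # spark://master-host:7077
```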

The steps described so far start a standalone cluster manually. It is also possible to use the provided launch scripts. As a preliminary step, a $SPARK_HOME/conf/slaves file needs to be created. It must contain the hostnames (one per line) of all of the machines where the Spark workers should start. Passwordless SSH (short for Secure Shell) from the Spark master to the Spark slaves needs to be enabled to allow remote login for the slave daemon startup and shutdown actions. A cluster can then be launched or stopped using the following shell scripts, which are available in the $SPARK_HOME/sbin directory:

  • start-master.sh: Starts a master instance
  • start-slaves.sh: Starts a slave instance on each machine specified in the conf/slaves file
  • start-slave.sh: Starts a single slave instance
  • start-all.sh: Starts both a master and a number of slaves
  • stop-master.sh: Stops a master that has been started via the sbin/start-master.sh script
  • stop-slaves.sh: Stops all slave instances on the nodes specified in the conf/slaves file
  • stop-all.sh: Stops both a master and its slaves

These scripts must be executed on the machine the Spark master will run on.
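
The conf/slaves file that the launch scripts read is a plain list of hostnames, one per line. The following sketch shows one way to generate its content; the function name render_slaves_file and the hostnames are hypothetical, and removing blank entries and duplicates is a convenience of this example rather than something Spark requires:

```python
def render_slaves_file(hostnames):
    """Return the content of a conf/slaves file: one hostname per line."""
    # Strip surrounding whitespace and drop empty entries.
    cleaned = [name.strip() for name in hostnames if name.strip()]
    # Remove duplicates while preserving the original order.
    unique = list(dict.fromkeys(cleaned))
    return "\n".join(unique) + "\n"


content = render_slaves_file(["worker1", " worker2 ", "", "worker1"])
print(content, end="")
# worker1
# worker2
```

The returned string could then be written to $SPARK_HOME/conf/slaves on the machine the Spark master runs on.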

It is possible to run an interactive Spark shell against a cluster in the following way:

$SPARK_HOME/bin/spark-shell --master <master-spark-URL>

The $SPARK_HOME/bin/spark-submit script can be used to submit a compiled Spark application to the cluster. Spark currently supports two deploy modes for standalone clusters: client and cluster. In client mode, the driver and the client that submits the application are launched in the same process, while in cluster mode, the driver is launched from one of the worker processes and the client process exits as soon as it completes submitting the application (it doesn't have to wait for the application to finish).
When an application is launched through spark-submit, its JAR file is automatically distributed to all the worker nodes. Any additional JAR that an application depends on should be specified through the --jars flag using a comma as a delimiter (for example, --jars jar1,jar2).
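
The following sketch assembles a spark-submit command line for a standalone cluster as a list of arguments, which makes the role of the deploy mode and of the comma-separated list of additional JARs explicit. The function name build_submit_command, the class name, and the file names are hypothetical; the --master, --deploy-mode, --class, and --jars flags are those of spark-submit:

```python
def build_submit_command(master_url, app_jar, main_class,
                         deploy_mode="client", extra_jars=(), app_args=()):
    """Assemble the argument list of a spark-submit invocation."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")

    cmd = [
        "spark-submit",
        "--master", master_url,
        "--deploy-mode", deploy_mode,
        "--class", main_class,
    ]
    if extra_jars:
        # Additional JARs are passed as a single comma-delimited value.
        cmd += ["--jars", ",".join(extra_jars)]

    # The application JAR comes last, followed by its own arguments.
    cmd.append(app_jar)
    cmd += list(app_args)
    return cmd


cmd = build_submit_command(
    "spark://master-host:7077", "my-app.jar", "com.example.Main",
    deploy_mode="cluster", extra_jars=["jar1.jar", "jar2.jar"],
)
print(" ".join(cmd))
```

A list built this way could be handed to subprocess.run on a machine where spark-submit is on the PATH.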

As mentioned in the Apache Spark fundamentals section, in standalone mode, the Spark master is a single point of failure. This means that if the Spark master node should go down, the Spark cluster would stop functioning and all currently submitted or running applications would fail, and it wouldn't be possible to submit new applications.

High availability can be configured using Apache ZooKeeper (https://zookeeper.apache.org/), an open source and highly reliable distributed coordination service. Alternatively, Spark can be deployed on a cluster managed by Mesos or YARN, which we will talk about in the following two sections.

Mesos cluster mode

Spark can run on clusters that are managed by Apache Mesos (http://mesos.apache.org/). Mesos is a cross-platform, cloud provider-agnostic, centralized, and fault-tolerant cluster manager, designed for distributed computing environments. Among its main features, it provides resource management and isolation, and the scheduling of CPU and memory across the cluster. It can join multiple physical resources into a single virtual one, and in doing so is different from classic virtualization, where a single physical resource is split into multiple virtual resources. With Mesos, it is possible to build or schedule cluster frameworks such as Apache Spark (though it is not restricted to just this). The following diagram shows the Mesos architecture:

Figure 1.13

Mesos consists of a master daemon and frameworks. The master daemon manages agent daemons running on each cluster node, while the Mesos frameworks run tasks on the agents. The master enables fine-grained sharing of resources (including CPU and RAM) across frameworks by making them resource offers. It decides how much of the available resources to offer to each framework, depending on given organizational policies. To support diverse sets of policies, the master uses a modular architecture that makes it easy to add new allocation modules through a plugin mechanism. A Mesos framework consists of two components: a scheduler, which registers itself with the master to be offered resources, and an executor, a process that is launched on agent nodes to execute the framework's tasks. While it is the master that determines how many resources are offered to each framework, the frameworks' schedulers are responsible for selecting which of the offered resources to use. The moment a framework accepts offered resources, it passes a description of the tasks it wants to execute on them to Mesos. Mesos, in turn, launches the tasks on the corresponding agents.
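
The resource offer mechanism can be illustrated with a toy model. This is not the Mesos API: the Offer and Task classes and the schedule function are hypothetical names, and the first-fit placement is just one simple policy a framework scheduler could apply to the offers it receives from the master:

```python
from dataclasses import dataclass


@dataclass
class Offer:
    """Resources that the master offers on one agent (toy model)."""
    agent: str
    cpus: float
    mem_mb: int


@dataclass
class Task:
    """A task that the framework wants to run (toy model)."""
    name: str
    cpus: float
    mem_mb: int


def schedule(offers, pending):
    """Framework scheduler side: decide which offered resources to use.

    Returns the (agent, task name) pairs to launch and the tasks that
    did not fit into any offer.
    """
    launches = []
    remaining = list(pending)
    for offer in offers:
        cpus, mem = offer.cpus, offer.mem_mb
        # First fit: place every pending task the offer can still hold.
        for task in list(remaining):
            if task.cpus <= cpus and task.mem_mb <= mem:
                launches.append((offer.agent, task.name))
                cpus -= task.cpus
                mem -= task.mem_mb
                remaining.remove(task)
    return launches, remaining


offers = [Offer("agent-1", 2, 4096), Offer("agent-2", 1, 1024)]
tasks = [Task("t1", 1, 2048), Task("t2", 1, 2048), Task("t3", 1, 2048)]
launches, waiting = schedule(offers, tasks)
print(launches)                   # [('agent-1', 't1'), ('agent-1', 't2')]
print([t.name for t in waiting])  # ['t3']
```

In this run, the third task stays pending because the offer from agent-2 doesn't carry enough memory. In a real cluster, the framework would simply wait for a later offer.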

The advantages of deploying a Spark cluster using Mesos instead of the Spark standalone cluster manager include the following:

  • Dynamic partitioning between Spark and other frameworks
  • Scalable partitioning between multiple instances of Spark

Spark 2.2.1 is designed to be used with Mesos 1.0.0+. In this section, I won't describe the steps to deploy a Mesos cluster; I am assuming that a Mesos cluster is already available and running. No particular procedure or patch is required in terms of Mesos installation to run Spark on it. To verify that the Mesos cluster is ready for Spark, navigate to the Mesos master web UI at port 5050:

Figure 1.14

Check that all of the expected machines are present in the Agents tab.

To use Mesos from Spark, a Spark binary package needs to be available in a place that's accessible by Mesos itself, and a Spark driver program needs to be configured to connect to Mesos. Alternatively, it is possible to install Spark in the same location across all the Mesos agents and then configure the spark.mesos.executor.home property (the default value is $SPARK_HOME) to point to that location.

The Mesos master URLs have the form mesos://host:5050 for a single-master Mesos cluster, or mesos://zk://host1:2181,host2:2181,host3:2181/mesos for a multi-master Mesos cluster when using ZooKeeper.
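
The two URL forms can be built programmatically, as in the following sketch. The function name mesos_master_url and its parameters are hypothetical; the default ports and the /mesos path are taken from the forms shown above:

```python
def mesos_master_url(hosts, zookeeper=False,
                     mesos_port=5050, zk_port=2181, zk_path="/mesos"):
    """Build a Mesos master URL for Spark (illustrative helper).

    With zookeeper=False, hosts must contain the single Mesos master.
    With zookeeper=True, hosts are the ZooKeeper nodes.
    """
    if not hosts:
        raise ValueError("at least one host is required")

    if zookeeper:
        # Multi-master cluster: list every ZooKeeper node.
        ensemble = ",".join(f"{host}:{zk_port}" for host in hosts)
        return f"mesos://zk://{ensemble}{zk_path}"

    if len(hosts) != 1:
        raise ValueError("a single-master URL takes exactly one host")
    return f"mesos://{hosts[0]}:{mesos_port}"


print(mesos_master_url(["host"]))
# mesos://host:5050
print(mesos_master_url(["host1", "host2", "host3"], zookeeper=True))
# mesos://zk://host1:2181,host2:2181,host3:2181/mesos
```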

The following is an example of how to start a Spark shell on a Mesos cluster:

$SPARK_HOME/bin/spark-shell --master mesos://127.0.0.1:5050 -c spark.mesos.executor.home=`pwd`

A Spark application can be submitted to a Mesos managed Spark cluster as follows:

$SPARK_HOME/bin/spark-submit --master mesos://127.0.0.1:5050 --total-executor-cores 2 --executor-memory 3G  $SPARK_HOME/examples/src/main/python/pi.py 100

YARN cluster mode

YARN (http://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html), which was introduced in Apache Hadoop 2.0, brought significant improvements in terms of scalability, high availability, and support for different paradigms. In the Hadoop version 1 MapReduce framework, job execution was controlled by two types of processes: a single master process called JobTracker, which coordinated all the jobs running on the cluster and assigned map and reduce tasks to the TaskTrackers, and a number of subordinate TaskTracker processes, which ran the assigned tasks and periodically reported their progress to the JobTracker. Having a single JobTracker was a scalability bottleneck. The maximum cluster size was a little more than 4,000 nodes, with the number of concurrent tasks limited to 40,000. Furthermore, the JobTracker was a single point of failure and the only available programming model was MapReduce.

The fundamental idea of YARN is to split up the functionalities of resource management and job scheduling/monitoring into separate daemons: a global ResourceManager and a per-application ApplicationMaster (App Mstr). An application is either a single job or a DAG of jobs. The following is a diagram of YARN's architecture:

Figure 1.15

The ResourceManager and the NodeManager form the YARN framework. The ResourceManager decides on resource usage across all the running applications, while the NodeManager is an agent that runs on every machine in the cluster and is responsible for the containers, monitoring their resource usage (including CPU and memory) and reporting it to the ResourceManager. The ResourceManager consists of two components: the scheduler and the ApplicationsManager. The scheduler is the component that's responsible for allocating resources to the various running applications. It doesn't monitor the applications' statuses, nor does it offer guarantees about restarting any failed tasks. It performs scheduling based on an application's resource requirements.

The ApplicationsManager accepts job submissions and provides a service to restart the App Mstr container on any failure. The per-application App Mstr is responsible for negotiating the appropriate resource containers from the scheduler and monitoring their status and progress. YARN, by its nature, is a general scheduler, so support for non-MapReduce jobs (such as Spark jobs) is available for Hadoop clusters.

Submitting Spark applications on YARN

To launch Spark applications on YARN, the HADOOP_CONF_DIR or YARN_CONF_DIR environment variable must be set and point to the directory that contains the client-side configuration files for the Hadoop cluster. These configurations are needed to connect to the YARN ResourceManager and to write to HDFS. This configuration is distributed to the YARN cluster so that all the containers used by the Spark application have the same configuration. To launch Spark applications on YARN, two deployment modes are available:

  • Cluster mode: In this case, the Spark driver runs inside an application master process that's managed by YARN on the cluster. The client can finish its execution after initiating the application.
  • Client mode: In this case, the driver and the client run in the same process. The application master is used for the sole purpose of requesting resources from YARN.

Unlike the other modes, in which the master's address is specified in the master parameter, in YARN mode, the ResourceManager's address is retrieved from the Hadoop configuration. Therefore, the master parameter value is always yarn.

You can use the following command to launch a Spark application in cluster mode:

$SPARK_HOME/bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

In cluster mode, since the driver runs on a different machine than the client, the SparkContext.addJar method doesn't work with the files that are local to the client. The only choice is to include them using the --jars option in the launch command.

Launching a Spark application in client mode works the same way: only the value of the --deploy-mode option needs to change from cluster to client.
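
The points above can be summarized in a short sketch that assembles the YARN launch command. The function name build_yarn_submit, the class name, and the file names are hypothetical. The check on the environment variables mirrors the requirement stated earlier in this section, and the master value is hardcoded to yarn because the ResourceManager's address comes from the Hadoop configuration:

```python
import os


def build_yarn_submit(app_jar, main_class, deploy_mode="cluster",
                      extra_jars=(), app_args=(), env=None):
    """Assemble a spark-submit argument list for YARN (illustrative)."""
    env = os.environ if env is None else env

    # Spark locates the ResourceManager through the Hadoop client config.
    if not (env.get("HADOOP_CONF_DIR") or env.get("YARN_CONF_DIR")):
        raise RuntimeError("HADOOP_CONF_DIR or YARN_CONF_DIR must be set")
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")

    cmd = [
        "spark-submit",
        "--class", main_class,
        "--master", "yarn",  # always "yarn", never a host and port
        "--deploy-mode", deploy_mode,
    ]
    if extra_jars:
        # In cluster mode, client-local JARs have to be shipped this way.
        cmd += ["--jars", ",".join(extra_jars)]
    cmd.append(app_jar)
    cmd += list(app_args)
    return cmd


# The env argument is passed explicitly here only to keep the example
# independent of the machine it runs on.
cmd = build_yarn_submit("my-app.jar", "com.example.Main",
                        env={"HADOOP_CONF_DIR": "/etc/hadoop/conf"})
print(" ".join(cmd))
```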

Kubernetes cluster mode

Kubernetes (https://kubernetes.io/) is an open source system that's used to automate the deployment, scaling, and management of containerized applications. It was originally implemented at Google and then open sourced in 2014. The following are the main concepts of Kubernetes:

  • Pod: This is the smallest deployable unit of computing that can be created and managed. A pod can be seen as a group of one or more containers that share network and storage space, which also contains a specification for how to run those containers.
  • Deployment: This is a layer of abstraction whose primary purpose is to declare how many replicas of a pod should be running at a time.
  • Ingress: This is an open channel for communication with a service running in a pod.
  • Node: This is a representation of a single machine in a cluster.
  • Persistent volume: This provides a filesystem that can be mounted to a cluster without being associated with any particular node. This is the way Kubernetes persists information (data, files, and so on).

The following diagram (source: https://d33wubrfki0l68.cloudfront.net/518e18713c865fe67a5f23fc64260806d72b38f5/61d75/images/docs/post-ccm-arch.png) shows the Kubernetes architecture:

Figure 1.16

The main components of the Kubernetes architecture are as follows:

  • Cloud controller manager: It runs the Kubernetes controllers
  • Controllers: There are four of them: node, route, service, and PersistentVolumeLabels
  • Kubelets: The primary agents that run on nodes

The submission of Spark jobs to a Kubernetes cluster can be done directly through spark-submit. Kubernetes requires that we supply Docker (https://www.docker.com/) images that can be deployed into containers within pods. Starting from the 2.3 release, Spark provides a Dockerfile ($SPARK_HOME/kubernetes/dockerfiles/Dockerfile, which can also be customized to match specific applications' needs) and a script ($SPARK_HOME/bin/docker-image-tool.sh) that can be used to build and publish Docker images that are to be used within a Kubernetes backend. The following is the syntax that's used to build a Docker image through the provided script:

$SPARK_HOME/bin/docker-image-tool.sh -r <repo> -t my-tag build

The following is the syntax to push an image to a Docker repository while using the same script:

$SPARK_HOME/bin/docker-image-tool.sh -r <repo> -t my-tag push

A job can be submitted in the following way:

$SPARK_HOME/bin/spark-submit \
--master k8s://https://<k8s_hostname>:<k8s_port> \
--deploy-mode cluster \
--name <application-name> \
--class <package>.<ClassName> \
--conf spark.executor.instances=<instance_count> \
--conf spark.kubernetes.container.image=<spark-image> \
local:///path/to/<sparkjob>.jar

Kubernetes requires application names to contain only lowercase alphanumeric characters, hyphens, and dots, and to start and end with an alphanumeric character.
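
As a sketch, the naming rule and the submission command can be combined so that an invalid name is rejected before anything is sent to the cluster. The function names is_valid_app_name and build_k8s_submit and all of the values in the usage example are hypothetical. The regular expression checks only the rule as stated here, while Kubernetes itself applies further constraints to resource names (a maximum length, for example):

```python
import re

# Lowercase alphanumerics, hyphens and dots; alphanumeric at both ends.
_APP_NAME = re.compile(r"[a-z0-9]([a-z0-9.-]*[a-z0-9])?")


def is_valid_app_name(name):
    """Check a name against the rule stated in the text."""
    return _APP_NAME.fullmatch(name) is not None


def build_k8s_submit(api_host, api_port, app_name, main_class,
                     image, jar_path, executors):
    """Assemble a spark-submit argument list for Kubernetes."""
    if not is_valid_app_name(app_name):
        raise ValueError(f"invalid application name: {app_name!r}")
    return [
        "spark-submit",
        "--master", f"k8s://https://{api_host}:{api_port}",
        "--deploy-mode", "cluster",
        "--name", app_name,
        "--class", main_class,
        "--conf", f"spark.executor.instances={executors}",
        "--conf", f"spark.kubernetes.container.image={image}",
        # local:// refers to a path inside the Docker image
        f"local://{jar_path}",
    ]


print(is_valid_app_name("spark-pi"))  # True
print(is_valid_app_name("Spark_Pi"))  # False

cmd = build_k8s_submit("k8s-host", 6443, "spark-pi", "com.example.Main",
                       "my-repo/spark:my-tag", "/opt/app/my-app.jar", 2)
print(" ".join(cmd))
```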

The following diagram shows the way the submission mechanism works:

Figure 1.17

Here's what happens:

  • Spark creates a driver that's running within a Kubernetes pod
  • The driver creates the executors, which also run within Kubernetes pods, and then connects to them and executes application code
  • At the end of the execution, the executor pods terminate and are cleaned up, while the driver pod still persists logs and remains in a completed state (which means that it doesn't use cluster computation or memory resources) in the Kubernetes API (until it's eventually garbage collected or manually deleted)
