Search icon CANCEL
Subscription
0
Cart icon
Your Cart (0 item)
Close icon
You have no products in your basket yet
Arrow left icon
Explore Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Free Learning
Arrow right icon
Arrow up icon
GO TO TOP
PySpark Cookbook

You're reading from   PySpark Cookbook Over 60 recipes for implementing big data processing and analytics using Apache Spark and Python

Arrow left icon
Product type Paperback
Published in Jun 2018
Publisher Packt
ISBN-13 9781788835367
Length 330 pages
Edition 1st Edition
Languages
Concepts
Arrow right icon
Authors (2):
Arrow left icon
Tomasz Drabas Tomasz Drabas
Author Profile Icon Tomasz Drabas
Tomasz Drabas
Denny Lee Denny Lee
Author Profile Icon Denny Lee
Denny Lee
Arrow right icon
View More author details
Toc

Table of Contents (9) Chapters Close

Preface 1. Installing and Configuring Spark 2. Abstracting Data with RDDs FREE CHAPTER 3. Abstracting Data with DataFrames 4. Preparing Data for Modeling 5. Machine Learning with MLlib 6. Machine Learning with the ML Module 7. Structured Streaming with PySpark 8. GraphFrames – Graph Theory with PySpark

Configuring a multi-node instance of Spark

Setting up a multi-node Spark cluster requires quite a few more steps to get it ready. In this recipe, we will go step-by-step through the script that will help you with this process; the script needs to run on the driver node and all the executors to set up the environment.

Getting ready

In this recipe, we are solely focusing on a Linux environment (we are using Ubuntu Server 16.04 LTS). The following prerequisites are required before you can follow with the rest of the recipe:

  • A clean installation of a Linux distribution; in our case, we have installed Ubuntu Server 16.04 LTS on each machine in our cluster of three Dell R710s.
  • Each machine needs to be connected to the internet and accessible from your local machine. You will need the machines' IPs and their hostnames; on Linux, you can check the IP by issuing the ifconfig command and reading the inet addr. To check your hostname, type at cat/etc/hostname
  • On each server, we added a user group called hadoop. Following this, we have created a user called hduser and added it to the hadoop group. Also, make sure that the hduser has sudo rights. If you do not know how to do this, check the See also section of this recipe.
  • Make sure you have added the ability to reach your servers via SSH. If you cannot do this, run sudo apt-get install openssh-server openssh-client on each server to install the necessary environments.
  • If you want to read and write to Hadoop and Hive, you need to have these two environments installed and configured on your cluster. Check https://data-flair.training/blogs/install-hadoop-2-x-on-ubuntu/ for Hadoop installation and configuration and http://www.bogotobogo.com/Hadoop/BigData_hadoop_Hive_Install_On_Ubuntu_16_04.php for Hive.
If you have these two environments set up, some of the steps from our script would be obsolete. However, we will present all of the steps as follows, assuming you only want the Spark environment.

No other prerequisites are required.

For the purpose of automating the deployment of the Spark environment in a cluster setup, you will also have to:

  1. Create a hosts.txt file. Each entry on the list is the IP address of one of the servers followed by two spaces and a hostname. Do not delete the driver: nor executors: lines. Also, note that we only allow one driver in our cluster (some clusters support redundant drivers). An example of the content of this file is as follows:
driver:
192.168.17.160 pathfinder
executors:
192.168.17.161 discovery1
192.168.17.162 discovery2
  1. On your local machine, add the IPs and hostnames to your /etc/hosts file so you can access the servers via hostnames instead of IPs (once again, we are assuming you are running a Unix-like system such as macOS or Linux). For example, the following command will add pathfinder to our /etc/hosts file: sudo echo 192.168.1.160  pathfinder >> /etc/hosts. Repeat this for all machines from your server.
  1. Copy the hosts.txt file to each machine in your cluster; we assume the file will be placed in the root folder for the hduser. You can attain this easily with the scp hosts.txt hduser@<your-server-name>:~ command, where <your-server-name> is the hostname of the machine.
  2. To run the installOnRemote.sh script (see the Chapter01/installOnRemote.sh file) from your local machine, do the following: ssh -tq hduser@<your-server-name> "echo $(base64 -i installOnRemote.sh) | base64 -d | sudo bash". We will go through these steps in detail in the installOnRemote.sh script in the next section.
  3. Follow the prompts on the screen to finalize the installation and configuration steps. Repeat step 4 for each machine in your cluster.

How to do it...

The installOnRemote.sh script for this recipe can be found in the Chapter01 folder in the GitHub repository: http://bit.ly/2ArlBck. Some portions of the script are very similar to the ones we have outlined in the previous recipes, so we will skip those; you can refer to previous recipes for more information (especially the Installing Spark requirements and the Installing Spark from binaries recipes). 

The top-level structure of the script is as follows:

#!/bin/bash
# Shell script for installing Spark from binaries
# on remote servers
#
# PySpark Cookbook
# Author: Tomasz Drabas, Denny Lee
# Version: 0.1
# Date: 12/9/2017
_spark_binary="http://mirrors.ocf.berkeley.edu/apache/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz"
_spark_archive=$( echo "$_spark_binary" | awk -F '/' '{print $NF}' )
_spark_dir=$( echo "${_spark_archive%.*}" )
_spark_destination="/opt/spark"
_java_destination="/usr/lib/jvm/java-8-oracle"

_python_binary="https://repo.continuum.io/archive/Anaconda3-5.0.1-Linux-x86_64.sh"

_python_archive=$( echo "$_python_binary" | awk -F '/' '{print $NF}' )
_python_destination="/opt/python"
_machine=$(cat /etc/hostname)
_today=$( date +%Y-%m-%d )
_current_dir=$(pwd) # store current working directory
...
printHeader
readIPs
checkJava
installScala
installPython

updateHosts
configureSSH
downloadThePackage
unpack
moveTheBinaries
setSparkEnvironmentVariables
updateSparkConfig
cleanUp

We have highlighted the portions of the script that are more relevant to this recipe in bold font.

How it works...

As with the previous recipes, we will first specify where we are going to download the Spark binaries from and create all the relevant global variables we are going to use later. 

Next, we read in the hosts.txt file:

function readIPs() {
input="./hosts.txt"
 driver=0
executors=0
_executors=""

IFS=''
while read line
do
 if [[ "$master" = "1" ]]; then
_driverNode="$line"
driver=0
fi
 if [[ "$slaves" = "1" ]]; then
_executors=$_executors"$line\n"
fi
 if [[ "$line" = "driver:" ]]; then
driver=1
executors=0
fi
 if [[ "$line" = "executors:" ]]; then
executors=1
driver=0
fi
 if [[ -z "${line}" ]]; then
continue
fi
done < "$input"
}

We store the path to the file in the input variable. The driver and the executors variables are flags we use to skip the "driver:" and the "executors:" lines from the input file. The _executors empty string will store the list of executors, which are delimited by a newline "\n".

IFS stands for internal field separator. Whenever bash reads a line from a file, it will split it on that character. Here, we will set it to an empty character '' so that we preserve the double spaces between the IP address and the hostname. 

Next, we start reading the file, line-by-line. Let's see how the logic works inside the loop; we'll start a bit out of order so that the logic is easier to understand:

  • If the line we just read equals to "driver:" (the if [[ "$line" = "driver:" ]]; conditional), we set the driver flag to 1 so that when the next line is read, we store it as a _driverNode (this is done inside the if [[ "$driver" = "1" ]]; conditional). Inside that conditional, we also reset the executors flag to 0. The latter is done in case you start with executors first, followed by a single driver in the hosts.txt. Once the line with the driver node information is read, we reset the driver flag to 0.
  • On the other hand, if the line we just read equals to "executors:" (the if [[ "$line" = "executors:" ]]; conditional), we set the executors flag to 1 (and reset the driver flag to 0). This guarantees that the next line read will be appended to the _executors string, separated by the "\n" newline character (this happens inside the if [[ "$executors" = "1" ]]; conditional). Note that we do not set the executor flag to 0 as we allow for more than one executor.
  • If we encounter an empty linewhich we can check for in bash with the if [[ -z "${line}" ]]; conditional—we skip it.

You might notice that we use the "<" redirection pipe to read in the data (indicated here by the input variable). 

You can read more about the redirection pipes here: http://www.tldp.org/LDP/abs/html/io-redirection.html.

Since Spark requires Java and Scala to work, next we have to check if Java is installed, and we will install Scala (as it normally isn't present while Java might be). This is achieved with the following functions:

function checkJava() {
if type -p java; then
echo "Java executable found in PATH"
_java=java
elif [[ -n "$JAVA_HOME" ]] && [[ -x "$JAVA_HOME/bin/java" ]]; then
echo "Found Java executable in JAVA_HOME"
_java="$JAVA_HOME/bin/java"
else
echo "No Java found. Install Java version $_java_required or higher first or specify JAVA_HOME variable that will point to your Java binaries."
installJava
fi
}
function installJava() {
sudo apt-get install python-software-properties
sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer
}
function installScala() {
sudo apt-get install scala
}

function installPython() {
curl -O "$_python_binary"
chmod 0755 ./"$_python_archive"
sudo bash ./"$_python_archive" -b -u -p "$_python_destination"
}

The logic here doesn't differ much from what we presented in the Installing Spark requirements recipe. The only notable difference in the checkJava function is that if we do not find Java on the PATH variable or inside the JAVA_HOME folder, we do not exit but run installJava, instead. 

There are many ways to install Java; we have already presented you with one of them earlier in this book—check the Installing Java section in the Installing Spark requirements recipe. Here, we used the built-in apt-get tool.

The apt-get tool is a convenient, fast, and efficient utility for installing packages on your Linux machine. APT stands for Advanced Packaging Tool.

First, we install the python-software-properties. This set of tools provides an abstraction of the used apt repositories. It enables easy management of distribution as well as independent software vendor software sources. We need this as in the next line we add the add-apt-repository; we add a new repository as we want the Oracle Java distribution. The sudo apt-get update command refreshes the contents of the repositories and, in our current case, fetches all the packages available in  ppa:webupd8team/java. Finally, we install the Java package: just follow the prompts on the screen. We will install Scala the same way.

The default location where the package should install is /usr/lib/jvm/java-8-oracle. If this is not the case or you want to install it in a different folder, you will have to alter the _java_destination variable inside the script to reflect the new destination.

The advantage of using this tool is this: if there are already Java and Scala environments installed on a machine, using apt-get will either skip the installation (if the environment is up-to-date with the one available on the server) or ask you to update to the newest version.

We will also install the Anaconda distribution of Python (as mentioned many times previously, since we highly recommend this distribution). To achieve this goal, we must download the Anaconda3-5.0.1-Linux-x86_64.sh script first and then follow the prompts on the screen. The -b parameter to the script will not update the .bashrc file (we will do that later), the -u switch will update the Python environment in case /usr/local/python already exists, and -p will force the installation to that folder.

Having passed the required installation steps, we will now update the /etc/hosts files on the remote machines:

function updateHosts() {
 _hostsFile="/etc/hosts"
 # make a copy (if one already doesn't exist)
if ! [ -f "/etc/hosts.old" ]; then
sudo cp "$_hostsFile" /etc/hosts.old
fi
 t="###################################################\n"
t=$t"#\n"
t=$t"# IPs of the Spark cluster machines\n"
t=$t"#\n"
t=$t"# Script: installOnRemote.sh\n"
t=$t"# Added on: $_today\n"
t=$t"#\n"
t=$t"$_driverNode\n"
t=$t"$_executors\n"
 sudo printf "$t" >> $_hostsFile
}

This is a simple function that, first, creates a copy of the /etc/hosts file, and then appends the IPs and hostnames of the machines in our cluster. Note that the format required by the /etc/hosts file is the same as in the hosts.txt file we use: per row, an IP address of the machine followed by two spaces followed by the hostname.

We use two spaces for readability purposes—one space separating an IP and the hostname would also work.

Also, note that we do not use the echo command here, but printf; the reason behind this is that the printf command prints out a formatted version of the string, properly handling the newline "\n" characters.

Next, we configure the passwordless SSH sessions (check the following See also subsection) to aid communication between the driver node and the executors:

function configureSSH() {
# check if driver node
IFS=" "
read -ra temp <<< "$_driverNode"
_driver_machine=( ${temp[1]} )
_all_machines="$_driver_machine\n"

if [ "$_driver_machine" = "$_machine" ]; then
# generate key pairs (passwordless)
sudo -u hduser rm -f ~/.ssh/id_rsa
sudo -u hduser ssh-keygen -t rsa -P "" -f ~/.ssh/id_rsa

IFS="\n"
read -ra temp <<< "$_executors"
for executor in ${temp[@]}; do
# skip if empty line
if [[ -z "${executor}" ]]; then
continue
fi

# split on space
IFS=" "
read -ra temp_inner <<< "$executor"
echo
echo "Trying to connect to ${temp_inner[1]}"

cat ~/.ssh/id_rsa.pub | ssh "hduser"@"${temp_inner[1]}" 'mkdir -p .ssh && cat >> .ssh/authorized_keys'

_all_machines=$_all_machines"${temp_inner[1]}\n"
done
fi

echo "Finishing up the SSH configuration"
}

Inside this function, we first check if we are on the driver node, as defined in the hosts.txt file, as we only need to perform these tasks on the driver. The read -ra temp <<< "$_driverNode" command reads the _driverNode (in our case, it is 192.168.1.160  pathfinder), and splits it at the space character (remember what IFS stands for?). The -a switch instructs the read method to store the split _driverNode string in the temp array and the -r parameter makes sure that the backslash does not act as an escape character. We store the name of the driver in the _driver_machine variable and append it to the _all_machines string (we will use this later).

If we are executing this script on the driver machine, the first thing we must do is remove the old SSH key (using the rm function with the -f, force switch) and create a new one. The sudo -u hduser switch allows us to perform these actions as the hduser (instead of the root user).

When we submit the script to run from our local machine, we start an SSH session as a root on the remote machine. You will see how this is done shortly, so take our word on that for now. 

We will use the ssh-keygen method to create the SSH key pair. The -t switch allows us to select the encryption algorithm (we are using RSA encryption), the -P switch determines the password to use (we want this passwordless, so we choose ""), and the -f parameter specifies the filename for storing the keys.

Next, we loop through all the executors: we need to append the contents of ~/.ssh/id_rsa.pub to their ~/.ssh/authorized_keys files. We split the _executors at the "\n" character and loop through all of them. To deliver the contents of the id_rsa.pub file to the executors, we use the cat tool to print out the contents of the id_rsa.pub file and then pipe it to the ssh tool. The first parameter we pass to the ssh is the username and the hostname we want to connect to. Next, we pass the commands we want to execute on the remote machine. First, we attempt to create the .ssh folder if one does not exist. This is followed by outputting the id_rsa.pub file to .ssh/authorized_keys.

Following the SSH session's configurations on the cluster, we download the Spark binaries, unpack them, and move them to _spark_destination.

We have outlined these steps in the Installing Spark from sources and Installing Spark from binaries sections, so we recommend that you check them out.

Finally, we need to set two Spark configuration files: the spark-env.sh and the slaves files:

function updateSparkConfig() {
cd $_spark_destination/conf

sudo -u hduser cp spark-env.sh.template spark-env.sh
echo "export JAVA_HOME=$_java_destination" >> spark-env.sh
echo "export SPARK_WORKER_CORES=12" >> spark-env.sh

sudo -u hduser cp slaves.template slaves
printf "$_all_machines" >> slaves
}

We need to append the JAVA_HOME variable to spark-env.sh so that Spark can find the necessary libraries. We must also specify the number of cores per worker to be 12; this goal is attained by setting the SPARK_WORKER_CORES variable.

You might want to tune the SPARK_WORKER_CORES value to your needs. Check this spreadsheet for help: http://c2fo.io/img/apache-spark-config-cheatsheet/C2FO-Spark-Config-Cheatsheet.xlsx (which is available from here: http://c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark-config-cheatsheet/).

Next, we have to output the hostnames of all the machines in our cluster to the slaves file.

In order to execute the script on the remote machine, and since we need to run it in an elevated mode (as root using sudo), we need to encrypt the script before we send it over the wire. An example of how this is done is as follows (from macOS to remote Linux):

ssh -tq hduser@pathfinder "echo $(base64 -i installOnRemote.sh) | base64 -d | sudo bash"

Or from Linux to remote Linux:

ssh -tq hduser@pathfinder "echo $(base64 -w0 installOnRemote.sh) | base64 -d | sudo bash"

The preceding script uses the base64 encryption tool to encrypt the installOnRemote.sh script before pushing it over to the remote. Once on the remote, we once again use base64 to decrypt the script (the -d switch) and run it as root (via sudo). Note that in order to run this type of script, we also pass the -tq switch to the ssh tool; the -t option forces a pseudo Terminal allocation so that we can execute arbitrary screen-based scripts on the remote machine, and the -q option quiets all the messages but those from our script. 

Assuming all goes well, once the script finishes executing on all your machines, Spark has been successfully installed and configured on your cluster. However, before you can use Spark, you need either to close the connection to your driver and SSH to it again, or type:

source ~/.bashrc

This is so that the newly created environment variables are available, and your PATH is updated.

To start your cluster, you can type:

start-all.sh

And all the machines in the cluster should be coming to life and be recognized by Spark.

In order to check if everything started properly, type:

jps

And it should return something similar to the following (in our case, we had three machines in our cluster):

40334 Master
41297 Worker
41058 Worker

See also

Here's a list of useful links that might help you to go through with this recipe:

lock icon The rest of the chapter is locked
Register for a free Packt account to unlock a world of extra content!
A free Packt account unlocks extra newsletters, articles, discounted offers, and much more. Start advancing your knowledge today.
Unlock this book and the full library FREE for 7 days
Get unlimited access to 7000+ expert-authored eBooks and videos courses covering every tech area you can think of
Renews at $19.99/month. Cancel anytime
Banner background image