Big Data Analytics

Distributed Computing, ETL Tools, Big Data Architecture

Some notes on setting up a Hadoop cluster

One-way passwordless SSH from the master to the worker nodes:

1. Generate a key pair on the master: ssh-keygen -t rsa
2. Create the .ssh folder on the worker node: ssh [user]@255.255.255.255 mkdir -p .ssh
3. Copy the public key to the worker node: ssh-copy-id -i ~/.ssh/id_rsa.pub [user]@255.255.255.255
4. Set permissions on the worker: ssh [user]@255.255.255.255 "chmod 700 .ssh; chmod 640 .ssh/authorized_keys"

700: the user can read, write and execute; the group and others have no permissions.
640: the user can read and write; the group can read; others have no permissions.

Configuration files (for a minimal configuration):

core-site.xml should have the same configuration on the master and all worker nodes. The NameNode URL (fs.default.name) should point to the master node only.
mapred-site.xml needs to be edited only on the master node.
yarn-site.xml should have the same configuration on the master and worker nodes.
The file “slaves” should be updated only on the master node.
hdfs-site.xml mainly sets the replication factor and the local paths of the NameNode and DataNode directories.
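For reference, here is a minimal core-site.xml sketch of that NameNode URL property (assuming the master's hostname is "master" and port 9000; both are placeholders for your own values):

<property>
<name>fs.default.name</name>
<value>hdfs://master:9000</value>
</property>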

Disable IPV6:

IPv4 (Internet Protocol Version 4) addresses follow a dotted-decimal pattern of four octets, for example 192.168.1.10.

IPv6 (Internet Protocol Version 6) addresses follow a pattern of eight colon-separated groups of hexadecimal digits, for example 2001:0db8:85a3:0000:0000:8a2e:0370:7334.

Hadoop communicates over IPv4 within its cluster; it does not support IPv6 at the moment, so it is best to disable IPv6 on the cluster nodes.
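On Ubuntu, one common way to disable IPv6 (a sketch; check your distribution's documentation) is to add the following lines to /etc/sysctl.conf and then run sudo sysctl -p:

net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1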


Developing text processing data products: Part I

Folks, this is going to be a series of posts (more than one blog post about the same topic) about text processing. In this series, I intend to discuss some of my experiences and also take the moment to organize that discussion on my blog. In the past, I touched upon some text processing recipes purely from an application point of view; however, I have spent much of my career working with text content in automation and analytics, and I owe it to myself to write more on text processing. Without further ado, let's jump into the series.

Text processing is extracting information that can be used for decision-making purposes from a text such as a book, paper or article, WITHOUT HAVING TO READ IT. I used to read the newspaper to find the weather forecast (five years ago), but these days if we need information about something we type it into a search engine to get results (and ads). But imagine someone who needs information on a day-to-day basis for decision-making purposes, where he/she would have to read many articles, news stories or books in a short period of time. Information is power, but timing matters too. Hence the need for text processing data products.

Some basic things that we can do with text data:

Crawling: extracting data/content from a website using a crawler.

Tokenization: the process of splitting a string into smaller units (tokens), typically on the whitespace between words.

Stemming: reducing a word to a shorter root form, so that the root can be used to match all of its variations.

Stop word removal: straightforward; removing frequently occurring words like a, an, the.

Parsing: the process of breaking a sentence down into phrases and further breaking each phrase into nouns, verbs, adjectives and so on; closely related to part-of-speech tagging. Cool topic. A small Scala sketch of a few of these steps follows below.
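Here is a minimal, self-contained sketch of tokenization, stop-word removal and a very naive suffix-stripping stemmer (the stop-word list and suffixes are made up for illustration; a real system would use a proper library and a real stemmer such as Porter's):

object TextBasics {
  // A tiny stop-word list, purely for illustration.
  val stopWords = Set("a", "an", "the", "is", "of", "to", "are")

  // Tokenization: lower-case, split on whitespace, strip punctuation.
  def tokenize(text: String): Seq[String] =
    text.toLowerCase.split("\\s+").map(_.replaceAll("[^a-z0-9]", "")).filter(_.nonEmpty)

  // Stop-word removal: drop high-frequency, low-information words.
  def removeStopWords(tokens: Seq[String]): Seq[String] =
    tokens.filterNot(stopWords.contains)

  // Naive stemming: strip a few common suffixes.
  def stem(word: String): String =
    List("ing", "ed", "es", "s").find(word.endsWith)
      .map(suffix => word.dropRight(suffix.length)).getOrElse(word)

  def main(args: Array[String]): Unit = {
    val text = "The analysts are processing the streamed documents"
    println(removeStopWords(tokenize(text)).map(stem))
  }
}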

These are just my experiences and my knowledge, and as always I try to write in a way that anyone can read and understand. More later.

Setting up Apache Spark: Part II

Now that we have Hadoop YARN, Scala and the other prerequisites set up, we are ready to install Apache Spark. If you are visiting this blog for the first time, please have a look at the earlier post "Setting up Apache Spark: Quick look at Hadoop, Scala and SBT".

You definitely need Maven, sbt, Scala, Java, SSH and Git (though Git is optional). Have Git in case you want to fork the repository (that route didn't work out for me). I am not using my multi-node cluster workspace for this demonstration, so there aren't many screenshots for now. However, I am sure these instructions will work fine.

To begin, let's download the appropriate Spark binaries (select the one that corresponds to your Hadoop installation from the Spark site):

wget http://www.apache.org/dyn/closer.cgi/spark/spark-1.1.0/spark-1.1.0-bin-hadoop2.4.tgz

Assuming we have sbt set up without any problems, run

sbt/sbt package

followed by

sbt/sbt clean compile

Now, let's build Spark using Maven. Allocate the desired amount of physical memory based on the capacity of the machine you are working with and enter the following:

export MAVEN_OPTS="-Xmx1300M -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package

Now, it may not be strictly necessary to do a clean, but I suggest

sbt/sbt clean

followed by

sbt/sbt assembly

This takes time. Once this is over,

sbin/start-all.sh

and finally,

./bin/spark-shell

Voila! The following screen will show up with the Scala command line. And now folks, let's learn Scala!

[screenshot: spark-shell starting up with the Scala prompt]
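Once the prompt is up, a quick sanity check (a minimal sketch; sc is the SparkContext that spark-shell creates for you):

val nums = sc.parallelize(1 to 1000)
nums.filter(_ % 2 == 0).count()    // should return 500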

Setting up Apache Spark: Quick look at Hadoop, Scala and SBT

We are going to look at installing Spark on top of Hadoop. Let's set up Hadoop YARN once again from scratch, this time with screenshots, as I received some comments that my earlier installation posts needed more of them. In this post, we will look at creating a new user account on Ubuntu 14.04 and installing the Hadoop 2.5.x stable version.

To create new user,

sudo passwd

enter your admin password to set up your root passwd

sudo adduser <new user-name>

enter the details

Now give root (sudo) access to the new user:

sudo visudo

Add the line:

<new user-name> ALL=(ALL:ALL) ALL

If you want to delete the new user later, run the following from an account with sudo privileges (not a guest account):

sudo deluser <new user-name>

For java:

Oracle JDK is the official one. To install Oracle Java 8, add the Java 8 PPA to your package manager repositories and then do an update; install only after these steps are completed.

sudo apt-get install python-software-properties

sudo add-apt-repository ppa:webupd8team/java

sudo apt-get update

sudo apt-get install oracle-java8-installer

Quickest way to set up JAVA_HOME:

sudo update-alternatives --config java

Copy the path of Java 8 up to java-8-oracle, for instance:

/usr/lib/jvm/java-8-oracle

sudo nano /etc/environment

add the following line (/etc/environment takes plain KEY="value" entries, without the export keyword):

JAVA_HOME="/usr/lib/jvm/java-8-oracle"

source /etc/environment

If you echo $JAVA_HOME, you will see the path.

Setting up passwordless ssh:

Look up my previous posts for an introduction to SSH. Here we will jump directly into passwordless SSH, with screenshots.

Generate the key pair
[screenshot: ssh-keygen generating the key pair]

Create the .ssh folder on localhost and permanently add the generated public key to ~/.ssh/authorized_keys

[screenshot: adding the public key to authorized_keys]

That's it. You are done.

Install hadoop 2.5 stable version:
wget http://your-path-to-hadoop-2.5x-version/hadoop-2.4.1.tar.gz
tar xzvf hadoop-2.4.1.tar.gz

mv hadoop-2.4.1 hadoop

Create HDFS directory inside hadoop folder:

mkdir -p data/namenode
mkdir -p data/datanode

You should have these:

[screenshot: HDFS data directory layout inside the hadoop folder]

Go to hadoop-env.sh (it is in etc/hadoop) and update the JAVA_HOME path, HADOOP_OPTS and HADOOP_COMMON_LIB_NATIVE_DIR:

 

[screenshot: hadoop-env.sh]

Edit core-site.xml and add the following:

[screenshot: core-site.xml]

create a file called “mapred-site.xml” and add the following:

[screenshot: mapred-site.xml]

Edit hdfs-site.xml and add the following:

[screenshot: hdfs-site.xml]

Edit yarn-site.xml and add the following:

[screenshot: yarn-site.xml]

Now, when you run the start-dfs and start-yarn scripts under sbin, you should see the following:

[screenshot: all the Hadoop daemons running]

Install spark:
Obtain the latest version of Spark from http://spark-project.org/download. To interact with the Hadoop Distributed File System (HDFS), you need a Spark version that is built against the same version of Hadoop as your cluster. Go to http://spark.apache.org/downloads.html, choose the package type "pre-built for Hadoop 2.4" and download Spark. Note that Spark 1.1.0 uses Scala 2.10.x, so we need to install Scala.

Let's install Scala (Spark 1.1.0 needs Scala 2.10.x, so grab a 2.10.x release):

wget http://www.scala-lang.org/files/archive/scala-2.10.4.tgz
tar -xvf scala-2.10.4.tgz
cd scala-2.10.4
pwd

to get the path.

You will probably want to add these to your .bashrc file or equivalent:

export SCALA_HOME=`pwd`
export PATH=`pwd`/bin:$PATH

[screenshot: Scala environment variables in .bashrc]

We also need something called sbt. sbt stands for Simple Build Tool, though to me it seems more complicated than Maven. You can still build with Maven; however, I would suggest getting acquainted with sbt if you are interested in exploring Scala in general.
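For the curious, a minimal sbt build definition is just a build.sbt file with a few settings. A sketch (the project name is a placeholder; the Scala version matches what Spark 1.1.0 expects):

name := "spark-sandbox"

version := "0.1"

scalaVersion := "2.10.4"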

More on the next post.

Apache Spark: data processing engine for cluster computing

May I present Apache Spark, another Apache-licensed top-level project that can perform large-scale data processing much faster than Hadoop (I am referring to MR 1.0 here). This speed is possible thanks to the Resilient Distributed Dataset (RDD) concept behind it. An RDD is basically a collection of objects spread across a cluster, stored in RAM or on disk, and automatically rebuilt on failure. Its purpose is to make higher-level, parallel operations on data as straightforward as possible.

Apache Spark is often referred to as a data processing engine. Simply put, Spark is a cluster computing engine that makes it easy to handle a wide range of workloads: ETL, SQL-like queries, machine learning and streaming. The amount of code you write is also minimized to a great extent compared to traditional MapReduce development.
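To give a sense of how little code that is, here is the classic word count as a short Scala sketch (assuming an existing SparkContext sc, e.g. from spark-shell; the HDFS paths are placeholders):

val words = sc.textFile("hdfs:///path/to/input").flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs:///path/to/output")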

The Spark engine has four major components.

  1. Spark SQL: can query structured data, connect using JDBC drivers to import data, and work with an existing data warehouse (Hive)
  2. Spark MLlib: a scalable machine learning library (like Mahout, but faster) that can run classification, regression, clustering, recommendations and so on (a small sketch follows this list)
  3. GraphX: an API used for graph computation, ETL tasks and exploratory data analysis
  4. Spark Streaming: similar to Storm, it can be used to build real-time streaming applications (for example, analyzing Twitter feeds as they are posted)
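As a tiny taste of MLlib from the Scala shell, here is k-means on a handful of made-up points (a sketch; sc is the SparkContext and the numbers are arbitrary):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val points = sc.parallelize(Seq(
  Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.2),
  Vectors.dense(9.0, 9.1), Vectors.dense(9.2, 8.9)))
val model = KMeans.train(points, 2, 20)    // k = 2 clusters, 20 iterations
model.clusterCenters.foreach(println)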

To install Spark, we need the following in the OS (Mac/Debian):

  • Java
  • Scala
  • Maven
  • Git

We will cover the installation tomorrow, as I would first like to offer some more information on Scala. Keep watching this space for more Apache Spark!

Hadoop 2: YARN — Overview and Setup

We have already seen MapReduce. Now, let's dig into the new data processing model of Hadoop. Hadoop now comes with an advanced resource management tool called YARN, packaged as Hadoop 2.x.

What is YARN?

YARN stands for Yet Another Resource Negotiator. YARN is also known as the Hadoop data operating system; it enables data processing models beyond traditional MapReduce, such as Storm (real-time streaming), Solr (search) and interactive systems (Apache Tez).

Why YARN?

The Hadoop committers decided to split resource management and job scheduling into separate daemons to overcome some deficiencies in the original MapReduce. In traditional MR terms, this basically means splitting up the JobTracker to provide flexibility and improve performance.

YARN splits the functionality of a JobTracker into two separate daemons:

A global ResourceManager (RM) that consists of a Scheduler and an ApplicationsManager.
A per-application ApplicationMaster (AM) that negotiates resources from the RM and works with the per-node NodeManagers to execute and monitor the application's tasks. (Here an application can mean a single MR job or a Directed Acyclic Graph (DAG) of jobs.)

Caveat:

YARN, also referred to as MapReduce 2, is not itself an improved data processing model over traditional MapReduce. It merely provides the resource management layer on which MapReduce jobs (and other workloads) execute.

How to deploy Hadoop-2.x YARN?

We will look at how to deploy YARN in a single node cluster setup. The following are system requirements:

Java 6 or above
SSH

Download Hadoop-2.2.0:

wget http://your download link here/hadoop-2.2.0.tar.gz

unpack the tarball:

tar -zxvf hadoop-2.2.0.tar.gz

Create HDFS directory inside Hadoop-2.2.0 folder:

mkdir -p data/namenode
mkdir -p data/datanode

You may rename hadoop-2.2.0 to hadoop, but I wouldn't suggest that.

Go to Hadoop-2.2.0/etc/hadoop/hadoop-env.sh and copy/paste the following:

export JAVA_HOME=<your java path here>
For HADOOP_COMMON_LIB_NATIVE_DIR and HADOOP_OPTS, use the path of the lib folder inside hadoop-2.2.0 (type pwd from inside lib) and paste it as below:
export HADOOP_COMMON_LIB_NATIVE_DIR="/home/hduser/hadoop-2.2.0/lib"
export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=/home/hduser/hadoop-2.2.0/lib"

Similar to Hadoop 1 (traditional hadoop), make changes in core-site, mapred-site, yarn-site, hdfs-site:

In core-site.xml, copy/paste:

<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>

In hdfs-site.xml, copy/paste:

<property>
<name>dfs.replication</name>
<value>1</value>
</property>

<property>
<name>dfs.namenode.name.dir</name>
<value> <path to your hadoop> /hadoop/data/namenode</value>
</property>

<property>
<name>dfs.datanode.data.dir</name>
<value> <path to your hadoop> /hadoop/data/datanode</value>
</property>

In mapred-site.xml, copy/paste the following (create mapred-site.xml with vim mapred-site.xml if the file is not there):

<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>

In yarn-site.xml, copy/paste:

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>

Either update your bashrc file or create yarnconfig.sh and paste the following:

export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk      # your java path here
export HADOOP_HOME=/home/hduser/hadoop-2.2.0          # your hadoop path here
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_YARN_HOME=$HADOOP_HOME

To run,

source ~/.bashrc or . yarnconfig.sh

Before you run YARN, go to the hadoop-2.2.0 folder and format the namenode:

bin/hdfs namenode -format

To start Hadoop:

sbin/start-dfs.sh
sbin/start-yarn.sh

You can check whether everything went well by running jps. Ideally you should see the following:

8144 ResourceManager
7722 DataNode
7960 SecondaryNameNode
8333 NodeManager
9636 Jps
7539 NameNode

Note: The PID might be different but the processes are the same.

Access the web UIs: the ResourceManager at http://localhost:8088 and the NameNode at http://localhost:50070.

Try some examples in your new YARN:

bin/hadoop jar /home/hduser/hadoop-2.2.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar pi 4 15 

Book Review: Optimizing Hadoop for MapReduce

I had a chance to review another book, titled "Optimizing Hadoop for MapReduce", and must say this book is a good resource for devops professionals who build MapReduce programs in Hadoop. The book is well organized: it starts off by introducing basic concepts, identifying system bottlenecks and resource weaknesses, and suggesting ways to fix and optimize them, followed by Hadoop best practices and recommendations. Though packed with advanced concepts and information on Hadoop architecture, the author's writing is such that it can appeal to all kinds of readers (from novice to expert), with helpful hints in each chapter.

The first chapter on MapReduce is written for people who are new to the paradigm. It contains pictorial representations of how "low-level" MapReduce works. It is easy to misunderstand the low-level MapReduce process, and this chapter clarifies it.

The second chapter discusses performance tuning parameters, such as allocating map/reduce tasks based on the number of cores in the Hadoop cluster. It also covers widely used cluster management tools such as Ambari, Chukwa, etc.

The third and fourth chapters discuss identifying system bottlenecks and resource weaknesses, respectively. The author takes an organized approach by introducing a performance tuning process cycle and demystifying how the major components of a given Hadoop cluster (CPU, RAM, storage and network bandwidth) can cause a bottleneck and how to eliminate it. In the fourth chapter in particular, I liked the idea of discussing formulas that can be used when planning the Hadoop cluster, demonstrated with examples.

The remaining three chapters focus on enhancing and optimizing map/reduce tasks, followed by best practices and recommendations. The author introduces performance metrics for map/reduce tasks and suggests ways to enhance them and to fine-tune parameters to improve the performance of a MapReduce job. The final chapter on best practices is packed with valuable information on hardware tuning for optimal performance of the Hadoop cluster and on Hadoop best practices.

A few minor points here and there should be read with caution. For instance, the author says in the first chapter that each slave is called a task tracker; it would have been better to say that it assumes the responsibilities of a task tracker, while in general it is actually called a data node. That is just my suggestion. In short, this book is a compilation of MapReduce performance-related issues and ideas on troubleshooting and optimizing that performance, including best practices. A must-have book, especially for Hadoop administrators and developers. This book is available at Packt Publishing.

Big Data Logistics: data transfer using Apache Sqoop from RDBMS

Apache Sqoop is a connectivity tool for transferring data between Hadoop and traditional relational databases (RDBMS), which hold structured data. Using Sqoop, one can import data into the Hadoop Distributed File System from an RDBMS such as Oracle, Teradata or MySQL, and also export data from Hadoop to any RDBMS, either as CSV files or directly into database tables.

It is possible to write MapReduce programs that use JDBC connectors to export/import data from/to Hadoop, but Sqoop automates most of that process. The advantage is that the operations are parallelized and fault tolerant.

One caveat though: you have to download the appropriate JDBC driver for your database. I will use MySQL, so I need the MySQL JDBC driver. These drivers already exist, so not to worry; do look up Cloudera and Hortonworks for the Teradata, PostgreSQL and Oracle JDBC drivers.

JDBC (Java Database Connectivity) is basically an API that lets you access data in relational databases and data warehouses. Each RDBMS vendor provides its own JDBC driver.

One can import an entire table from RDBMS and specify a location where it needs to be stored in HDFS or can incorporate a query where only a subset of an entire table will be written in HDFS.

Installation:

Sqoop is very easy to install. If you are a Red Hat/CentOS user, do a yum install on sqoop.

$ wget http://name of the mirror/sqoop/1.4.4/sqoop-1.4.4.bin__hadoop-1.0.0.tar.gz

$ tar -zxvf sqoop-1.4.4.bin__hadoop-1.0.0.tar.gz

Add SQOOP_HOME to your runtime configuration file:
export SQOOP_HOME=/data/sqoop-1.4.4
export PATH=$PATH:$SQOOP_HOME/bin

Hadoop should be up and running.

Now download the MySQL connector JAR from http://dev.mysql.com/downloads/connector/j/, extract it, and copy mysql-connector-java-5.1.22-bin.jar into SQOOP_HOME/lib.

I do not have MySQL-server so I am going to install it.

$ sudo apt-get install mysql-server

$ sudo service mysql status

It will show that the process is running.

Now the Sqoop tool has been set up. Say I have a table called employee in a database called firstsqoop, with details of employees, salaries and departments, and I want to import that table into HDFS; then we could enter:

sqoop import \
--connect jdbc:mysql://localhost/firstsqoop \
--username root \
--password "" \
--table employee

This runs a MapReduce job that imports the data from the MySQL database using the MySQL JDBC connector. You can also import a subset of an existing table:

sqoop import \
--connect jdbc:mysql://localhost/firstsqoop \
--username root \
--password "" \
--table employee \
--where "Dept = 'Accounting' && Salary >= "5000""

Or you can even query the RDBMS and import only what you want (with a free-form query, Sqoop requires the literal $CONDITIONS token in the WHERE clause and either a --split-by column or a single mapper):

sqoop import \
--connect jdbc:mysql://localhost/firstsqoop \
--username root \
--password "" \
--query "SELECT EmpRecordsData, ContractRecordsData FROM ORC_Solutions JOIN EmpName USING (EmpDept) WHERE EmpDept = 'Operations Research' AND \$CONDITIONS" \
-m 1 \
--target-dir TargetHDFSDirectory

Now let's look at how we can export data from Hadoop/HDFS:

sqoop export \
--connect jdbc:mysql://localhost/firstsqoop \
--username root \
--password "" \
--table employee \
--export-dir EmployeeDepartment

You can also update an existing database table if you prefer, as Sqoop provides that facility too. In addition, Sqoop lets you import RDBMS data directly into Hive. Here is how that's done:

sqoop import \
--connect jdbc:mysql://localhost/firstsqoop \
--username root \
--password "" \
--table employee \
--hive-import

Real-time analytics using distributed computing system Storm: Part II

How do you set up a Storm cluster?
We will look at how to set up a single-node cluster for the Storm project. The following are the prerequisites:

  1. Java 6 or above
  2. Python 2.6
  3. Zookeeper
  4. ZeroMQ
  5. JZMQ
  6. any other dependencies (unzip, git, etc…)

Zookeeper: the Apache ZooKeeper project gives you a set of tools to build distributed applications that can safely handle partial failures in distributed computing.
ZeroMQ: a messaging library that allows you to design a complex communication system without much effort. It is not a complete messaging system; instead, it lets you quickly build your own.
JZMQ: the Java binding for ZeroMQ.

 

Step 1: Configure Zookeeper

wget http://psg.mtu.edu/pub/apache/zookeeper/stable/zookeeper-3.4.5.tar.gz

tar -xzvf zookeeper-3.4.5.tar.gz

cd zookeeper-3.4.5/conf

Create a new cfg file with the following information:

vim zoo.cfg

tickTime=2000
dataDir=/var/zookeeper
clientPort=2181

Then, from the zookeeper-3.4.5 directory, start the server:

bin/zkServer.sh start

Note: if it says it failed to write the PID, change dataDir to a location that does not require root permissions.

Step 2a: Configure Messaging library

wget http://download.zeromq.org/zeromq-2.1.7.tar.gz

tar -xzvf zeromq-2.1.7.tar.gz

cd zeromq-2.1.7

./configure

make

sudo make install

 

Step 2b: JZMQ

git clone https://github.com/nathanmarz/jzmq.git

cd jzmq

./autogen.sh

./configure

make

sudo make install

Note: if git is not available then,

sudo apt-get update

sudo apt-get install git-core

Java and Python 2.6 should be installed and working, and JAVA_HOME should be set.

 

Step 3: Install Storm

wget https://dl.dropboxusercontent.com/s/dj86w8ojecgsam7/storm-0.9.0.1.zip

unzip storm-0.9.0.1.zip -d <your working directory path>

cd into storm's conf directory and modify storm.yaml with the following. Note: we are setting up a single-node server, so everything references localhost here.

storm.zookeeper.servers:
    - "localhost"
storm.zookeeper.port: 2181
nimbus.host: "localhost"
storm.local.dir: "/var/stormtmp"
java.library.path: "/usr/lib/jvm/java-1.7.0-openjdk-i386"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
worker.childopts: "-Xmx768m"
nimbus.childopts: "-Xmx512m"
supervisor.childopts: "-Xmx256m"

For a multi-node cluster you would fill out additional information about the master and the slaves.

And now:

bin/storm nimbus (for the master)
bin/storm supervisor (for the slave)
bin/storm ui (to get a nice UI in your browser)

Use http://localhost:8080 to see the UI.

Real-time analytics using distributed computing system Storm: Part I

We know that using data we can produce analytics and create reports that help us in decision-making; the usual story. The constraint is that, in order to create those analytics reports right now, the data must have been collected, cleansed and pre-processed earlier. Only after spending a considerable amount of time on these tasks can we use a framework like Hadoop and its ecosystem tools to analyze big data. What if I told you that you could get your analytics in a matter of minutes?

This is called Real-time analytics. For example:

In financial engineering, we have something called algorithmic trading. Imagine a computer system embedded with an AI-based program that places trading orders from pre-programmed trading instructions based on timing, price, quantity and so on, without human stockbroker involvement.

In revenue management, there is dynamic pricing. Imagine that a flight ticket you view online keeps changing its price every few hours based on demand for that route. That's near real-time analytics in action.

There are many such use cases where quicker availability of data helps make better decisions. Now there is a tool that performs real-time analytics in a distributed computing fashion, where the task is distributed among multiple machines. I present the real-time analytics tool, "Storm".

What is Storm?

Storm is a distributed and fault-tolerant system for processing streams of real-time data. Just as Hadoop has a job tracker and task trackers, here the work is delegated to different components that are each responsible for a task.

A spout is a component that handles the input stream. A bolt is another component that either persists the data to some storage or performs some transformation on the stream of data and passes the transformed data on to another bolt; there can be more than one bolt in a Storm cluster. The arrangement of spouts and bolts and their connections is called a topology (just like a MapReduce job in Hadoop).

Why use Storm?

Storm lowers the complexity of doing real-time processing. Computations are done in parallel using multiple threads, processes and servers. You can use any programming language on top of Storm. It is also fast, as the system is designed so that messages are processed quickly using ØMQ.

How does Storm work?

Just like a Hadoop cluster, there are two kinds of nodes: master nodes and slave nodes. The master runs a daemon process called Nimbus that distributes code around the cluster, assigns tasks to each worker node and monitors for failures. As mentioned earlier, Storm is a fault-tolerant system: when a slave fails, Nimbus reassigns the failed slave's tasks as necessary. Slaves run a daemon process called Supervisor that executes a portion of the topology (a set of spouts and bolts and their connections).

Storm can run in local mode (for testing the topology) or remote mode (production). It can be set up as a single server or as a multi-server cluster of masters and nodes. The user designs a topology (a set of spouts and bolts). Basically, these spouts and bolts are just Java programs in which you specify which bolt/spout should perform what operation. You can also write spouts and bolts in Python or Ruby using the multilang protocol. This protocol uses a standard input/output communication channel that performs the job of the spouts/bolts; messages are encoded as either JSON or plain text over this channel.

How to develop real-time applications in Apache Storm?

At a high level, we have to:

0. Set up a Storm cluster on your machine
1. Design the topology of spouts and bolts and the connections between them (just like a graph of nodes and arcs)
2. Write a program for each bolt and spout to instruct it what to do upon receiving a stream of data
3. Pack your code and dependencies into a single jar file and run it (a minimal Scala sketch of steps 1 and 2 follows below)
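To make the spout/bolt/topology idea concrete, here is a minimal Scala sketch against Storm 0.9.x's Java API. It wires the TestWordSpout that ships with Storm to a trivial bolt and runs everything in local mode; the class and topology names are placeholders of my own, not part of any official example:

import backtype.storm.{Config, LocalCluster}
import backtype.storm.testing.TestWordSpout
import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer, TopologyBuilder}
import backtype.storm.topology.base.BaseBasicBolt
import backtype.storm.tuple.{Fields, Tuple, Values}

// Bolt: appends "!!!" to every word it receives and emits the result downstream.
class ExclamationBolt extends BaseBasicBolt {
  override def execute(tuple: Tuple, collector: BasicOutputCollector): Unit =
    collector.emit(new Values(tuple.getString(0) + "!!!"))
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("word"))
}

object ExclamationTopology {
  def main(args: Array[String]): Unit = {
    val builder = new TopologyBuilder
    // Spout: TestWordSpout ships with Storm and emits random words.
    builder.setSpout("words", new TestWordSpout)
    // Bolt wired to the spout; shuffleGrouping spreads tuples randomly across bolt tasks.
    builder.setBolt("exclaim", new ExclamationBolt).shuffleGrouping("words")

    val conf = new Config
    conf.setDebug(true)
    // Local mode: runs the whole topology in-process, handy for testing before submitting to Nimbus.
    val cluster = new LocalCluster
    cluster.submitTopology("exclamation-demo", conf, builder.createTopology())
    Thread.sleep(10000)
    cluster.shutdown()
  }
}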