Big Data Analytics

Setting up Apache Spark: Part II

Now that we have Hadoop YARN, Scala and the other prerequisites set up, we are ready to install Apache Spark. If you are visiting this blog for the first time, please have a look at the earlier post, Setting up Apache Spark: quick look at the Hadoop, Scala and SBT.

You will need Maven, SBT, Scala, Java and SSH; Git is optional, but have it in case you want to fork the Spark repository (that route did not work out for me). I am not using my multi-node cluster workspace for this demonstration, so there are not many screenshots for now; however, these instructions should work fine.

To begin, let's download the appropriate Spark binaries (select the one that corresponds to your Hadoop installation from the Spark site):

wget http://www.apache.org/dyn/closer.cgi/spark/spark-1.1.0/spark-1.1.0-bin-hadoop2.4.tgz

Assuming SBT is set up without any problems, run

sbt/sbt package

followed by

sbt/sbt clean compile

Now, let's build Spark using Maven. Allocate the desired amount of memory based on the capacity of the machine you are working with, adjust the Hadoop/YARN version flags to match your installation, and enter the following:

export MAVEN_OPTS="-Xmx1300M -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
mvn -Phadoop2-yarn -Dhadoop.version=2.0.5-alpha -Dyarn.version=2.0.5-alpha -DskipTests clean package

A clean may not be strictly necessary at this point, but I suggest

sbt clean

followed by

sbt/sbt assembly

This takes time. Once it is over, start the Spark daemons:

sbin/start-all.sh

and finally, launch the Spark shell:

./bin/spark-shell

Voila! The following screen will show up with the Scala command line. And now, folks, let's learn Scala!

[Screenshot: the spark-shell Scala prompt]
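Once the Scala prompt appears, a quick sanity check is a small word count. This is a minimal sketch that assumes a README.md file sits in the directory you launched the shell from (any text file will do); paste it straight into the spark-shell, where the SparkContext is already available as sc:

val lines = sc.textFile("README.md")      // any text file you have lying around
val counts = lines
  .flatMap(_.split("\\s+"))               // break each line into words
  .map(word => (word, 1))                 // map: emit (word, 1) pairs
  .reduceByKey(_ + _)                     // reduce: sum the counts per word
counts.take(10).foreach(println)

If that prints (word, count) pairs, the installation is working and we can move on.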

Real-time analytics using distributed computing system Storm: Part II

How to set up a Storm cluster?
We will look at how to set up a single-node Storm cluster. The prerequisites are:

  1. Java 6 or above
  2. Python 2.6
  3. Zookeeper
  4. ZeroMQ
  5. JZMQ
  6. any other dependencies (unzip, git, etc…)

ZooKeeper: The Apache ZooKeeper project gives you a set of tools to build distributed applications that can safely handle partial failures in distributed computing.
ZeroMQ: ZeroMQ is a messaging library that allows you to design a complex communication system without much effort. It is not a complete messaging system; instead, it lets you quickly build your own.
JZMQ: the Java binding for ZeroMQ.

 

Step 1: Configure ZooKeeper

wget http://psg.mtu.edu/pub/apache/zookeeper/stable/zookeeper-3.4.5.tar.gz

tar -xzvf zookeeper-3.4.5.tar.gz

cd zookeeper-3.4.5/conf

Create a new configuration file with the following information:

vim zoo.cfg

tickTime=2000
dataDir=/var/zookeeper
clientPort=2181

Then, from the zookeeper-3.4.5 directory, start the server:

bin/zkServer.sh start

Note: if it complains that it failed to write the pid, change dataDir to a location that does not require root permissions.

Step 2a: Configure the messaging library (ZeroMQ)

wget http://download.zeromq.org/zeromq-2.1.7.tar.gz

tar -xzvf zeromq-2.1.7.tar.gz

cd zeromq-2.1.7

./configure

make

sudo make install

 

Step 2b: JZMQ

git clone https://github.com/nathanmarz/jzmq.git

cd jzmq

./autogen.sh

./configure

make

sudo make install

Note: if Git is not available, install it first:

sudo apt-get update

sudo apt-get install git-core

Java and Python 2.6 should be installed and working, and JAVA_HOME should be set.

 

Step 3: Install Storm

wget https://dl.dropboxusercontent.com/s/dj86w8ojecgsam7/storm-0.9.0.1.zip

unzip storm-0.9.0.1.zip -d /path/to/working/directory

cd into the conf folder of the extracted Storm directory and modify storm.yaml with the following. Note: we are setting up a single-node server, which is why everything references localhost.
storm.zookeeper.servers:
  - "localhost"
storm.zookeeper.port: 2181
nimbus.host: "localhost"
storm.local.dir: "/var/stormtmp"
java.library.path: "/usr/lib/jvm/java-1.7.0-openjdk-i386"
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703
worker.childopts: "-Xmx768m"
nimbus.childopts: "-Xmx512m"
supervisor.childopts: "-Xmx256m"
For a multi-node cluster, you will have to fill in additional information about the master and slave nodes.

And now, start the daemons:

bin/storm nimbus (the master daemon)
bin/storm supervisor (the slave daemon)
bin/storm ui (a nice web UI in your browser)

Use http://localhost:8080 to see the UI

Real-time analytics using distributed computing system Storm: Part I

We know that using data we can produce analytics and create reports that help in decision-making; the usual story. The constraint is that to produce an analytics report right now, the data must already have been collected, cleansed and pre-processed. Only after spending a considerable amount of time on these tasks can we use a framework like Hadoop and its ecosystem tools to analyze big data. What if I told you that you can get your analytics in a matter of minutes?

This is called Real-time analytics. For example:

In financial engineering, we have something called algorithmic trading. Imagine a computer system embedded with an AI-based program that places trading orders from pre-programmed instructions based on timing, price, quantity and so on, without the involvement of a human stockbroker.

In revenue management, there is dynamic pricing. Imagine that a flight ticket you view online keeps changing its price every few hours based on the demand for that route. That's near real-time analytics at work.

There are many such use cases where quicker availability of data helps make better decisions. Now there is a tool that performs real-time analytics on a distributed computing system, where the work is spread across multiple machines. I present to you the real-time analytics tool, Storm.

What is Storm?

Storm is a distributed and fault-tolerant system for processing streams of real-time data. Just like Hadoop's JobTracker and TaskTracker, the work is delegated to different components, each responsible for a specific task.

A spout is a component that handles the input stream. A bolt is a component that either persists the data to some storage or performs some operation on the stream and passes the transformed data on to another bolt; a Storm cluster can have more than one bolt. The arrangement of spouts and bolts and their connections is called a topology (analogous to a MapReduce job in Hadoop).

Why use Storm?

Storm lowers the complexity of real-time processing. Computations are done in parallel using multiple threads, processes and servers. You can use any programming language on top of Storm. It is also fast, since the system is designed so that messages are processed quickly using ØMQ.

How does Storm work?

Just like a Hadoop cluster, there are two kinds of nodes: master nodes and slave nodes. The master runs a daemon called Nimbus that distributes code around the cluster, assigns tasks to each worker node and monitors for failures. As mentioned earlier, Storm is fault-tolerant: when a slave fails, Nimbus reassigns its tasks as necessary. Slaves run a daemon called Supervisor that executes a portion of the topology (the set of spouts, bolts and their connections).

Storm can run in local mode (for testing a topology) or remote mode (production), and can be set up on a single server or as a multi-server cluster of masters and worker nodes. The user designs a topology (a set of spouts and bolts). These spouts and bolts are essentially Java programs in which you specify which bolt/spout should perform which operation. You can also write spouts and bolts in Python or Ruby using the multilang protocol, which communicates over standard input/output; messages are encoded as JSON or plain text over this channel.

How to develop real-time applications in Apache Storm?

At a high level, we have to:

0. Set up a Storm cluster on your machine
1. Design the topology of spouts and bolts and the connections between them (just like a graph of nodes and arcs)
2. Write a program for each bolt and spout instructing it what to do upon receiving a stream of data (see the sketch below)
3. Package your code and its dependencies into a single jar file and run it
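As a rough sketch of steps 1 and 2, here is a minimal word-splitting topology written in Scala against Storm 0.9's backtype.storm Java API. The class names (SentenceSpout, SplitBolt, WordTopology) and the sample sentences are made up for this example; you would compile it with the Storm jars on the classpath:

import java.util.{Map => JMap}
import backtype.storm.{Config, LocalCluster}
import backtype.storm.spout.SpoutOutputCollector
import backtype.storm.task.TopologyContext
import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer, TopologyBuilder}
import backtype.storm.topology.base.{BaseBasicBolt, BaseRichSpout}
import backtype.storm.tuple.{Fields, Tuple, Values}
import scala.util.Random

// Spout: emits a random sentence every 100 ms (a stand-in for a real stream source).
class SentenceSpout extends BaseRichSpout {
  private var collector: SpoutOutputCollector = _
  private val sentences = Array("the cow jumped over the moon", "an apple a day keeps the doctor away")

  override def open(conf: JMap[_, _], context: TopologyContext, out: SpoutOutputCollector): Unit =
    collector = out

  override def nextTuple(): Unit = {
    Thread.sleep(100)
    collector.emit(new Values(sentences(Random.nextInt(sentences.length))))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("sentence"))
}

// Bolt: splits each incoming sentence into words and emits them downstream.
class SplitBolt extends BaseBasicBolt {
  override def execute(input: Tuple, collector: BasicOutputCollector): Unit =
    input.getStringByField("sentence").split(" ").foreach(w => collector.emit(new Values(w)))

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("word"))
}

// Topology: wire the spout to the bolt and run it in local mode for testing.
object WordTopology {
  def main(args: Array[String]): Unit = {
    val builder = new TopologyBuilder
    builder.setSpout("sentences", new SentenceSpout)
    builder.setBolt("split", new SplitBolt).shuffleGrouping("sentences")

    val conf = new Config()
    conf.setDebug(true)

    val cluster = new LocalCluster
    cluster.submitTopology("word-topology", conf, builder.createTopology())
    Thread.sleep(10000)
    cluster.shutdown()
  }
}

LocalCluster runs everything in one JVM, which is handy while developing; for a real deployment you would instead submit the packaged jar to Nimbus with bin/storm jar.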

Market Basket Analysis with Mahout

Also known as affinity analysis or frequent pattern mining.
Finding patterns in huge amounts of customer transaction data is called market basket analysis. It is useful wherever a store's transaction data is readily available, and with it one can discover purchasing patterns. Market basket analysis is also referred to as association rule mining (strictly speaking, it is an application of association rule mining), affinity analysis or frequent pattern mining. This technique is behind customer promotions like buy-one-get-one-free offers, discounts and complimentary products that we see in department stores and supermarket chains.

MBA is one way of recommending products to customers. If we have customer transaction data, where the items bought by each customer are available (think of the receipt you get when buying products), and imagine we have a million transaction records like this, then we can discover buying patterns. For example, let's assume that people in a certain locality eat chips while drinking beer. Then whoever comes into the supermarket with the sole purpose of buying beer is likely to also pick up a packet of chips (crisps if you are from the UK); this is a pattern we know from local knowledge.

But what about other patterns that we don't know or don't talk about? MBA helps us find them. Why do we need such pattern information? We can use it for purchase planning and for introducing new products (not based on MBA results alone, but also with the help of statistical hypothesis testing and inference).

Let's perform market basket analysis using Apache Mahout. You should have Apache Mahout and Hadoop installed and running.

For the input dataset, I found anonymized transaction data from a Belgian supermarket, available courtesy of Dr Tom Brijs of the University of Hasselt, Belgium. Please visit http://fimi.ua.ac.be/data/retail.pdf and http://www.luc.ac.be/~brijs/. We will use retail.dat as the input dataset. Each record in the underlying data contains the date of purchase, the receipt number, the article number, the number of items purchased, the article price in Belgian francs and the customer number.

The data were collected over three non-consecutive periods. The first period runs from mid-December 1999 to mid-January 2000, the second runs until the beginning of June 2000, and the third and final period runs from the end of August 2000 to the end of November 2000. In between these periods no data are available, unfortunately. This amounts to approximately five months of data. The total number of receipts collected is 88,163, and 5,133 customers purchased at least one product in the supermarket during the data collection period.

The following steps are required to perform the market basket analysis with Apache Mahout:

1. Copy the data from the local disk to HDFS:
hadoop fs -copyFromLocal /home/path/to/your/file/retail.dat retail.dat

2. Execute Mahout's FPGrowth (fpg) procedure:
mahout fpg -i retail.dat -o patternone -method mapreduce

Note: by default the number of top frequent patterns kept is 50, meaning the output will give you the top 50. You can also run this procedure in sequential mode; just make sure the data file is in the directory from which you execute the command.

3. Check the output after processing. It will be in the folder ‘patternone’ in your HDFS:

hadoop fs -ls /user/hduser/patternone

4. The output is stored as sequence files under the frequentpatterns folder. Use the sequence dumper utility to extract it:
mahout seqdumper -i hdfs://localhost:54310/user/hduser/patternone/frequentpatterns/part-r-00000

5. Interpret the output. It will be in key-value pairs like the following:

[Screenshot: FPM output key-value pairs]

The item is the key here. ([99],32) means that item 99 appears in 32 transactions. ([142, 99],22) means that items 142 and 99 appear together in 22 transactions. We don't know what those items actually are, but in real-life situations items are indexed against their names, so you can produce output that shows item names.

For example, item 99 could be beer: the 32 means beer was bought in 32 transactions. If item 142 is chips, then 142 and 99 appearing together in 22 transactions means people bought beer and chips together in those 22 instances. Maybe the remaining people already had chips at home :).

This data can be used for further analysis to determine the nature of promotion that could be offered to customers.
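To make the interpretation concrete, these pattern counts can be turned into the usual association-rule numbers, support and confidence. The figures below are the made-up beer/chips counts from the example above, pasted into a plain Scala REPL:

val totalTransactions = 88163.0     // receipts, per the retail.dat description
val beerCount = 32.0                // ([99], 32)
val beerAndChipsCount = 22.0        // ([142, 99], 22)

val support = beerAndChipsCount / totalTransactions   // how often the pair occurs overall
val confidence = beerAndChipsCount / beerCount        // share of beer buyers who also took chips, about 0.69

println(f"support = $support%.5f, confidence = $confidence%.2f")

A rule with high confidence and reasonable support is what would typically justify a promotion such as a beer-and-chips bundle.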

Apache Hive: to query structured data from HDFS

If we want to talk about Hive, we need to understand the difference between Pig and Hive, since people easily confuse the two or ask “why do we need this when we have that?”

The Apache Hive software provides a SQL-like dialect called Hive Query Language (HiveQL) that can be used to query data stored in a Hadoop cluster. Once again, Hive eliminates the need to write mappers and reducers: we can simply use HQL to query the data. I will not go deep into why we need querying at all; look up SQL tutorials in a search engine.

The importance of Hive is best understood when we have the right kind of data for it to process: static data that is not changing, where quick response time is not the priority. Hive is only software on top of the Hadoop framework; it is not a database package. We are querying data from HDFS, and we use Hive wherever the need for SQL-like querying arises.

(HCatalog stores the metadata for HQL tables; that just popped into my mind as I write this post, and we will look into it later.) Hive is ideal for data warehousing applications, where data is simply stored and mined when necessary for report generation, visualization and so on.

Following is the Apache definition of Hive:

The Apache Hive data warehouse software facilitates querying and managing large datasets residing in distributed storage. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time this language also allows traditional map/reduce programmers to plug in their custom mappers and reducers when it is inconvenient or inefficient to express this logic in HiveQL.

Pig is a procedural data-flow language, the Pig Latin script (think of Python or Perl), whereas Hive is a SQL-like query language. As the saying goes, a pig can eat anything: Pig can take structured and unstructured data, while Hive can only process structured data. Data in Pig is represented through variables, whereas in Hive it is represented through tables. Hive also supports UDFs, but writing them is more complex than in Pig.

Installation:

Very similar to Pig

1. Download Apache Hive
$ wget http://www.eu.apache.org/dist/hive/hive-latest-version/hive-latest-version.tar.gz

2. Extract the Tarball

$ tar -zxvf hive-latest-version.tar.gz

3. Add the following to your previously created hadoop configuration bash script:

export HIVE_HOME=/data/hive-latest-version
export JAVA_HOME=/usr/java/jdk1.7.0_05
export HADOOP_HOME=/data/hadoop

export PATH=$HIVE_HOME/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$PATH
export CLASSPATH=$JAVA_HOME:/data/hadoop/hadoop-core-1.0.1.jar:$HIVE_HOME/hive-latest-version.jar

4. Run the configuration file

$ . hadoopconfiguration.sh

5. Now launch Hive,

$ hive

Hadoop should already be up and running.

Apache Pig: for ETL functions, data cleansing and munging

Pig is one of the add-ons to the Hadoop framework that helps execute MapReduce data processing in Hadoop. Yes, Hadoop supports MapReduce natively, but Pig makes it easier: you don't have to write a complex Java program defining mapper and reducer classes.

Pig includes a scripting language called Pig Latin that provides many of the standard database operations we normally perform on data, plus UDFs (user-defined functions, so you can write your own query methods to extract relevant data or munge it). Data munging is often done with Pig, but is not limited to it (look up sed and awk on Google).

A common example of a Pig Latin script is a web company bringing in logs from its web servers, cleansing the data and precomputing common aggregates before loading it into a data warehouse. In this case the data is loaded onto the grid, and Pig is used to clean out records from bots and records with corrupt data. It is also used to join web event data against user databases so that user cookies can be connected with known user information.

Installation:

Pig does not need to be installed on every node of our multi-node cluster, only on the machine from which you run your Hadoop jobs, the master. Remember that a multi-node cluster can have more than one master; Pig can be installed on those machines too. You can also run Pig in local mode if you want to use it locally on your Linux operating system.

1. Download Apache Pig

$ wget http://www.eu.apache.org/dist/pig/pig-latest-version/pig-latest-version.tar.gz

2. Extract the Tarball

$ tar -zxvf pig-latest-version.tar.gz

3. Add the following to your previously created hadoop configuration bash script:

export PIG_HOME=/data/pig-latest-version
export JAVA_HOME=/usr/java/jdk1.7.0_05
export HADOOP_HOME=/data/hadoop

export PATH=$PIG_HOME/bin:$JAVA_HOME/bin:$HADOOP_HOME/bin:$PATH
export CLASSPATH=$JAVA_HOME:/data/hadoop/hadoop-core-1.0.1.jar:$PIG_HOME/pig-latest-version.jar

4. Run the configuration file

$ . hadoopconfiguration.sh

5. Now launch Pig,

$ pig

You will see a set of startup messages followed by the grunt shell, like this:

grunt>

Remember, this whole setup needs to be done on the master(s) of a multi-node cluster, or on a single-node cluster where the Hadoop framework is up and running.

Hadoop: multi-node cluster setup

In our previous post, we saw how to set up a single-node cluster. Here we look at setting up a multi-node cluster from that existing single-node setup. The multi-node setup follows very similar steps, with slight modifications: references to localhost need to be replaced by the appropriate hostnames, plus a few additional setup instructions.

From now on, we call the original single node the master and the other nodes slaves (one master, many slaves). To start with, we will add one datanode to the master.

Caveat:

  • The namenode (master) and datanodes (slaves) should run the same Linux distribution with the same user name.
  • The Hadoop framework installed on the master and slaves should be the same version.
  • The Hadoop framework should be installed at the same path (/user/nameofmasterorslave/Hadoop).
  • Java is required on the master and slaves, and the same version of Java is recommended (>= 6); I have never experimented with mixing Java versions in a Hadoop multi-node cluster.

The Core steps:

  • Ensure Hadoop is stopped on the master node (run bin/stop-all.sh)
  • Identify the hostname of every slave (in a terminal, $ hostname -f prints the hostname of any machine)
  • Update the Hadoop configuration files that reference localhost, replacing it with the result of “hostname -f” on the MASTER NODE (core-site.xml, mapred-site.xml, masters, slaves)
  • Copy the configuration files from the master machine to the slave machines

cd hadoop
scp conf/* <slave-hostname>:/data/hadoop-1.0.1/conf/

For example, on Amazon AWS: scp conf/* ip-10-239-40-120.ec2.internal:/data/hadoop/conf

  • Always clear the HDFS temp directory on the slave machine:

rm -rf /data/hdfstmp

  • On the master, edit the slaves file and add one more line containing the new slave’s hostname
  • Start all the processes

bin/start-all.sh

  • If start-all.sh pauses for confirmation the first time it runs, just answer ‘yes’
  • To view all the processes running on the master node, run jps on the master:

jps

  • To see the processes running on the slave nodes, run jps on each slave:

jps

Note: a multi-node cluster can have multiple secondary namenodes. While setting one up, change only core-site.xml and format the namenode.

Apache Hadoop — an efficient distributed computing framework

Apache Hadoop is an open-source distributed computing framework used mainly for ETL tasks (read: Extract, Transform and Load): storing and processing data at large scale on clusters of commodity hardware.

I agree that distributed computing has existed for more than a decade, but why has Hadoop gained so much publicity?

  • The entire distributed computing setup can be done on commodity hardware, which means you can set up Hadoop on the computer you are reading this blog on (I stand corrected if you are using your phone)
  • It is open source and, most importantly, Apache licensed. That means you are welcome to use it in industry, academia or research, and there is no clause that requires you to ask the developers for permission (in layman's terms)
  • The need to process large amounts of data has arrived, and traditional computing may not be enough
  • HBase's NoSQL architecture offers capabilities that Oracle and MySQL do not (explained later in another post)
  • Built and supported by nerds across the globe (just like you!)

Sometimes I wonder whether world peace can only be achieved with an Apache License-like philosophy! 😀

Here’s a question: what is virtually one complete file system, but stored physically across different nodes in a cluster?

A distributed file system.

In our context, the Hadoop Distributed File System (HDFS). It provides high throughput and was born from the Google File System and Google MapReduce papers.

Apache Hadoop is a distributed computing framework with HDFS at its base, and it uses the MapReduce paradigm to perform large-scale processing.

MapReduce is essentially splitting a giant task into many small chunks, sending them to the nodes of the cluster, doing the necessary processing and assembling the processed pieces back into one result. All of this happens on top of HDFS in the background. For a technical definition of MapReduce, refer to the books; roughly, map means split and reduce means combine, that's all. Of course it is not that simple, but try to look at it from this perspective; it paints a picture of what typically happens in the background.
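To make that picture concrete, here is a toy word count written with plain Scala collections. It is only an analogy for the map, shuffle and reduce phases, not actual Hadoop code; paste it into a Scala REPL:

val documents = Seq("the quick brown fox", "the lazy dog", "the fox")

// "map" phase: split each document and emit (word, 1) pairs
val mapped = documents.flatMap(_.split(" ")).map(word => (word, 1))

// "shuffle" phase: group the pairs by word
val shuffled = mapped.groupBy { case (word, _) => word }

// "reduce" phase: sum the counts within each group
val counts = shuffled.map { case (word, pairs) => word -> pairs.map(_._2).sum }

counts.foreach(println)   // (the,3), (fox,2), (quick,1), ...

Hadoop does the same thing, except the documents, the intermediate pairs and the results all live in HDFS and the phases run in parallel across the cluster.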

Now, this does not mean we have to operate Hadoop only at the terminal level. There are tools that can be used within the Hadoop framework:

HBase: A scalable, distributed database that supports structured data storage for large tables.
Hive: A data warehouse infrastructure that provides data summarization and ad hoc querying.
Mahout: A Scalable machine learning and data mining library.
Pig: A high-level data-flow language and execution framework for parallel computation.
Spark: A fast and general compute engine for Hadoop data. Spark provides a simple and expressive programming model that supports a wide range of applications, including ETL, machine learning, stream processing, and graph computation.
ZooKeeper: A high-performance coordination service for distributed applications.

We will look at each of the above in detail with a use case.

Where are we using Hadoop?

Currently, as far as I know, the Hadoop framework is used for large-scale machine learning and data mining, web crawling and text processing, processing huge amounts of relational/tabular data, and soft analytics.