Month: February 2014

Real-time analytics using distributed computing system Storm: Part II

How to set up a Storm cluster?
We will look at how to set up a single-node Storm cluster. The following are the prerequisites:

  1. Java 6 or above
  2. Python 2.6
  3. Zookeeper
  4. ZeroMQ
  5. JZMQ
  6. any other dependencies (unzip, git, etc…)

ZooKeeper: The Apache ZooKeeper project gives you a set of tools to build distributed applications that can safely handle partial failures in distributed computing.
ZeroMQ: ZeroMQ is a messaging library, which allows you to design a complex communication system without much effort. It is not a complete messaging system. Instead, it allows you to quickly build your own messaging system.
JZMQ: Java binding for ZeroMQ

 

Step 1: Configure Zookeeper

wget http://psg.mtu.edu/pub/apache/zookeeper/stable/zookeeper-3.4.5.tar.gz

tar -xzvf zookeeper-3.4.5.tar.gz

cd zookeeper/conf

Create a new cfg file (zoo.cfg) with the following information:

vim zoo.cfg

tickTime=2000
dataDir=/var/zookeeper
clientPort=2181

bin/zkServer.sh start

Note: if it fails with "failed to write pid", change dataDir to a location that does not require root permissions.
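
To verify that ZooKeeper came up, you can check the server status and send the ruok four-letter command (a healthy server replies imok):

bin/zkServer.sh status

echo ruok | nc localhost 2181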

Step 2a: Configure Messaging library

wget http://download.zeromq.org/zeromq-2.1.7.tar.gz

tar -xzvf zeromq-2.1.7.tar.gz

cd zeromq-2.1.7

./configure

make

sudo make install

 

Step 2b: JZMQ

git clone https://github.com/nathanmarz/jzmq.git

cd jzmq

./autogen.sh

./configure

make

sudo make install

Note: if git is not available then,

sudo apt-get update

sudo apt-get install git-core

Java and Python 2.6 should be installed and working, and JAVA_HOME should be set.

 

Step 3: Install Storm

wget https://dl.dropboxusercontent.com/s/dj86w8ojecgsam7/storm-0.9.0.1.zip

unzip storm-0.9.0.1.zip -d <working directory path>

cd into storm/conf and modify storm.yaml with the following. Note: we are setting up a single-node cluster, which is why localhost is referenced here.
storm.zookeeper.servers:
    - "localhost"
storm.zookeeper.port: 2181
nimbus.host: "localhost"
storm.local.dir: "/var/stormtmp"
java.library.path: "/usr/lib/jvm/java-1.7.0-openjdk-i386"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
worker.childopts: "-Xmx768m"
nimbus.childopts: "-Xmx512m"
supervisor.childopts: "-Xmx256m"
For a multi-node cluster you may have to fill in additional information about the master and the slaves, as sketched below.
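
For example, a sketch of those multi-node entries, assuming hypothetical hostnames zk1 and zk2 for the ZooKeeper ensemble and master1 for the Nimbus host (the same storm.yaml is then placed on every node):

storm.zookeeper.servers:
    - "zk1"
    - "zk2"
nimbus.host: "master1"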

and now,

bin/storm nimbus # start the master daemon
bin/storm supervisor # start the slave daemon
bin/storm ui # start the web UI, viewable in your browser

Use http://localhost:8080 to see the UI


Real-time analytics using distributed computing system Storm: Part I

We know that using data we can produce analytics and create reports that help us in decision-making, the usual story. The constraint is that, in order to create those reports, the data must already have been collected, cleansed and pre-processed. Only after spending a considerable amount of time on these tasks can we use a framework like Hadoop and its ecosystem tools to analyze big data. What if I told you that you can get your analytics in a matter of minutes?

This is called Real-time analytics. For example:

In financial engineering, we have something called algorithmic trading. Imagine a computer system embedded with an AI-based program that places trading orders from pre-programmed trading instructions based on timing, price, quantity, etc… without any human stock broker involvement.

In revenue management, there is dynamic pricing. Imagine a flight ticket that you view online keeps changing its price every few hours, based on demand for that route. That's near real-time analytics in action.

There are many such use cases where quicker availability of data helps make better decisions. Now there is a tool that performs real-time analytics in a distributed fashion, where the work is spread across multiple machines. I present to you the real-time analytics tool, "Storm".

What is Storm?

Storm is a distributed and fault-tolerant system for processing streams of real-time data. Just like Hadoop’s job tracker and task tracker, here the work is delegated to different components that are each responsible for a task.

A spout is a component that handles the input stream. A bolt is another component that either persists the data to some storage or performs some operation on the stream and passes the transformed data on to another bolt; there can be more than one bolt in a Storm cluster. The arrangement of spouts and bolts and their connections is called a topology (just like a MapReduce job in Hadoop). A minimal sketch follows below.
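
To make this concrete, here is a minimal sketch of a spout, a bolt and a topology against the Storm 0.9 Java API (backtype.storm packages); the class names, the word list and the field names are invented for illustration, and it runs in local mode for testing:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;

public class WordTopology {

    // Spout: handles the input stream (here it just cycles over a fixed word list)
    public static class WordSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private final String[] words = {"storm", "zookeeper", "zeromq"};
        private int index = 0;

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        public void nextTuple() {
            Utils.sleep(100);                        // avoid a busy loop in this toy example
            collector.emit(new Values(words[index]));
            index = (index + 1) % words.length;
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    // Bolt: transforms the stream (here it upper-cases each word and passes it on)
    public static class UpperCaseBolt extends BaseRichBolt {
        private OutputCollector collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple tuple) {
            collector.emit(new Values(tuple.getStringByField("word").toUpperCase()));
            collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("upper_word"));
        }
    }

    public static void main(String[] args) throws Exception {
        // Topology: wire the spout to the bolt
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-spout", new WordSpout(), 1);
        builder.setBolt("upper-bolt", new UpperCaseBolt(), 2).shuffleGrouping("word-spout");

        LocalCluster cluster = new LocalCluster();   // local mode, for testing only
        cluster.submitTopology("word-topology", new Config(), builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}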

Why use Storm?

Storm lowers the complexity of doing real-time processing. Computations are done in parallel using multiple threads, processes and servers. You can use any programming language on top of Storm. It is also fast, as the system is designed so that messages are processed quickly using ØMQ.

How does Storm work?

Just like a Hadoop cluster, there are two kinds of nodes: master nodes and slave nodes. The master runs a daemon process called Nimbus that distributes code around the cluster, assigns tasks to each worker node and monitors failures. As mentioned earlier, Storm is a fault-tolerant system: when a slave fails, Nimbus re-assigns the tasks of the failed slave as necessary. Slaves run a daemon process called the Supervisor, which executes a portion of the topology (the set of spouts and bolts and their connections).

Storm can run in local mode (for testing the topology) and remote mode (production). It can be set up on a single server or as a multi-server cluster of masters and worker nodes. The user designs a topology (a set of spouts and bolts). Basically, these spouts and bolts are just Java programs in which you specify which bolt/spout should perform which operation. You can also write spouts and bolts in Python or Ruby using the multilang protocol. This protocol uses a standard input/output communication channel to do the job of spouts/bolts; messages are encoded as either JSON or plain text over this channel.

How to develop real-time applications in Apache Storm?

At a high level, we have to:

0. Setup Storm cluster in your machine
1. Design the topology of spouts and bolts and the connections between them (just like nodes and arcs in graph theory)
2. Write a program for each bolt and spout instructing it what to do upon receiving the stream of data
3. Pack your code and dependencies into a single jar file and run it (see the example command after this list)
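
Step 3 is typically done with the bin/storm jar command; the jar path and topology class here are hypothetical:

bin/storm jar target/my-topology-0.1.jar com.example.WordTopology word-topology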

Graph Database Neo4j: Part II

Why use Graph Database at all?

How do I know the person XXX? How do I go to YY from XX? Where should I have lunch? Where should I shop? These are everyday questions that can be answered using a graph database like Neo4j. Powerful recommender engines sit behind Facebook and Google, recommending items so perfectly. To build such recommender engines, or social routing, we can use Neo4j. To highlight some key domains:

1. Logistics Industry: where short-term planning problems are solved frequently (using Dijkstra's algorithm, the Travelling Salesman Problem; read more on network theory)
2. Energy Industry: where electric circuits are designed and optimized to minimize wastage (read more on Kirchhoff's laws)
3. Financial Engineering: where Markov chains and Bayesian networks are used to solve problems (actually, any domain that uses stochastic modeling techniques)
4. Social Network analysis: understanding local and global patterns of social networks and examine network dynamics of individuals who share similar interests.
5. Machine Learning: Building a recommender engine by collecting web analytics data and finding interference patterns of products, user preferences, services, etc…

Data Model Architecture of Neo4j:

Neo4j’s data model is a property graph.

A property graph consists of labeled nodes and relationships, each with properties. Nodes, as we saw earlier, are just data records, usually used to represent an entity, and they also contain their relationships to other nodes. Relationships connect two nodes. They are also explicit data records in the graph database. Think of them as containing shared information between two entities (direction, type, properties) and representing the connection to both nodes. Properties are simple key:value pairs. There is no schema, just structure.

So how are we supposed to use graph databases?

We can query graph databases using a graph query language (GQL). Cypher is a declarative, SQL-inspired language for describing patterns in graphs. It allows us to describe what we want to select, insert, update or delete from a graph database without specifying how to do it. For example, if we want to write a query to understand the relationship between two nodes, it will be something like:

MATCH (node1)-[relationship:TYPE]->(node2)
RETURN relationship.property
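
As a slightly fuller sketch (the Person label, the KNOWS relationship type and the name/since properties are invented for illustration), we could create two connected nodes and then ask Cypher for the relationship's property:

CREATE (a:Person {name:"Alice"})-[:KNOWS {since:2010}]->(b:Person {name:"Bob"})

MATCH (a:Person {name:"Alice"})-[r:KNOWS]->(b:Person)
RETURN b.name, r.since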

Installing Neo4j on your Linux machine: installing on Windows should be self-explanatory; just download the executable file, click 'OK' when asked for permission to install and follow the on-screen procedure. Let's look at Linux:

I always prefer installing from the terminal over RPMs and make procedures. It is quite simple: import the vendor's signing key, create an apt sources list file, do an update and install the Neo4j community edition.

wget -O - http://debian.neo4j.org/neotechnology.gpg.key | apt-key add -
echo 'deb http://debian.neo4j.org/repo stable/' > /etc/apt/sources.list.d/neo4j.list
apt-get update
apt-get install neo4j

Caveat: you need Oracle Java 7 installed, not OpenJDK; Neo4j doesn't seem to support OpenJDK.

cd /etc/neo4j/
sudo pico neo4j.properties
uncomment the following lines:
1. #node_auto_indexing=true
2. #node_keys_indexable=name,age
save and exit. Now,
sudo /etc/init.d/neo4j-service restart
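
To check that the server came back up, you can hit the REST endpoint (Neo4j listens on port 7474 by default) or simply open http://localhost:7474 in a browser:

curl http://localhost:7474/db/data/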

Graph Database Neo4j: Part I

What are graph databases? A little refresher on graph theory (refer to the Wiley book on graphs):

A Graph Database stores data in a Graph, the most generic of data structures, capable of elegantly representing any kind of data in a highly accessible way. The records in a graph database are called Nodes. Nodes are connected through typed, directed arcs, called Relationships. Each node and relationship can have named attributes referred to as Properties. A Label is a name that organizes nodes into groups.

In Neo4j, a node can be simply represented as (a), or () for an empty node. It is not mandatory that nodes be connected. The real advantage comes from building relationships between nodes (arcs in graph theory).

A relationship can be represented as (a)-->(b) or (a)-->()

We can query the graph database for matching patterns by describing relationships. For example, if we want to know whom node ‘x’ has relationships with, we can query (x)-[r]->()

If node ‘x’ has many types of relationships, like worked at this company, acted in this movie, etc…, we can retrieve a specific type in our query by asking (a)-[:WORKED_AT]->(c)

We can also query with labels. Think of a label as a node’s property: if multiple nodes have the same property, we can group them. For example, if we have many nodes with labels like man, woman, American, British, then we can group them as American women, British women, etc…

For example,

(a:man {citizen:"American"}) matches the list of American men in our database

(a:woman {citizen:"British"})-[:WORKED_AT]->(c:`ORC Solutions`)

matches the list of British women who worked at ORC Solutions Pvt Ltd (just an example; a label containing a space has to be quoted with backticks)
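
Put into a complete query (same made-up label and relationship type as above):

MATCH (a:woman {citizen:"British"})-[:WORKED_AT]->(c:`ORC Solutions`)
RETURN a, c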

So What is Neo4j really?

Neo4j is one of the NoSQL databases whose data model is a graph, specifically a property graph. Cypher is Neo4j's graph query language (SQL for graphs, or GQL). It is a declarative query language in which we describe what we are interested in, not how to acquire it. It is supposedly very readable and expressive.

NoSQL Architecture: Part IV

In our last post, we looked at column store and document store categories. In this post we will see the remaining two, key-value store and graph store.

Key-value store: similar to a document store in that no schema is defined; however, unlike a document store, which can create a key when a new document is inserted, a key-value store requires the key to be specified, and the key must be known in order to retrieve the value. Please note that querying is not possible; you will not get SQL or SQL-like support. There are no relationships between entities.

Confusing?

Think of the dictionary data structure in Python.

Key-value stores are mainly implemented as in-memory distributed/cache storage. Hence, the advantage is fairly straightforward: quick retrieval of data. Applications that require real-time responses would want to use an in-memory database, for example an application controlling an aircraft, where quick response times are crucial.

Examples are Redis, MemcacheDB (built on Memcached) and Berkeley DB.
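
As a tiny illustration of the key-value model with Redis (the key and value here are made up), you store and fetch a value purely by its key:

redis-cli SET student:12001 "Bruce Wayne"
redis-cli GET student:12001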

Graph store: graph databases are a special case of NoSQL. A graph database uses a graph structure to denote the relationships between entities. There can be multiple links between two entities in a graph, reflecting the multiple relationships that the two nodes share.

The relationships represented may include social relationships between people, transport links between places, or network topologies between connected systems. The advantage that graph databases have is easy representation, retrieval and manipulation of relationships between the entities in the system. Use it only if there are relationships between the data.

An example of such a database is Neo4j.

There are some databases available that support several of the categories we have seen so far. One such beauty is OrientDB, which supports document store, key-value store (quick look-ups are possible thanks to its MVRB-tree indexing algorithm) and graph store. It is written completely in Java and hence runs on Windows, Linux or any other operating system that has Java. It provides SQL support so the user can query it. No dependencies are needed and it is very light. It has a 'running from local' mode as well. And yes, it is Apache licensed, so it is free to use.

NoSQL Architecture: Part III

There are different categories of NoSQL databases, based on how the data is stored. NoSQL products are optimized for insertion and retrieval operations, because these usually happen at large scale; to deliver that performance, most NoSQL products scale horizontally (as far as I know).

There are four major storage types available in NoSQL paradigm:
1. Column-oriented
2. Document Store
3. Key Value Store
4. Graph

Column-oriented: data is stored as columns. A typical RDBMS stores data as rows. You may argue that a relational database displays data in a two-dimensional table with rows and columns. The difference is that when you query an RDBMS it processes one row at a time, whereas a column-oriented database has the data stored as columns. An example will illustrate the concept:

Imagine the following data needs to be stored. Let's compare RDBMS and NoSQL here:

Original Data

StudentID StudentName Mark1 Mark2 Mark3
12001 Bruce Wayne 55 65 75
12002 Peter Parker 66 77 88
12003 Charles Xavier 44 33 22

Data in an RDBMS will be stored in the following way:
12001,Bruce Wayne,55,65,75
12002,Peter Parker,66,77,88
12003,Charles Xavier,44,33,22

Data in a column-oriented NoSQL store will be stored in the following way:
12001,12002,12003
Bruce Wayne, Peter Parker, Charles Xavier
55,66,44
65,77,33
75,88,22

Note: this is a very trivial, simple example just to demonstrate a point. One cannot take it at face value and argue that insertion will be much more difficult in NoSQL. Whether it is an RDBMS or NoSQL, real systems are far more sophisticated and optimized for handling data. We are just looking at things at a high level.

The advantage of the column-based approach is that it is computationally faster than an RDBMS for such workloads. Imagine you would like to find the average, maximum or minimum mark for a given subject: you don't have to go through each and every row. Instead, you just look at the respective column to determine the value. Also, when you query the database, it does not have to scan each row for matching conditions; only the columns referenced by the query are touched, and voila, faster processing. Read this assuming you have a billion records in your database and need to query all of them at once just to retrieve a few hundred of them.

Examples are HBase, Cassandra, Google Bigtable, etc… Oracle has also introduced this feature quite recently.

Document store: in the previous category we looked at structured data, students' records to be precise. Now we are looking at how to store semi-structured data. When we use the Facebook API to extract posts from a given group, we get them in JSON format, like this:

{ "technology": "big data", "message" ":"Way of Internet of Things to Internet of Everything","permalink":"http://www.facebook.com/comsoc/posts/10151526753216486","actor_id":130571316485}
{ "technology": "big data", "message" ":"Challenges of Big Data Analysis","permalink":"http://www.facebook.com/comsoc/posts/10151494314921486","actor_id":130571316485}
{ "technology": "big data", "message" ":"Big Data'nin hayatimiza etkisi","permalink":"http://www.facebook.com/comsoc/posts/10151490942041486","actor_id":130571316485}
{ "technology": "big data", "message" ":"Etkinligimiz hazir!! Peki Ya Siz??","permalink":"http://www.facebook.com/comsoc/posts/10151325074526486","actor_id":130571316485}
{ "technology": "big data", "message" ":"30 Nisan'da 'girisimci melekler' Istanbul'u kusatiyor. Siz nerdesiniz?","permalink":"http://www.facebook.com/comsoc/posts/10151318889096486","actor_id":130571316485}

Or even imagine something like this:

{ "StudentID": "12001", "StudentName":"Bruce Wayne", "Location" : "Gotham city" }

Another record is like this:

{ "StudentID": "12002", "StudentName":"James Tiberius Kirk", "Address" :
{"Street": "2nd street","City": "Edge city", "State": "New York"} }

Imagine records/documents that do not follow a constant schema. An RDBMS cannot process this. We need something better, where such semi-structured data can be indexed and queried: the document store category of NoSQL. (About the indexing part: the document ID could be the URL from which the data was crawled, or the timestamp when the data was crawled. It is even okay if these records have no document ID.)

This category of database, with room for changing schemas or schemaless documents, provides flexibility, hence its popularity. An ideal fit would be any web-based application where the content has a varying schema, or cases where the data is available in JSON or XML format.

Examples are Redis (In-memory), MongoDB, CouchDB, Lotus Notes, etc…

More on the remaining categories in future posts.

NoSQL Architecture: Part II

NoSQL can be simply understood as data storage that is not just SQL but a bit more than that. We know that relational databases have been used to store structured data. The data is sub-divided into groups, called tables, which store well-defined units of data in terms of type, size and other constraints. Each unit of data is known as a column, while each unit of the group is known as a row.

Why are we talking about NoSQL?

We know that a traditional RDBMS has limitations in terms of scalability and parallelization. Companies like eBay, Google and Yahoo receive billions of requests each day or week. For example, when you search on Google, use Gmail, GTalk or g-whatever, serving those requests is the work of multiple systems, even though we cannot see how many systems are processing them. A traditional RDBMS cannot be used to process these requests; such companies need something more robust, parallelized and scalable. So Google came up with:
1. GFS: Distributed filesystem (google file system)
2. Chubby: Distributed coordination system
3. MapReduce: Parallel processing system
4. Big Table: Column oriented database

Based on the idea of the above products, many NoSQL products were born like HBase, MongoDB, etc…

Traditional databases have constraints that are perceived as features: a transaction cannot leave the database in an inconsistent state, one transaction cannot interfere with another, etc… These qualities seem ideal in the context of structured data, but if we are talking about web scale then performance will be compromised. Imagine I am looking at a book on eBay/Amazon, decide to buy it and proceed to payment: the system would lock a part of the database, specifically the inventory, and every other person in the world would have to wait to even access that book until I complete my transaction (it does not actually work that way; the point is what locking a web page for a secure transaction would mean). This would be very counterproductive, and hence NoSQL gained momentum.

Why NoSQL at all?
1. Almost all NoSQL products offer schemaless data representation. This means that you don’t have to think too far ahead to define a structure.
2. Even with the small amount of data that you have, if you can deliver in milliseconds rather than hundreds of milliseconds—especially over mobile/hand held devices and other intermittently connected devices—you have much higher probability of winning users over.
3. NoSQL is an elastic product, not a brittle one. It can handle sudden increases in load and it is also scalable; imagine billions of rows and columns with no need to delete anything, as NoSQL can keep timestamped versions.
4. Open source, Apache Licensed and online support from experienced users, developers, committers of the project and articles from bloggers like me!

More on NoSQL in future posts.

Market Basket Analysis with Mahout

Also known as Affinity Analysis/Frequent Pattern Mining.
Finding patterns in huge amounts of customer transaction data is called market basket analysis. It is useful wherever a store's transaction data is readily available. Using market basket analysis, one can find purchasing patterns. Market basket analysis is also called association rule mining (actually it's the other way around: MBA is an application of association rule mining), affinity analysis or frequent pattern mining. This technique is behind all the customer promotional offers, like buy 1 get 1 free, discounts, complimentary products, etc…, that we see in departmental stores and supermarket chains.

MBA is one way of recommending products to the customer. If we have customer transaction data, i.e. data showing the items bought by each customer (the receipt we get when buying products), and imagine we have a million transaction records like this, then we can find buying patterns. For example, let's assume people in a certain locality eat chips while drinking beer. Then whoever comes into the supermarket with the sole purpose of buying beer is quite likely to also pick up a packet of chips (crisps if you are from the UK); now this is a pattern we know from local knowledge.

But what about other patterns that we don't know or don't speak about? MBA helps us find such patterns. Why do we need such pattern information? We can use it for purchase planning and for introducing new products (not based on MBA results alone, but also with the help of statistical hypothesis testing/inference).

Let's perform market basket analysis using Apache Mahout. You should have Apache Mahout and Hadoop installed, up and running.

For the input dataset, I found anonymized Belgian supermarket transaction data, available courtesy of Dr Tom Brijs from Hasselt University, Belgium. Please visit http://fimi.ua.ac.be/data/retail.pdf and http://www.luc.ac.be/~brijs/. We will use retail.dat as the input dataset. Each record in the data set contains information about the date of purchase, the receipt number, the article number, the number of items purchased, the article price in Belgian Francs and the customer number.

The data were collected over three non-consecutive periods. The first period runs from mid-December 1999 to mid-January 2000. The second period runs from 2000 to the beginning of June 2000. The third and final period runs from the end of August 2000 to the end of November 2000. In between these periods, no data is available, unfortunately. This results in approximately 5 months of data. The total number of receipts collected equals 88,163, and 5,133 customers purchased at least one product in the supermarket during the data collection period.

The following steps are required to perform the market basket analysis with Apache Mahout:

1. Copy the data from local disk to HDFS:
hadoop fs -copyFromLocal /home/<path to your file>/retail.dat retail.dat

2. Execute Mahout’s FPG procedure
mahout fpg -i retail.dat -o patternone -method mapreduce

Note: the number of top items with the most frequent patterns defaults to 50, meaning the output will give you the top 50 items.
You can also run this procedure in sequential mode; just make sure you have the data file in the directory where you execute it.

3. Check the output after processing. It will be in the folder ‘patternone’ in your HDFS.

hadoop fs -ls /user/hduser/patternone

4. The output is written as a sequence file under the frequentpatterns folder. Use the sequence dumper utility to extract it:
mahout seqdumper -i hdfs://localhost:54310/user/hduser/patternone/frequentpatterns/part-r-00000

5. Learn to interpret the output. It will likely be in the following key-value form:

FPM Output

The item is called the key here. ([99],32) means that item 99 appears in 32 transactions. ([142, 99],22) means that items 142 and 99 appear together in 22 transactions. We don't know what those items actually are, but in real-life situations items will be indexed against their names, so you can get output with the name of each item.

For example, item 99 could be beer; the 32 means beer was bought in 32 transactions. If item 142 were chips, then 142 and 99 appearing together in 22 transactions would mean people bought beer and chips together in those 22 instances. Maybe the remaining people already have chips at home :).

This data can be used for further analysis to determine the nature of promotion that could be offered to customers.