A single node Hadoop + Cassandra + Pig setup

In our current project, we decided to store all operational logs in a NoSQL database. The total volume is about 97 TB per year. Cassandra was our main candidate for the NoSQL database. We also have to analyze and monitor our data, which is where Hadoop and Pig come in. Within two days our team was able to develop a simple pilot project to demonstrate the power of Hadoop, Cassandra, and Pig together.

For the pilot project we used the DataStax Enterprise edition. This out-of-the-box product helped us quickly install the Hadoop and Cassandra stack and develop our pilot project. This time we decided to set up Hadoop, Cassandra, and Pig ourselves. It is my first attempt to install Cassandra over Hadoop and Pig. All of these products have been around for a few years, but I haven't found any step-by-step tutorial for setting up a single-node cluster with Hadoop + Cassandra + Pig.

First we are going to install Hadoop and Cassandra; then we will try to run a pig_cassandra map-only job over a Cassandra column family, which will save the result on the Hadoop HDFS file system.

Setup Hadoop

Step 1.

Download Hadoop from the following link: http://www.sai.msu.su/apache/hadoop/core/stable/ and then unarchive the file:

tar -xvf hadoop-0.20.2.tar.gz
rm hadoop-0.20.2.tar.gz
cd hadoop-0.20.2

Step 2.

Edit the conf/core-site.xml file (the path is relative to the hadoop-0.20.2 directory). I have used localhost in the value of fs.default.name.
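
As a minimal sketch, a single-node core-site.xml could be written with the commands below. Only localhost comes from the setup described here; the port 9000 and the CONF_DIR variable are assumptions for illustration. Note that the command overwrites an existing core-site.xml.

```shell
# Sketch only: writes a minimal core-site.xml for a single-node setup.
# CONF_DIR and port 9000 are assumptions; adjust them to your install.
CONF_DIR="${CONF_DIR:-conf}"
mkdir -p "$CONF_DIR"
cat > "$CONF_DIR/core-site.xml" <<'EOF'
<?xml version="1.0"?>
<configuration>
  <property>
    <!-- URI of the default file system (the NameNode) -->
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
EOF
```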


Step 3.

Edit the conf/mapred-site.xml file.
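
A typical single-node mapred-site.xml for Hadoop 0.20 only needs the JobTracker address. The sketch below is an assumption, not the exact file used in this setup: the value localhost:9001 and the CONF_DIR variable are illustrative, and the command overwrites an existing mapred-site.xml.

```shell
# Sketch only: writes a minimal mapred-site.xml for a single-node setup.
# CONF_DIR and the address localhost:9001 are assumptions.
CONF_DIR="${CONF_DIR:-conf}"
mkdir -p "$CONF_DIR"
cat > "$CONF_DIR/mapred-site.xml" <<'EOF'
<?xml version="1.0"?>
<configuration>
  <property>
    <!-- host:port the JobTracker listens on -->
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
</configuration>
EOF
```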


Step 4.

Edit the conf/hdfs-site.xml file. Since this test cluster has a single node, the replication factor (dfs.replication) should be set to 1.
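
A replication factor of 1 can be expressed as in the following sketch. The CONF_DIR variable is an assumption for illustration, and the command overwrites an existing hdfs-site.xml.

```shell
# Sketch only: writes a minimal hdfs-site.xml with a replication factor of 1.
# CONF_DIR is an assumption; point it at your Hadoop conf directory.
CONF_DIR="${CONF_DIR:-conf}"
mkdir -p "$CONF_DIR"
cat > "$CONF_DIR/hdfs-site.xml" <<'EOF'
<?xml version="1.0"?>
<configuration>
  <property>
    <!-- one copy of each block, because there is only one DataNode -->
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
EOF
```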


Step 5.

Set your JAVA_HOME variable in the conf/hadoop-env.sh file. If you already have JAVA_HOME set in your .bash_profile, this step is redundant.

Step 6.

Format the name node (once per install).

$ bin/hadoop namenode -format
It should print out the following message:
12/07/15 15:54:20 INFO namenode.NameNode: STARTUP_MSG: 
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = Shamim-2.local/
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 0.20.2
STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20 -r 911707; compiled by 'chrisdo' on Fri Feb 19 08:07:34 UTC 2010
12/07/15 15:54:21 INFO namenode.FSNamesystem: fsOwner=samim,staff,com.apple.sharepoint.group.1,everyone,_appstore,localaccounts,_appserverusr,admin,_appserveradm,_lpadmin,_lpoperator,_developer,com.apple.access_screensharing
12/07/15 15:54:21 INFO namenode.FSNamesystem: supergroup=supergroup
12/07/15 15:54:21 INFO namenode.FSNamesystem: isPermissionEnabled=true
12/07/15 15:54:21 INFO common.Storage: Image file of size 95 saved in 0 seconds.
12/07/15 15:54:21 INFO common.Storage: Storage directory /tmp/hadoop-samim/dfs/name has been successfully formatted.
12/07/15 15:54:21 INFO namenode.NameNode: SHUTDOWN_MSG: 
SHUTDOWN_MSG: Shutting down NameNode at Shamim-2.local/

Step 6.1.

Set up passphraseless ssh. Check that you can log in to localhost without a passphrase:

ssh localhost

If you can't, then first enable your ssh server:

System Preferences -> Sharing -> check the box for Remote Login; you can also allow access for all users. Next, execute the following commands.

$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

Step 7.

Start all Hadoop components.

$ bin/hadoop-daemon.sh start namenode
$ bin/hadoop-daemon.sh start jobtracker
$ bin/hadoop-daemon.sh start datanode
$ bin/hadoop-daemon.sh start tasktracker
$ bin/hadoop-daemon.sh start secondarynamenode
starting namenode, logging to /Users/samim/Development/NoSQL/hadoop/core/hadoop-0.20.2/bin/../logs/hadoop-samim-namenode-Shamim-2.local.out
starting jobtracker, logging to /Users/samim/Development/NoSQL/hadoop/core/hadoop-0.20.2/bin/../logs/hadoop-samim-jobtracker-Shamim-2.local.out
starting datanode, logging to /Users/samim/Development/NoSQL/hadoop/core/hadoop-0.20.2/bin/../logs/hadoop-samim-datanode-Shamim-2.local.out
You can check the log files to make sure that everything went well.

Step 8.

Verify that the NameNode and DataNodes are communicating through the web interface (http://localhost:50070/dfshealth.jsp).

Check the page and confirm that you have at least one live node.

Step 9.

Verify that the JobTracker and TaskTrackers are communicating by looking at the JobTracker web interface and confirming one node listed in the Nodes column: http://localhost:50030/jobtracker.jsp

Step 10.

Use the hadoop command-line tool to test the file system:

$ hadoop dfs -ls /
$ hadoop dfs -mkdir /test_dir
$ echo "A few words to test" > /tmp/myfile
$ hadoop dfs -copyFromLocal /tmp/myfile /test_dir
$ hadoop dfs -cat /test_dir/myfile
A few words to test

Setup Cassandra

Step 1.

Download the source code for Cassandra version 1.1.2 from the following link: http://www.apache.org/dyn/closer.cgi?path=/cassandra/1.1.2/apache-cassandra-1.1.2-src.tar.gz. I assume that you know how to build Cassandra from the source code; otherwise, you will find plenty of information through Google on how to do it.

Step 2.

Edit the CASSANDRA_HOME/conf/cassandra.yaml file to set listen_address and rpc_address to localhost.
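
If you prefer to script this edit, the following is a minimal sketch. The helper name set_cassandra_addresses is hypothetical, and it assumes that both settings appear uncommented at the start of a line in cassandra.yaml.

```shell
# Sketch only: rewrite listen_address and rpc_address to localhost.
# set_cassandra_addresses is a hypothetical helper, not part of Cassandra.
set_cassandra_addresses() {
  yaml_file="$1"
  # Write to a temporary file first, so the edit also works with a sed
  # that has no in-place option.
  sed -e 's/^listen_address:.*/listen_address: localhost/' \
      -e 's/^rpc_address:.*/rpc_address: localhost/' \
      "$yaml_file" > "$yaml_file.tmp" && mv "$yaml_file.tmp" "$yaml_file"
}

# Apply it only if the file exists (CASSANDRA_HOME is assumed to be set).
yaml="${CASSANDRA_HOME:-.}/conf/cassandra.yaml"
if [ -f "$yaml" ]; then
  set_cassandra_addresses "$yaml"
fi
```

All other lines of the file are left untouched, so it is safe to run the helper more than once.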

Step 3.

Start Cassandra with the following command:

cassandra/bin$ ./cassandra

Step 4.

Check the cluster through the nodetool utility.

cassandra/bin$ ./nodetool -h localhost ring
Note: Ownership information does not include topology, please specify a keyspace. 
Address         DC          Rack        Status State   Load            Owns                Token
                datacenter1 rack1       Up     Normal  55.17 KB        100.00%             96217188464178957452903952331500076192
The Cassandra cluster is up; now we are going to configure Pig.

Setup Pig

Step 1.

Download Pig from the Apache site: http://www.sai.msu.su/apache/pig/. Unarchive the file with the following commands.

tar -xvf pig-0.8.0.tar.gz
rm pig-0.8.0.tar.gz

At this point we will try to run the pig_cassandra example, which you can find in the source distribution.

First of all, it's better to read the README.txt file in the folder apache-cassandra-1.1.2-src/examples/pig/. Set all the environment variables described in the README.txt file as follows:

export PIG_INITIAL_ADDRESS=localhost
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner

Also, if you would like to run using the Hadoop backend, you have to set PIG_CONF_DIR to the location of your Hadoop config. In my case:

export PIG_CONF_DIR=hadoop/core/hadoop-0.20.2/conf

At this stage you can start the grunt shell to run MapReduce tasks:

examples/pig$ bin/pig_cassandra -x local 

It should open a grunt shell prompt, but I got the following ClassNotFoundException: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.RunningJob. As a quick fix, I decided to edit the pig_cassandra file as follows:

export HADOOP_CLASSPATH="/Users/xyz/hadoop/core/hadoop-0.20.2/hadoop-0.20.2-core.jar"

Once I got the grunt shell running, I created a keyspace and one column family in the Cassandra cluster and inserted some values through cassandra-cli.

[default@unknown] create keyspace Keyspace1;
[default@unknown] use Keyspace1;
[default@Keyspace1] create column family Users with comparator=UTF8Type and default_validation_class=UTF8Type and key_validation_class=UTF8Type;
[default@Keyspace1] set Users[jsmith][first] = 'John';
[default@Keyspace1] set Users[jsmith][last] = 'Smith';
[default@Keyspace1] set Users[jsmith][age] = long(42);

Next, I executed the following Pig query in the grunt shell:

grunt> rows = LOAD 'cassandra://Keyspace1/Users' USING CassandraStorage() AS (key, columns: bag {T: tuple(name, value)});
grunt> cols = FOREACH rows GENERATE flatten(columns);
grunt> colnames = FOREACH cols GENERATE $0;
grunt> namegroups = GROUP colnames BY (chararray) $0;
grunt> namecounts = FOREACH namegroups GENERATE COUNT($1), group;
grunt> orderednames = ORDER namecounts BY $0;
grunt> topnames = LIMIT orderednames 50;
grunt> dump topnames;
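
In short, the query flattens every row into its columns, counts how often each column name occurs across all rows, orders the names by that count, and keeps at most 50 of them. The same logic can be sketched on plain text with standard shell tools. This is only an illustration of what the query computes, not how Pig executes it; the function name count_column_names and the jdoe rows are hypothetical, and only the jsmith columns come from the cassandra-cli session above.

```shell
# Sketch only: the counting logic of the Pig query on plain text input.
# Each input line has the form "<row key> <column name>".
# awk  : GROUP colnames BY name and COUNT each group
# sort : ORDER namecounts BY count (ties broken by name, which Pig does not promise)
# head : LIMIT orderednames 50
count_column_names() {
  awk '{ counts[$2]++ } END { for (name in counts) print counts[name], name }' |
    sort -k1,1n -k2,2 |
    head -n 50
}

# jsmith is the row inserted earlier; jdoe is hypothetical sample data.
count_column_names <<'EOF'
jsmith first
jsmith last
jsmith age
jdoe first
jdoe last
EOF
```

For this sample input the function prints "1 age", "2 first" and "2 last", one per line.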

Pig executed the above script; here are the statistics:

2012-07-15 17:29:35,878 [main] INFO  org.apache.pig.tools.pigstats.PigStats - Detected Local mode. Stats reported below may be incomplete
2012-07-15 17:29:35,881 [main] INFO  org.apache.pig.tools.pigstats.PigStats - Script Statistics: 
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
0.20.2 0.8.3 samim 2012-07-15 17:29:14 2012-07-15 17:29:35 GROUP_BY,ORDER_BY,LIMIT
Job Stats (time in seconds):
JobId Alias Feature Outputs
job_local_0001 colnames,cols,namecounts,namegroups,rows GROUP_BY,COMBINER 
job_local_0002 orderednames SAMPLER 
job_local_0003 orderednames ORDER_BY,COMBINER file:/tmp/temp-833597378/tmp-220576755,
Successfully read records from: "cassandra://Keyspace1/Users"
Successfully stored records in: "file:/tmp/temp-833597378/tmp-220576755"
Job DAG:
job_local_0001 -> job_local_0002,
job_local_0002 -> job_local_0003,
2012-07-15 17:29:35,881 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2012-07-15 17:29:35,886 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2012-07-15 17:29:35,887 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2012-07-15 17:29:35,888 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2012-07-15 17:29:35,904 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2012-07-15 17:29:35,907 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2012-07-15 17:29:35,907 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1

You should find the output file in the tmp directory reported in the statistics above; in my case it is file:/tmp/temp-833597378/tmp-220576755.


If you would like to run example-script.pig, you have to create a keyspace named MyKeySpace and a column family according to the Pig script. I just edited example-script.pig to use the newly created Keyspace1 keyspace and the Users column family. Then you can run the script like this:

examples/pig$ bin/pig_cassandra example-script.pig

If you want to run Pig in local mode, add the -x local option. For example:

pig_cassandra -x local example-script.pig

Without the -x local option, Pig will run the script in Hadoop mode. See here for more information. Thanks to Nabanita for pointing this out.

My next step is to set up a Cassandra cluster with 4 nodes over Hadoop and run a MapReduce task across all the cluster nodes.


