Kalanand's April 2014 Log

April 1st

Quickly generating prime numbers

Today I learned an easy way to generate prime numbers using Python:
# Suppose we want a list of prime numbers below N = 100
# First create a list of non-prime numbers below N. It is enough to collect
# the multiples of every integer up to and including sqrt(N).

import math
N = 100
noprimes = [j for i in range(2, int(math.sqrt(N)) + 1) for j in range(i*2, N, i)]

# Now get the "inverse" of the above list, which are prime numbers. 

primes = [x for x in range(2, N) if x not in noprimes]
print(primes)
# stdout: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]
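The list-based version above gets slow for large N, because every "x not in noprimes" test scans a long list. A minimal sketch of the classic sieve of Eratosthenes does the same job with a boolean table (the function name primes_below is my own, not from any library):

```python
def primes_below(n):
    """Return all primes strictly less than n using the sieve of Eratosthenes."""
    if n < 3:
        return []
    # is_prime[k] stays True until k is crossed out as a multiple of a smaller prime
    is_prime = [True] * n
    is_prime[0] = is_prime[1] = False
    i = 2
    while i * i < n:
        if is_prime[i]:
            # Smaller multiples of i were already crossed out by smaller primes
            for j in range(i * i, n, i):
                is_prime[j] = False
        i += 1
    return [k for k, flag in enumerate(is_prime) if flag]


print(primes_below(100))
```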
On a related note, some useful math functions in Python are
 math.pi             # constant pi = 3.141592...
 math.e              # constant e = 2.718281...
 math.frexp(x)       # Return (m,e) such that x == m * 2**e 
 math.fsum(iterable) # Accurate floating point sum 
 math.isinf(x)       # Check if x is infinity
 math.isnan(x)       # Check if x is a NaN (not a number).
 math.ldexp(x, i)    # Return x * (2**i). 
 math.modf(x)        # Return the fractional and integer parts of x. 
 math.trunc(x)       # Return the value x truncated to an Integral

 math.log(x[, base]) # By default returns the natural logarithm (to base e).
 math.pow(x, y)
 math.hypot(x, y)    # Return the Euclidean norm, sqrt(x*x + y*y). 

 math.sin(x), math.cos(x), math.tan(x)       # trigonometric functions
 math.asin(x), math.acos(x), math.atan(x)    # inverse trigonometric 
 math.sinh(x), math.cosh(x), math.tanh(x)    # hyperbolic 
 math.asinh(x), math.acosh(x), math.atanh(x) # inverse hyperbolic

 math.atan2(y, x)    # Return atan(y/x), in radians. Between -pi and pi. 
 math.degrees(x)     # Converts angle x from radians to degrees.
 math.radians(x)     # Converts angle x from degrees to radians.

 math.erf(x)         # error function
 math.erfc(x)        # complementary error function
 math.gamma(x)       # Gamma function
 math.lgamma(x)      # natural log of the absolute value of the Gamma function
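A few of these are easier to remember after seeing them run. A small sketch (the variable names are just for illustration):

```python
import math

# fsum tracks partial sums exactly, so it avoids the rounding drift of sum()
naive_total = sum([0.1] * 10)
exact_total = math.fsum([0.1] * 10)

# frexp and ldexp are inverses: x == m * 2**e
mantissa, exponent = math.frexp(8.0)
rebuilt = math.ldexp(mantissa, exponent)

# modf splits a float into (fractional part, integer part)
frac_part, int_part = math.modf(3.25)

# hypot gives the length of the vector (3, 4)
length = math.hypot(3.0, 4.0)

# atan2 picks the correct quadrant from the signs of y and x
angle_deg = math.degrees(math.atan2(1.0, -1.0))

print(naive_total, exact_total)
print(mantissa, exponent, rebuilt)
print(frac_part, int_part, length, angle_deg)
```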
Also, complex numbers are a built-in type in Python. Some useful functions that can be called on complex numbers:
 abs(x)              # absolute value, r (built-in; cmath has no abs function)
 cmath.phase(x)      # phase, phi
 cmath.polar(x)      # Returns (r, phi)
 cmath.rect(r, phi)  # Returns the complex number r * (cos(phi) + i*sin(phi))

 cmath.log(x[, base])

 ### The trigonometric and hyperbolic functions work exactly like the above.
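A quick round trip between rectangular and polar form, as a sketch (the value of z is arbitrary):

```python
import cmath

z = 3 + 4j

r = abs(z)                  # modulus, via the built-in abs()
phi = cmath.phase(z)        # argument in radians, between -pi and pi
r2, phi2 = cmath.polar(z)   # both at once
back = cmath.rect(r2, phi2) # back to a complex number

print(r, phi)
print(back)
```

Note that the round trip is only accurate up to floating-point error, so compare with a tolerance rather than ==.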

April 2nd

Python: list comprehension vs map lambda

Turns out that I can do a lot of the same things using either a list comprehension or a map/lambda construction:
words = 'I am going home'.split()
print(words)    # stdout: ['I', 'am', 'going', 'home']

# First, I try list comprehension
stuff = [[w.upper(), w.lower(), len(w)] for w in words]
for i in stuff: print(i)
# stdout:
# ['I', 'i', 1]
# ['AM', 'am', 2]
# ['GOING', 'going', 5]
# ['HOME', 'home', 4]

# Next, try map lambda construction
stuff = map(lambda w: [w.upper(), w.lower(), len(w)], words)
for i in stuff: print(i)
# stdout:
# ['I', 'i', 1]
# ['AM', 'am', 2]
# ['GOING', 'going', 5]
# ['HOME', 'home', 4]
A list comprehension gets awkward when the construction rule is too complicated to be expressed with "for" and "if" clauses, or when the rule is chosen dynamically at runtime. In those cases, map() and/or filter() with an appropriate function is usually cleaner.
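A minimal sketch of the "rule chosen at runtime" case: the transformation is looked up by name and handed to map(). The TRANSFORMS table and apply_rule are hypothetical names for illustration.

```python
# Table of available rules; which one is used is decided at runtime
TRANSFORMS = {
    'upper': str.upper,
    'lower': str.lower,
    'length': len,
}


def apply_rule(rule_name, words):
    """Apply the transformation selected by rule_name to every word."""
    # list() is needed in Python 3, where map() returns a lazy iterator
    return list(map(TRANSFORMS[rule_name], words))


def long_words(words, min_length):
    """Keep only the words with at least min_length characters."""
    return list(filter(lambda w: len(w) >= min_length, words))


words = 'I am going home'.split()
print(apply_rule('upper', words))
print(apply_rule('length', words))
print(long_words(words, 4))
```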

April 3rd

Recap of Hadoop basics

Some frequently used acronyms

    HDFS - Hadoop Distributed File System
    GFS - Google File System
    JSON - JavaScript Object Notation
    NN - NameNode
    DN - DataNode
    SNN - Secondary NameNode
    JT - JobTracker
    TT - TaskTracker
    HA NN - Highly Available NameNode (or NN HA - NameNode Highly Available)
    REST - Representational State Transfer
    HiveQL - Hive SQL
    CDH - Cloudera's Distribution of Hadoop
    ZKFC - ZooKeeper Failover Controller
    FUSE - Filesystem in Userspace
    YARN - Yet Another Resource Negotiator
    Amazon EC2 - Amazon Elastic Compute Cloud
    Amazon S3 - Amazon Simple Storage Service 
    ACID - Atomicity, Consistency, Isolation, and Durability
    DHT  - Distributed Hash Table
    OLTP - Online Transaction Processing

Hadoop components

When to use hadoop:

When not to use hadoop:

MapReduce extensions and contemporaries

MapReduce extensions vs RDBMS

RDBMS:

MapReduce:

Hadoop download page

Download the latest stable release of Hadoop (hadoop-x.y.z.tar.gz) from the following link:


Then untar and move the whole directory to your project area. I have it at:

Basic setup to run Hadoop on my laptop running Mac 10.9.2 (Mavericks)

I needed to add the following lines in ~/.tcshrc configuration
setenv JAVA_HOME /System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
setenv HADOOP_HOME /Users/kalanand/projects/hadoop-2.4.0
Next, I created a directory $HADOOP_HOME/conf with the following configuration files in it


# Set Hadoop-specific environment variables here.
export HADOOP_VERSION=2.4.0
export HADOOP_HOME=$HOME/projects/hadoop-$HADOOP_VERSION
export HADOOP_OPTS="-Djava.security.krb5.realm= -Djava.security.krb5.kdc="

# The only required environment variable is JAVA_HOME.  All others are
# optional.  When running a distributed configuration it is best to
# set JAVA_HOME in this file, so that it is correctly defined on
# remote nodes.

# The java implementation to use.  Required.
export JAVA_HOME=$(/usr/libexec/java_home)

# Extra Java CLASSPATH elements.  Optional.

export HADOOP_CONF_DIR=$HOME/projects/hadoop-$HADOOP_VERSION/conf

# The maximum amount of heap to use, in MB. Default is 1000.


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>


<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property>
   <property>
      <name>dfs.namenode.name.dir</name>
   </property>
   <property>
      <name>dfs.datanode.data.dir</name>
   </property>
</configuration>


<?xml version="1.0" encoding="UTF-8"?>



<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
   <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>
</configuration>


<?xml version="1.0"?>
<configuration>
   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
   </property>
   <property>
      <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
      <value>org.apache.hadoop.mapred.ShuffleHandler</value>
   </property>
</configuration>

Create directories to be used by NameNode and DataNode

mkdir -p $HADOOP_HOME/yarn_data/hdfs/namenode
mkdir -p $HADOOP_HOME/yarn_data/hdfs/datanode

Now Hadoop is almost ready to run

Let's check hadoop version
hadoop version
I get the following stdout output:
Hadoop 2.4.0
Subversion http://svn.apache.org/repos/asf/hadoop/common -r 1583262
Compiled with protoc 2.5.0
From source with checksum 375b2832a6641759c6eaf6e3e998147
This command was run using /Users/kalanand/projects/hadoop-2.4.0/share/hadoop/common/hadoop-common-2.4.0.jar

Format NameNode

Before we run any real job we need to format the NameNode. This step is needed only for the first time. Doing it every time will result in loss of content on HDFS.
hadoop namenode -format

Actual running: start HDFS processes

The following shell script starts the DFS and yarn in one go:
After running the above command, I get the following stdout:
This script is Deprecated. Instead use start-dfs.sh and start-yarn.sh
Starting namenodes on [localhost]
localhost: starting namenode, logging to /Users/kalanand/projects/hadoop-2.4.0/logs/hadoop-kalanand-namenode-xxx.out
localhost: starting datanode, logging to /Users/kalanand/projects/hadoop-2.4.0/logs/hadoop-kalanand-datanode-xxx.out
Starting secondary namenodes [] starting secondarynamenode, logging to /Users/kalanand/projects/hadoop-2.4.0/logs/hadoop-kalanand-secondarynamenode-xxx.out
starting yarn daemons
starting resourcemanager, logging to /Users/kalanand/projects/hadoop-2.4.0/logs/yarn-kalanand-resourcemanager-xxx.out
localhost: starting nodemanager, logging to /Users/kalanand/projects/hadoop-2.4.0/logs/yarn-kalanand-nodemanager-xxx.out
We could have instead run the DFS and yarn separately, as suggested in the above stdout message
Alternatively, we could start the HDFS and YARN processes one by one:
sbin/hadoop-daemon.sh start namenode
sbin/hadoop-daemon.sh start datanode
sbin/hadoop-daemon.sh start secondarynamenode
sbin/yarn-daemon.sh start resourcemanager
sbin/yarn-daemon.sh start nodemanager
sbin/mr-jobhistory-daemon.sh start historyserver

Running the famous wordcount example

Now let's run an actual job to verify that everything is working. In this example, we create a text file and perform hadoop MapReduce to count the frequency of each word in this file.
mkdir in
emacs in/file & 
## Write the following two lines in this text file  
  This is one line
  This is another one
Add this directory to HDFS:
hadoop fs -copyFromLocal in /in
Run wordcount example provided:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.0.jar wordcount /in /out
Check the output:
hadoop fs -ls /out
hadoop fs -cat /out/part-r-00000
I get the following stdout:
This 2
another 1
is 2
line 1
one 2
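To see why the output looks like this, here is a rough pure-Python sketch of what the wordcount job does conceptually: a map step emits (word, 1) pairs and a reduce step sums them per word. This is only an illustration of the idea, not the code in the Hadoop examples jar; the function names are my own.

```python
from collections import defaultdict


def map_phase(line):
    """Emit a (word, 1) pair for every whitespace-separated word."""
    return [(word, 1) for word in line.split()]


def reduce_phase(pairs):
    """Sum the counts of all pairs that share the same word."""
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] += one
    return dict(counts)


lines = ['This is one line', 'This is another one']
pairs = [pair for line in lines for pair in map_phase(line)]
counts = reduce_phase(pairs)

# Keys come out sorted, with uppercase letters ordered before lowercase
for word in sorted(counts):
    print(word, counts[word])
```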

Monitoring via commandline

hadoop dfsadmin -report

Monitoring via web browser

Browse HDFS and check health of the nodes at: http://localhost:50070

Check the status of the applications running at: http://localhost:8088

Deleting a directory from Hadoop cluster

We can delete the above created /out directory in HDFS using the following command
hadoop fs -rmr hdfs://localhost:9000/out
Some common hadoop file handling commands are
Syntax: hadoop fs -<command> <arguments> 
Example commands:
cat            # Exit code: Returns 0 on success and -1 on error. 
copyFromLocal  # Similar to put command, except restricted to a local file 
copyToLocal    # Similar to get command, except restricted to a local file
cp             # 0 == success, -1 == error
du             # 0 == success, -1 == error 
dus            # Similar to du -s in Unix
get            # Copy to local file system, 0 == success, -1 == error
getmerge       # Takes a source directory and a destination file as input and concatenates files in src into the destination local file.
ls             # 0 == success, -1 == error
lsr            # Similar to Unix ls -R 
mkdir          # 0 == success, -1 == error
mv             # allows multiple sources, 0 == success, -1 == error
put            # Copy local file to the destination filesystem.
rmr            # Similar to rm -r in Unix
stat           # Returns the stat information on the path. 
tail           # Displays last kilobyte of the file to stdout.
test           # -e checks whether the file exists, -z checks whether the file is zero length, -d checks whether the path is a directory. Each returns 0 if true.
text           # Takes a source file and outputs the file in text format. 
touchz         # Create a file of zero length. 

Stop the processes

In one step:
Or, in two steps:
Or, stop each process one-by-one:
sbin/hadoop-daemon.sh stop namenode
sbin/hadoop-daemon.sh stop datanode
sbin/hadoop-daemon.sh stop secondarynamenode
sbin/yarn-daemon.sh stop resourcemanager
sbin/yarn-daemon.sh stop nodemanager
sbin/mr-jobhistory-daemon.sh stop historyserver

April 7th

Protocols for updating distributed database

If someone is reading from a database at the same time as someone else is writing to it, it is possible that the reader will see a half-written or inconsistent piece of data. There are several ways of solving this problem, known as concurrency control methods. The simplest way is to make all readers wait until the writer is done, which is known as a lock. This can be very slow, and particularly problematic for distributed systems.

Consider the following bank transaction example (transferring money from one bank account to another)

UPDATE ACCOUNTS SET Balance = Balance - 100 
WHERE AccountID = 123

UPDATE ACCOUNTS SET Balance = Balance + 100 
WHERE AccountID = 456

Desirable to have the following transaction properties (ACID):
Atomicity: all or nothing
Consistency: moving from one consistent state to another
Isolation: operations of an unfinished transaction do not affect other transactions
Durability: when a transaction is finished, its changes are permanent even if there is a system failure
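
Atomicity is the easiest of these to try out. The sketch below runs the bank transfer against an in-memory SQLite database; the table layout, the starting balances, and the CHECK constraint are my own assumptions for the demo. If the debit fails, the credit that already ran is rolled back with it.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE accounts ('
    'account_id INTEGER PRIMARY KEY, '
    'balance INTEGER CHECK (balance >= 0))'
)
conn.executemany('INSERT INTO accounts VALUES (?, ?)', [(123, 150), (456, 0)])
conn.commit()


def transfer(conn, src, dst, amount):
    """Move amount from src to dst; either both updates apply or neither."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.execute(
                'UPDATE accounts SET balance = balance + ? WHERE account_id = ?',
                (amount, dst))
            conn.execute(
                'UPDATE accounts SET balance = balance - ? WHERE account_id = ?',
                (amount, src))
        return True
    except sqlite3.IntegrityError:
        # The debit violated the CHECK constraint, so the credit was undone too
        return False


def balances(conn):
    """Return {account_id: balance} for all accounts."""
    return dict(conn.execute('SELECT account_id, balance FROM accounts'))


first = transfer(conn, 123, 456, 100)
after_first = balances(conn)
second = transfer(conn, 123, 456, 100)   # would overdraw account 123
after_second = balances(conn)
print(first, after_first)
print(second, after_second)
```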

There are three commonly used methods to resolve concurrency control:
Coordinator: In this model the coordinator begins the transaction, assigns a unique transaction ID, and is responsible for commit/abort. Many systems allow any client to be the coordinator for its own transactions.

The commit step itself has two phases:
Phase 1: Voting: Each participant prepares to commit, and votes on whether or not it can commit
Phase 2: Committing: Each participant actually commits or aborts.
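
The two phases can be sketched in a few lines. This is a toy, single-process model under my own assumptions (the Participant class and its can_commit flag are made up for illustration); a real implementation also has to handle timeouts, logging, and coordinator failure.

```python
class Participant:
    """A toy resource manager that votes and then commits or aborts."""

    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit
        self.state = 'init'

    def prepare(self):
        # Phase 1: vote yes only if this participant is able to commit
        self.state = 'prepared' if self.can_commit else 'aborted'
        return self.can_commit

    def commit(self):
        self.state = 'committed'

    def abort(self):
        self.state = 'aborted'


def two_phase_commit(participants):
    """Coordinator logic: commit only if every participant votes yes."""
    votes = [p.prepare() for p in participants]   # Phase 1: voting
    if all(votes):
        for p in participants:                    # Phase 2: commit everywhere
            p.commit()
        return True
    for p in participants:                        # Phase 2: abort everywhere
        p.abort()
    return False


ok_group = [Participant('a'), Participant('b')]
bad_group = [Participant('a'), Participant('b', can_commit=False)]
print(two_phase_commit(ok_group), [p.state for p in ok_group])
print(two_phase_commit(bad_group), [p.state for p in bad_group])
```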

Multiversion concurrency control (MVCC): In this approach, each user connected to the database sees a snapshot of the database at a particular instant in time. Any changes made by a writer will not be seen by other users of the database until the changes have been completed (or, in database terms, until the transaction has been committed).

When an MVCC database needs to update an item of data, it will not overwrite the old data with new data, but instead mark the old data as obsolete and add the newer version elsewhere. Thus there are multiple versions stored, but only one is the latest.
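
A toy sketch of the idea, assuming a single process and a simple integer counter as the commit timestamp (MVCCStore and its methods are hypothetical, not any database's API): writes append a new version, and a reader only sees versions that are no newer than its snapshot.

```python
class MVCCStore:
    """Keeps every version of a value; readers pick the one their snapshot allows."""

    def __init__(self):
        self._versions = {}   # key -> list of (commit_ts, value), oldest first
        self._clock = 0       # timestamp of the latest committed write

    def write(self, key, value):
        # Never overwrite: append a newer version with a fresh timestamp
        self._clock += 1
        self._versions.setdefault(key, []).append((self._clock, value))
        return self._clock

    def snapshot(self):
        # A reader remembers the timestamp at which it started
        return self._clock

    def read(self, key, snapshot_ts):
        # Return the newest version that existed when the snapshot was taken
        for ts, value in reversed(self._versions.get(key, [])):
            if ts <= snapshot_ts:
                return value
        return None


store = MVCCStore()
store.write('balance', 100)
reader_snapshot = store.snapshot()
store.write('balance', 50)    # a later writer; the old version is kept

print(store.read('balance', reader_snapshot))
print(store.read('balance', store.snapshot()))
```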

Databases with MVCC: Berkeley DB, CouchDB, HBase, MySQL when used with InnoDB, Oracle database, PostgreSQL.

Paxos: Distributed voting scheme. Relieves dependency on coordinator. Very successful and widely applied.

Databases with Paxos: BigTable, Google Spanner and Megastore, Cassandra.

CAP theorem:

It states that it is impossible for a distributed computer system to simultaneously provide all three of the following guarantees: consistency, availability, and partition tolerance.

Vector clocks:

A vector clock of a system of N processes is an array/vector of N logical clocks, one clock per process, with the following rules for clock updates:

Let's define:

R = Minimum # of nodes that participate in a successful read
W = Minimum # of nodes that participate in a successful write
N = Replication factor

If R + W > N, then you can claim consistency.
If R + W <= N, a read may miss the latest write, but latency is lower.
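
The reason behind the R + W > N rule is that every read set must then share at least one node with every write set. A brute-force sketch that checks this for small clusters (the function name is my own):

```python
from itertools import combinations


def quorums_always_overlap(n, r, w):
    """True if every choice of r read nodes shares a node with every choice of w write nodes."""
    nodes = range(n)
    return all(
        set(read_set) & set(write_set)
        for read_set in combinations(nodes, r)
        for write_set in combinations(nodes, w)
    )


print(quorums_always_overlap(3, 2, 2))   # R + W = 4 > 3
print(quorums_always_overlap(3, 1, 2))   # R + W = 3, not greater than 3
```

For every small configuration I tried, the brute-force answer agrees with the simple test r + w > n.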

BigTable data model:

BigTable is a proprietary data storage system built on Google File System. It is not distributed outside Google, although Google offers access to it as part of its Google App Engine.
HBase is an open-source implementation of BigTable, written in Java. Colossus is the successor to the Google File System (GFS).

What is Pig?

A classic DB problem: Suppose you have user data in one file, web site data in another, and you need to find the top 5 most visited sites by users aged 18−25.
Load Users                    Load Pages
    |                            | 
Filter by age                    |
    |                            |
             Join on name
             Group on url 
             Count clicks
             Order by clicks
             Take top 5
Pig Latin code to do this (9 lines of code, 15 minutes to write)
Users  = LOAD 'Users' AS (name, age);
Fltrd  = FILTER Users BY age >= 18 AND age <= 25; 
Pages  = LOAD 'Pages' AS (user, url);
Jnd    = JOIN Fltrd BY name, Pages BY user;
Grpd   = GROUP Jnd BY url;
Smmd   = FOREACH Grpd GENERATE group, COUNT(Jnd) AS clicks;
Srtd   = ORDER Smmd BY clicks DESC;
Top5   = LIMIT Srtd 5;
STORE Top5 INTO 'top5sites';
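For comparison, the same dataflow can be sketched in plain Python on small in-memory lists. The sample users and pages are made up, and the set-based join assumes each user name appears only once in the user data.

```python
from collections import Counter

# Hypothetical sample data: (name, age) and (user, url)
users = [('alice', 22), ('bob', 31), ('carol', 19)]
pages = [('alice', 'a.com'), ('alice', 'b.com'), ('bob', 'a.com'),
         ('carol', 'a.com'), ('carol', 'c.com')]


def top_sites(users, pages, min_age=18, max_age=25, limit=5):
    """Return the most visited urls among users in the given age range."""
    # FILTER Users BY age
    selected = {name for name, age in users if min_age <= age <= max_age}
    # JOIN on name: keep page visits made by the selected users
    joined = [(user, url) for user, url in pages if user in selected]
    # GROUP BY url and COUNT clicks
    clicks = Counter(url for _, url in joined)
    # ORDER BY clicks DESC and LIMIT
    return clicks.most_common(limit)


print(top_sites(users, pages))
```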
A simple Pig Latin program example:
A  = LOAD 'file1' AS (sid, pid, mass, px:double);
B  = LOAD 'file2' AS (sid, pid, mass, ps:double);
C  = FILTER A BY px<10;
D  = JOIN C BY sid, B BY sid;
STORE D into 'output.txt';
Note: Pig Latin follows lazy execution philosophy. So, nothing happens until you want to write the output, i.e., the last line of the above code.

April 8th

Pig data model

Pig functions

LOAD: Read data into memory. Example:
A = LOAD 'myfile.txt' USING PigStorage('\t') AS (f1, f2, f3);
A = <1,2,3>
    <4,2,1>
    <8,3,4>
FILTER: Get rid of unnecessary data. Example:
Y = FILTER A BY f1=='8';
Y = <8,3,4>
GROUP: Getting data together
X = GROUP A BY f1;
X = <1, {<1,2,3>}>
    <4, {<4,2,1>}>
    <8, {<8,3,4>}>
Note: The first field will be named "group", the second field "A".

FOREACH: Manipulate each table
X = <1,5>
COGROUP: Getting data together
C = COGROUP A BY f1, B BY $0;
A = <1,2,3>             B = <2,4>
    <4,2,1>                 <8,9>
    <8,3,4>                 <1,3>

C = <1, {<1,2,3>}, {<1,3>}>
    <2, {}, {<2,4>}>
    <4, {<4,2,1>}, {}>
    <8, {<8,3,4>}, {<8,9>}>
JOIN: A special case of COGROUP
C = JOIN A BY $0, B BY $0;
C = <1,2,3,1,3>
    <8,3,4,8,9>
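To convince myself how GROUP, COGROUP and JOIN relate, here is a small Python sketch using the same A and B tuples. It only mimics the semantics on in-memory lists and the helper names are my own; an inner join then falls out as the cross product of the two bags in each cogroup entry.

```python
A = [(1, 2, 3), (4, 2, 1), (8, 3, 4)]
B = [(2, 4), (8, 9), (1, 3)]


def group_by(rows, key_index=0):
    """GROUP: map each key to the bag (list) of rows that carry it."""
    groups = {}
    for row in rows:
        groups.setdefault(row[key_index], []).append(row)
    return groups


def cogroup(left, right):
    """COGROUP: for every key in either input, pair up the two bags (possibly empty)."""
    left_groups, right_groups = group_by(left), group_by(right)
    keys = sorted(set(left_groups) | set(right_groups))
    return {k: (left_groups.get(k, []), right_groups.get(k, [])) for k in keys}


def join(left, right):
    """JOIN: flatten each cogroup entry; keys with an empty bag produce nothing."""
    return [l + r
            for bag_left, bag_right in cogroup(left, right).values()
            for l in bag_left
            for r in bag_right]


print(group_by(A))
print(cogroup(A, B))
print(join(A, B))
```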

Three special JOIN algorithms

Principle of "in situ" data analysis: To deal with petabytes of data you will have to bring computation to data instead of bringing data to the computation.

Read somewhere a very apt quotation: A data scientist is better at coding than any statistician around and is better at statistics than any software programmer around.

Pig query example to analyze web traffic data

A = LOAD 'traffic.dat' AS (ip, time, url);
B = GROUP A BY ip;
D = FILTER BY ip IS '' OR ip IS '';
STORE D INTO 'local_traffic.dat';
Note: In principle, the above program is inefficient since we may want to do FILTER as the first command. But since Pig does "lazy evaluation" (i.e., no work is done until STORE), the Pig implementation will automatically move the FILTER step near the top to optimize performance.

April 22nd

Rename multiple files using wildcard

Suppose I want to rename all files named "file-old-*" to "file-new-*". Here is a simple way to do this from a bash shell (the ${f/old/new} substitution is a bash feature):
for f in file-old-*; do mv "$f" "${f/old/new}"; done
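
The same rename can be done from Python when the shell is not bash. A minimal sketch, where rename_files and its arguments are hypothetical names of my own:

```python
import glob
import os


def rename_files(directory, old='old', new='new'):
    """Rename file-<old>-* to file-<new>-* inside directory; return the new names."""
    renamed = []
    pattern = os.path.join(directory, 'file-' + old + '-*')
    for path in sorted(glob.glob(pattern)):
        name = os.path.basename(path)
        # Replace only the first occurrence, like ${f/old/new} in bash
        target = os.path.join(directory, name.replace(old, new, 1))
        os.rename(path, target)
        renamed.append(os.path.basename(target))
    return renamed


# Example call (renames files in the current directory):
# rename_files('.')
```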

April 24th

rsync/copy local file to a remote server

Here is how I did it successfully. Note the "-z" option which compresses the files before transferring, so the process is a little faster.
rsync -avz Kalanand-*.pdf kmishran@kmishra.net:

Last modified: Thu April 24 15:09:24 CDT 2014