Hadoop and the Big Data World!

Hadoop Introduction

Hadoop and Big Data are fascinating topics. In the last few years, roughly 90% of the world's data was generated, mainly driven by social networking sites such as Facebook and Twitter, blogs, websites, and news sites. The industry is challenged by the sheer volume of data that gets generated every day.

The major business challenges with respect to data are:

1. Data storage space and cost
2. The cost of retrieving data from storage servers after it has been archived
3. Extracting meaningful intelligence from tons of data
4. Adding new data columns that were not anticipated during the original data modeling
5. Running ETL operations on constantly generated data and extracting the right information at the right time

Technical building blocks: huge volumes of structured (database) and unstructured (logs, XML, text files, etc.) data make up Big Data. Hadoop is the framework used to manage this Big Data.

Imagine you are given 1000 movies, each about 700 MB in size (one CD), with 100 users trying to access these movies from a storage location over the network. The first thing that happens in the Hadoop world is the creation of a job, or process, that divides those 1000 movies into key-value pairs.

For example: (Movie 1, Actor 1), (Movie 2, Actor 2), (Movie 3, Actor 1), (Movie 4, Actor 2). This process is called mapping, or the Map phase.

Then comes the Reduce phase, where Movie 1 and Movie 3 are grouped under Actor 1, and Movie 2 and Movie 4 are grouped under Actor 2. This process is called reducing.

The combined process of mapping and reducing is called MapReduce, which is used extensively whenever we talk about Hadoop technology. A small sketch of this idea is shown below.
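Purely as an illustration of the movie/actor example (not part of the installation steps that follow), a mapper and reducer for grouping movies by actor might look roughly like this. The class names, and the assumption that each input line has the form "movie<TAB>actor", are hypothetical; the full, runnable WordCount example appears later in this post.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class ActorMovies {

  // Map phase: for every "movie<TAB>actor" input line, emit (actor, movie)
  public static class ActorMapper extends Mapper<Object, Text, Text, Text> {
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] parts = value.toString().split("\t");
      if (parts.length == 2) {
        context.write(new Text(parts[1]), new Text(parts[0]));
      }
    }
  }

  // Reduce phase: for each actor, collect all the movies mapped to that actor
  public static class MovieListReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text actor, Iterable<Text> movies, Context context)
        throws IOException, InterruptedException {
      StringBuilder list = new StringBuilder();
      for (Text movie : movies) {
        if (list.length() > 0) {
          list.append(", ");
        }
        list.append(movie.toString());
      }
      context.write(actor, new Text(list.toString()));
    }
  }
  // A driver similar to WordCount.main() further below would wire these two
  // classes into a Job and submit it to the cluster.
}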

HDFS (Hadoop Distributed File System) is the distributed file system used by Hadoop. Each 700 MB movie is split into blocks (64 MB by default in Hadoop 1.x, 128 MB in Hadoop 2.x), and multiple copies of each block are kept on multiple nodes (servers).

At any given time, if one server is heavily loaded, a movie block can be served from another node that holds a replica and has less load. This complements the load balancing, distributed architecture, and high availability concepts.
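Once the single-node cluster from the installation below is running, you can see how a file has been split into blocks and where the replicas live using the fsck tool; the path here is only an example:

hdfs fsck /inputfile/wizard.txt -files -blocks -locations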

Hadoop Installation on an Ubuntu VM

To install Hadoop, you need to start by installing the JDK, setting JAVA_HOME, updating the PATH, then installing Hadoop and adding it to the PATH as well.

Step 1: Install the JDK; my version is jdk1.7.0_67

Follow this excellent article on the installation and configuration of the JDK on a 64-bit Ubuntu VM:

http://www.wikihow.com/Install-Oracle-Java-on-Ubuntu-Linux

Step 2: Download and extract Hadoop and set up its paths; my version is Hadoop 2.5.1

Follow these instructions to set up Hadoop as a single-node cluster:

http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html
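For reference, the environment variables can be set up roughly as follows in ~/.bashrc. The JDK path is an assumption based on a typical Oracle JDK install, and the Hadoop directory matches the one used in the scripts below, so adjust both to your own locations.

# ~/.bashrc additions (paths are assumptions -- adjust to your install)
export JAVA_HOME=/usr/lib/jvm/jdk1.7.0_67
export HADOOP_HOME=/home/ubuntu/hadoop
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin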

Step 3: Startup script (run from the desktop)

#!/bin/bash
# Directory holding these helper scripts; also used as the terminal working directory
SCRIPT_DIR=/home/ubuntu/hadoop/scripts
cd ${SCRIPT_DIR}
# Format the namenode (only needed the very first time)
gnome-terminal --window-with-profile=LogWindow --working-directory=${SCRIPT_DIR} \
  --title="Format Namenode" --geometry=80x24+250+40 -e "/home/ubuntu/hadoop/bin/hdfs namenode -format"
echo "Formatting Namenode"
# Start the HDFS daemons
gnome-terminal --window-with-profile=LogWindow --working-directory=${SCRIPT_DIR} \
  --title="Start DFS" --geometry=80x24+290+100 -e /home/ubuntu/hadoop/sbin/start-dfs.sh
echo "Starting DFS"
# Start YARN
gnome-terminal --window-with-profile=LogWindow --working-directory=${SCRIPT_DIR} \
  --title="Start Yarn" --geometry=80x24+310+130 -e /home/ubuntu/hadoop/sbin/start-yarn.sh
echo "Starting Yarn"

Step 4: Shutdown script

#!/bin/bash
SCRIPT_DIR=/home/ubuntu/hadoop/scripts
cd ${SCRIPT_DIR}
# Stop YARN
gnome-terminal --window-with-profile=LogWindow --working-directory=${SCRIPT_DIR} \
  --title="Stop Yarn" --geometry=80x24+250+40 -e /home/ubuntu/hadoop/sbin/stop-yarn.sh
echo "Stopping Yarn"
# Stop the HDFS daemons
gnome-terminal --window-with-profile=LogWindow --working-directory=${SCRIPT_DIR} \
  --title="Stop DFS" --geometry=80x24+290+100 -e /home/ubuntu/hadoop/sbin/stop-dfs.sh
echo "Stopping DFS"

Hadoop Installation verification

Now let us check that the installation went right and everything is working as expected.

Name Node Information

http://localhost:50070/

[Screenshot: NameNode web UI]

Hadoop Cluster

http://localhost:8088/cluster

[Screenshots: Hadoop cluster (ResourceManager) web UI]

Putting a File into HDFS and Reading Its Content

Let us see how we can put text files into the Hadoop Distributed File System (HDFS) and read them back.

ubuntu@ubuntu:~/Documents/test-data/ch1$ hadoop fs -put movie_actors*.txt /
ubuntu@ubuntu:~/Documents/test-data/ch1$ hadoop fs -cat /movie_actors1.txt
Interstellar Ellen Burstyn Murph Matthew McConaughey Cooper Mackenzie
Foy Murph John Lithgow Donald Chalamet David Oyelow Collette Wolfe
ubuntu@ubuntu:~/Documents/test-data/ch1$ hadoop fs -cat /movie_actors2.txt
The Hunger Games Jennifer Lawrence Josh Hutcherson Liam Hemsworth
Donald Sutherland Julianne Moore
ubuntu@ubuntu:~/Documents/test-data/ch1$
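As a quick check, the uploaded files can also be listed from the command line (output omitted here):

hadoop fs -ls /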

We can also view the same files in the file browser at http://localhost:50070/explorer.html#

[Screenshot: HDFS file browser showing the uploaded files]

Running a MapReduce Java Program

Let us run a MapReduce program that counts the words in a text file.

Step 1: Create a text file, wizard.txt, and copy-paste some text from the script of The Wizard of Oz. You can get the text from here:

http://oz.wikia.com/wiki/The_Wonderful_Wizard_of_Oz

Step 2: Download and install a Java IDE such as Eclipse or JDeveloper; I prefer JDeveloper because I am comfortable with it. You will need the Hadoop jar files on the project classpath.

[Screenshots: JDeveloper project with the Hadoop jar files added]

Create the Java code below, compile it, and package it into the jar file wc.jar (example command-line compile steps are shown after the listing).

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
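If you prefer the command line to an IDE, the class can be compiled and packaged roughly like this. This is a sketch that assumes WordCount.java is in the current directory and uses `hadoop classpath` to pull the Hadoop jars onto the compile classpath:

javac -classpath "$(hadoop classpath)" WordCount.java
jar -cf wc.jar WordCount*.class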

Step 3: Create a directory, inputfile, in HDFS, put the file wizard.txt into it, and check that it exists (a listing command for the check is shown below the transcript).

ubuntu@ubuntu:~/jdeveloper/mywork/App1/Client/deploy$
hadoop fs -mkdir /inputfile
ubuntu@ubuntu:~/jdeveloper/mywork/App1/Client/deploy$
hadoop fs -put /home/ubuntu/hadoop/input/wizard.txt /inputfile/
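To perform the check, the directory can be listed (output omitted):

hadoop fs -ls /inputfile/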

[Screenshot: wizard.txt in the HDFS file browser]

Running the Hadoop Job

ubuntu@ubuntu:~/jdeveloper/mywork/App1/Client/deploy$
hadoop jar wc.jar WordCount /inputfile/wizard.txt /outputfile
14/11/21 17:17:05 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/11/21 17:17:06 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed.
Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/11/21 17:17:06 INFO input.FileInputFormat: Total input paths to process : 1
14/11/21 17:17:06 INFO mapreduce.JobSubmitter: number of splits:1
14/11/21 17:17:06 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1416542146368_0005
14/11/21 17:17:06 INFO impl.YarnClientImpl: Submitted application application_1416542146368_0005
14/11/21 17:17:06 INFO mapreduce.Job: The url to track the job:
http://ubuntu:8088/proxy/application_1416542146368_0005/
14/11/21 17:17:06 INFO mapreduce.Job: Running job: job_1416542146368_0005
14/11/21 17:17:13 INFO mapreduce.Job: Job job_1416542146368_0005 running in uber mode : false
14/11/21 17:17:13 INFO mapreduce.Job:  map 0% reduce 0%
14/11/21 17:17:18 INFO mapreduce.Job:  map 100% reduce 0%
14/11/21 17:17:24 INFO mapreduce.Job:  map 100% reduce 100%
14/11/21 17:17:24 INFO mapreduce.Job: Job job_1416542146368_0005 completed successfully
14/11/21 17:17:24 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=24234
		FILE: Number of bytes written=241843
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=35554
		HDFS: Number of bytes written=16781
		HDFS: Number of read operations=6
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=1
		Launched reduce tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=2958
		Total time spent by all reduces in occupied slots (ms)=3107
		Total time spent by all map tasks (ms)=2958
		Total time spent by all reduce tasks (ms)=3107
		Total vcore-seconds taken by all map tasks=2958
		Total vcore-seconds taken by all reduce tasks=3107
		Total megabyte-seconds taken by all map tasks=3028992
		Total megabyte-seconds taken by all reduce tasks=3181568
	Map-Reduce Framework
		Map input records=353
		Map output records=6722
		Map output bytes=62156
		Map output materialized bytes=24234
		Input split bytes=107
		Combine input records=6722
		Combine output records=1890
		Reduce input groups=1890
		Reduce shuffle bytes=24234
		Reduce input records=1890
		Reduce output records=1890
		Spilled Records=3780
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=59
		CPU time spent (ms)=1790
		Physical memory (bytes) snapshot=399556608
		Virtual memory (bytes) snapshot=1411158016
		Total committed heap usage (bytes)=308281344
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=35447
	File Output Format Counters
		Bytes Written=16781

Verify the Output of WordCount

ubuntu@ubuntu:~/jdeveloper/mywork/App1/Client/deploy$
hadoop fs -cat /outputfile/part-r-00000
City."	1
City?"	2
DOROTHY	1
Do	1
Dorothy	53
Dorothy!"	1
Dorothy's	2
Dorothy,	10
Dorothy,"	1
Dorothy.	12
Dorothy;	1
EMERALDS"	1
East	5
East,	3
East.	1
East?	1
Em	6
Em,	2
Em,"	1
Em?"	1
Emerald	10
Emeralds	1
Emeralds.	2

[Screenshot: WordCount output in the HDFS file browser]

Understanding Reducer – Enhancing Results

Observe the results from the first run of the word count program:

Dorothy	53
Dorothy!"	1
Dorothy's	2
Dorothy,	10
Dorothy,"	1
Dorothy.	12
Dorothy;	1
EMERALDS"	1
East	5

If we look at the word Dorothy, it appears repeatedly with trailing punctuation such as commas, periods, and quotation marks, and each variant is counted separately. So let us enhance the word count program to eliminate a set of string patterns and improve the results. To do that, we create one more Java class that checks for these patterns:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class WordCount2 {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    static enum CountersEnum { INPUT_WORDS }

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private boolean caseSensitive;
    private Set<String> patternsToSkip = new HashSet<String>();

    private Configuration conf;
    private BufferedReader fis;

    @Override
    public void setup(Context context) throws IOException,
        InterruptedException {
      conf = context.getConfiguration();
      caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);
      // only parse skip files when they have actually been set via -skip
      if (conf.getBoolean("wordcount.skip.patterns", false)) {
        URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
        for (URI patternsURI : patternsURIs) {
          Path patternsPath = new Path(patternsURI.getPath());
          String patternsFileName = patternsPath.getName().toString();
          parseSkipFile(patternsFileName);
        }
      }
    }

    private void parseSkipFile(String fileName) {
      try {
        fis = new BufferedReader(new FileReader(fileName));
        String pattern = null;
        while ((pattern = fis.readLine()) != null) {
          patternsToSkip.add(pattern);
        }
      } catch (IOException ioe) {
        System.err.println("Caught exception while parsing the cached file '"
            + StringUtils.stringifyException(ioe));
      }
    }

    @Override
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      String line = (caseSensitive) ?
          value.toString() : value.toString().toLowerCase();
      for (String pattern : patternsToSkip) {
        line = line.replaceAll(pattern, "");
      }
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
        Counter counter = context.getCounter(CountersEnum.class.getName(),
            CountersEnum.INPUT_WORDS.toString());
        counter.increment(1);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    // expect either <in> <out> or <in> <out> -skip <skipPatternFile>
    if ((remainingArgs.length != 2) && (remainingArgs.length != 4)) {
      System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    List<String> otherArgs = new ArrayList<String>();
    for (int i=0; i < remainingArgs.length; ++i) {
      if ("-skip".equals(remainingArgs[i])) {
        job.addCacheFile(new Path(remainingArgs[++i]).toUri());
        job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
      } else {
        otherArgs.add(remainingArgs[i]);
      }
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Compile the code, create the jar file, and run it as before. Before running it, let us create a patterns.txt file that lists the patterns to skip, and put it into HDFS:

\.
\,
\!
,"
's
!"
"

Let us verify the file in HDFS:

ubuntu@ubuntu:~/jdeveloper/mywork/App1/Client/deploy$
hadoop fs -cat /inputfile/patterns.txt

Let us now run the enhanced version of the word counter:

ubuntu@ubuntu:~/jdeveloper/mywork/App1/Client/deploy$
hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true
/inputfile/wizard.txt /outputfile3  -skip /inputfile/patterns.txt

Let us now verify the resulting part file:

DOROTHY	1
Did	1
Do	2
Don't	1
Dorothy	79
Dorothy;	1

The results are much better than before. The count for Dorothy has increased because the punctuation patterns were stripped, so variants such as "Dorothy," and "Dorothy." are now counted as the same word. This shows how the Map phase can be customized to clean the data before the Reduce phase aggregates it.

[Screenshot: enhanced WordCount output]

About Author

What next? Oracle NoSQL Basics and Implementation

Madhusudhan Rao