Friday, November 28, 2014

Hello Hadoop: Welcoming Parallel Processing

Even the largest computers struggle with complex problems that have a lot of variables and large data sets. Imagine if one person had to sort through 26,000 boxes, each containing a set of 1,000 balls marked with one letter of the alphabet: the task would take days. But if you separated the contents of each 1,000-ball box into 10 smaller equal boxes and asked 10 separate people to work on these smaller tasks, the job would be completed roughly 10 times faster. This notion of parallel processing is one of the cornerstones of many Big Data projects.

Apache Hadoop (named after a toy elephant belonging to the child of its creator, Doug Cutting) is a free programming framework that supports the processing of large data sets in a distributed computing environment. Hadoop is part of the Apache project sponsored by the Apache Software Foundation, and although it was originally written in Java, other programming languages can be used to implement many parts of the system.

Hadoop was inspired by Google’s Map-Reduce, a software framework in which an application is broken down into numerous small parts. Any of these parts (also called fragments or blocks) can be run on any computer connected in an organised group called a cluster. Hadoop makes it possible to run applications on thousands of individual computers involving thousands of terabytes of data. Its distributed file system facilitates rapid data transfer rates among nodes and enables the system to continue operating uninterrupted in case of a node failure. This approach lowers the risk of catastrophic system failure, even if a significant number of computers become inoperative.

Saturday, November 15, 2014

Why use Hadoop? Any other solution to Large-Scale Data?

Hadoop is a large-scale distributed batch processing infrastructure. While it can be used on a single machine, its true power lies in its ability to scale to hundreds or thousands of computers, each with several processor cores. Hadoop is also designed to efficiently distribute large amounts of work across a set of machines.

How large an amount of work? Orders of magnitude larger than many existing systems work with. Hundreds of gigabytes of data constitute the low end of Hadoop-scale. Hadoop is built to process "web-scale" data on the order of hundreds of gigabytes to terabytes or petabytes. At this scale, it is likely that the input data set will not even fit on a single computer's hard drive, much less in memory. So Hadoop includes a distributed file system which breaks up input data and sends fractions of the original data to several machines in your cluster to hold. As a result, the problem can be processed in parallel using all of the machines in the cluster, and output results can be computed as efficiently as possible.

Challenges at Large Scale

Performing large-scale computation is difficult. To work with this volume of data requires distributing parts of the problem to multiple machines to handle in parallel. Whenever multiple machines are used in cooperation with one another, the probability of failures rises. In a single-machine environment, failure is not something that program designers explicitly worry about very often: if the machine has crashed, then there is no way for the program to recover anyway.

In a distributed environment, however, partial failures are an expected and common occurrence. Networks can experience partial or total failure if switches and routers break down. Data may not arrive at a particular point in time due to unexpected network congestion. Individual compute nodes may overheat, crash, experience hard drive failures, or run out of memory or disk space. Data may be corrupted, or maliciously or improperly transmitted. Multiple implementations or versions of client software may speak slightly different protocols from one another. Clocks may become desynchronized, lock files may not be released, parties involved in distributed atomic transactions may lose their network connections part-way through, etc. In each of these cases, the rest of the distributed system should be able to recover from the component failure or transient error condition and continue to make progress. Of course, actually providing such resilience is a major software engineering challenge.

Different distributed systems specifically address certain modes of failure, while worrying less about others. Hadoop provides no security model, nor safeguards against maliciously inserted data. For example, it cannot detect a man-in-the-middle attack between nodes. On the other hand, it is designed to handle hardware failure and data congestion issues very robustly. Other distributed systems make different trade-offs, as they intend to be used for problems with other requirements (e.g., high security).

In addition to worrying about these sorts of bugs and challenges, there is also the fact that the compute hardware has finite resources available to it. The major resources include:

* Processor time
* Memory
* Hard drive space
* Network bandwidth

Individual machines typically only have a few gigabytes of memory. If the input data set is several terabytes, then this would require a thousand or more machines to hold it in RAM -- and even then, no single machine would be able to process or address all of the data.
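As a rough back-of-the-envelope check of the figure above, here is a minimal Java sketch. The 4 TB input and 4 GB of RAM per machine are assumed numbers chosen only for illustration, and the class and method names are made up for this example.

```java
public class MemoryEstimate {
    // Number of machines needed to hold a data set entirely in RAM,
    // rounded up so that a partly filled machine still counts.
    static long machinesNeeded(long dataSetBytes, long ramPerMachineBytes) {
        return (dataSetBytes + ramPerMachineBytes - 1) / ramPerMachineBytes;
    }

    public static void main(String[] args) {
        long GB = 1024L * 1024 * 1024;
        long TB = 1024L * GB;
        // Assumed figures: a 4 TB input and 4 GB of usable RAM per machine.
        System.out.println(machinesNeeded(4 * TB, 4 * GB)); // prints 1024
    }
}
```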

Hard drives are much larger; a single machine can now hold multiple terabytes of information on its hard drives. But intermediate data sets generated while performing a large-scale computation can easily fill up several times more space than what the original input data set had occupied. During this process, some of the hard drives employed by the system may become full, and the distributed system may need to route this data to other nodes which can store the overflow.

Finally, bandwidth is a scarce resource even on an internal network. While a set of nodes directly connected by gigabit Ethernet may generally experience high throughput between them, if all of the machines transmit multi-gigabyte data sets at once, they can easily saturate the switch's bandwidth capacity. Additionally, if the machines are spread across multiple racks, the bandwidth available for the data transfer is much lower. Furthermore, RPC requests and other data transfer requests using this channel may be delayed or dropped.

To be successful, a large-scale distributed system must be able to manage the above-mentioned resources efficiently. Furthermore, it must allocate some of these resources toward maintaining the system as a whole, while devoting as much time as possible to the actual core computation.

Synchronization between multiple machines remains the biggest challenge in distributed system design. If nodes in a distributed system can explicitly communicate with one another, then application designers must be cognizant of the risks associated with such communication patterns. It becomes very easy to generate more remote procedure calls (RPCs) than the system can satisfy! Performing multi-party data exchanges is also prone to deadlock or race conditions. Finally, the ability to continue computation in the face of failures becomes more challenging. For example, if 100 nodes are present in a system and one of them crashes, the other 99 nodes should be able to continue the computation, ideally with only a small penalty proportionate to the loss of 1% of the computing power. Of course, this will require re-computing any work lost on the unavailable node. Furthermore, if a complex communication network is overlaid on the distributed infrastructure, then determining how best to restart the lost computation and propagating the information about the change in network topology may be nontrivial to implement.
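The 100-node example above can be played through with a small Java sketch. This is only a toy model under stated assumptions: tasks are handed out round-robin, a failed node's tasks are simply spread over the survivors, and every name here (RescheduleSketch, assign, reschedule) is hypothetical. It is not how Hadoop itself schedules work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RescheduleSketch {
    // Assign task ids 0..taskCount-1 to nodes 0..nodeCount-1 round-robin.
    static Map<Integer, List<Integer>> assign(int nodeCount, int taskCount) {
        Map<Integer, List<Integer>> plan = new TreeMap<>();
        for (int node = 0; node < nodeCount; node++) {
            plan.put(node, new ArrayList<>());
        }
        for (int task = 0; task < taskCount; task++) {
            plan.get(task % nodeCount).add(task);
        }
        return plan;
    }

    // Remove the failed node and spread its tasks over the surviving nodes.
    // Returns the number of tasks that have to be re-computed.
    static int reschedule(Map<Integer, List<Integer>> plan, int failedNode) {
        List<Integer> lost = plan.remove(failedNode);
        if (lost == null) {
            return 0; // unknown node: nothing to do
        }
        if (plan.isEmpty()) {
            throw new IllegalStateException("no surviving nodes");
        }
        List<Integer> survivors = new ArrayList<>(plan.keySet());
        for (int i = 0; i < lost.size(); i++) {
            plan.get(survivors.get(i % survivors.size())).add(lost.get(i));
        }
        return lost.size();
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> plan = assign(100, 1000);
        int rerun = reschedule(plan, 42);
        System.out.println("Tasks to re-run: " + rerun);       // Tasks to re-run: 10
        System.out.println("Surviving nodes: " + plan.size()); // Surviving nodes: 99
    }
}
```

With 1,000 tasks on 100 nodes, losing one node costs 10 tasks, i.e. 1% of the work, which matches the proportionate penalty described above.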

Friday, November 14, 2014

Pig: the Silent Hero

SQL on Hadoop has been extensively covered in the media in the last year. Pig, being a well-established technology, has been largely overlooked, though Pig as a Service was a noteworthy development. Considering Hadoop as a data platform, though, requires Pig and an understanding of why and how it is important.

Data users are generally trained in using SQL, a declarative language, to query data for reporting, analytics and ad hoc exploration. SQL does not describe how the data is processed; it only states what result is wanted, which appeals to a lot of data users. ETL (Extract, Transform and Load) processes, which are developed by data programmers, benefit from and sometimes even require the ability to detail the data transformation steps. At times ETL programmers prefer a procedural language to a declarative one. Pig’s programming language, Pig Latin, is procedural and gives programmers control over every step of the processing.

Business users and programmers work on the same data set yet usually focus on different stages. The programmers commonly work on the whole ETL pipeline, i.e. they are responsible for cleaning and extracting the raw data, transforming it and loading it into third-party systems. Business users either access data on third-party systems or access the extracted and transformed data for analysis and aggregation. Diverse tooling is therefore important, because the interaction patterns with the same data set are diverse.

Importantly, complex ETL workflows need management, extensibility, and testability to ensure stable and reliable data processing. Pig provides strong support in all three areas. Pig jobs can be scheduled and managed with workflow tools like Oozie to build and orchestrate large-scale, graph-like data pipelines.

Pig achieves extensibility with UDFs (User Defined Functions), which let programmers add functions written in one of many programming languages. The benefit of this model is that any kind of special functionality can be injected, and that Pig and Hadoop manage the distribution and parallel execution of the function on potentially huge data sets in an efficient manner. This allows programmers to focus on solving specific domain problems, e.g. rectifying specific data set anomalies or converting data formats, without worrying about the complexity of distributed computing.

Reliable data pipelines require testing before deployment in production to ensure the correctness of the numerous data transformation and combination steps. Pig has features supporting easy and testable development of data pipelines. Pig supports unit tests, an interactive shell, and the option to run in a local mode, which allows it to execute programs without requiring a Hadoop cluster. Programmers can use these to test their Pig programs in detail with test data sets before they ever enter production, and to try out ideas quickly and inexpensively, which is essential for fast development cycles.

None of these features are particularly glamorous, yet they are important when evaluating Hadoop and data processing with it. The choice of leveraging Pig for a big data project can easily make the difference between success and failure.

Wednesday, November 12, 2014

Hadoop Distributed File System (HDFS)

**** BLOCKS ****
Every filesystem has a block size, which is the minimum amount of data that it can read or write. Filesystem blocks are typically a few kilobytes in size, whereas disk blocks are normally 512 bytes.
HDFS blocks, by contrast, are much larger: 64 MB by default (in Hadoop 1.x; the default is 128 MB in Hadoop 2.x).

Now a question arises: why are the blocks in HDFS so large?

To start with, HDFS is meant to handle large files. If you have small files, smaller block sizes are better. If you have large files, larger block sizes are better.
For large files, let's say you have a 1000 MB file. With a 4 KB block size, you'd have to make 256,000 requests to get that file (1 request per block). In HDFS, those requests go across a network and come with a lot of overhead. Each request has to be processed by the Namenode to figure out where that block can be found. That's a lot of traffic, isn't it? If you use 64 MB blocks, the number of requests goes down to 16, greatly reducing the overhead and the load on the Namenode.
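The arithmetic above is easy to check with a few lines of Java. This is just a sketch of the calculation (one request per block, rounded up); the class and method names are made up for this example and are not part of any Hadoop API.

```java
public class BlockMath {
    // Number of block requests needed to read a whole file:
    // one request per block, with a partly filled last block counted too.
    static long requestsNeeded(long fileBytes, long blockBytes) {
        return (fileBytes + blockBytes - 1) / blockBytes;
    }

    public static void main(String[] args) {
        long KB = 1024L;
        long MB = 1024L * KB;
        long file = 1000 * MB;

        System.out.println(requestsNeeded(file, 4 * KB));  // prints 256000
        System.out.println(requestsNeeded(file, 64 * MB)); // prints 16
    }
}
```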

**** Namenodes and Datanodes ****
A Hadoop cluster has two types of nodes operating in a master-slave pattern:
1) A Namenode (Master)
2) 'N' Datanodes (Slaves)
* N is the number of machines.

The Namenode manages the filesystem namespace. It maintains the filesystem tree and the metadata for all files and directories in the tree. This information is stored persistently on the local disk in the form of two files:
1) Namespace image
2) Edit log

NOTE : The Namenode also knows the Datanodes on which the blocks of a given file are stored. However, it does not store the block locations persistently, because this information is rebuilt from the Datanodes whenever the system starts.

A client always accesses files by communicating with the Namenode and the Datanodes.

Datanodes store and retrieve blocks whenever they are asked to by a client or by the Namenode. Datanodes also report back to the Namenode with lists of the blocks they are storing.
Without the Namenode, the filesystem cannot be used: the blocks that contain our data are still on the Datanodes, but there is no way to find them or to reassemble files from them. The Namenode is therefore a single point of failure up to Hadoop 1.x. From Hadoop 2.x onward a different Namenode architecture is available, in which a standby Namenode can take over (HDFS high availability).
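To make the split between persistent and rebuilt metadata a little more concrete, here is a toy Java model. It is only a sketch of the idea under stated assumptions, not the real HDFS implementation: the class NamenodeSketch, its methods, and the datanode names "dn1" and "dn2" are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class NamenodeSketch {
    // Persistent part: file path -> ordered block ids
    // (the kind of information kept in the namespace image and edit log).
    private final Map<String, List<Long>> namespace = new HashMap<>();
    // Volatile part: block id -> datanodes holding a copy,
    // rebuilt from the block reports sent by the datanodes.
    private final Map<Long, Set<String>> blockLocations = new HashMap<>();

    void addFile(String path, List<Long> blockIds) {
        namespace.put(path, new ArrayList<>(blockIds));
    }

    // Called when a datanode reports the list of blocks it stores.
    void blockReport(String datanode, Collection<Long> blockIds) {
        for (Long id : blockIds) {
            blockLocations.computeIfAbsent(id, k -> new TreeSet<>()).add(datanode);
        }
    }

    // Simulates a restart: the namespace survives, block locations do not.
    void restart() {
        blockLocations.clear();
    }

    // For each block of the file, in order, the datanodes known to hold it.
    List<Set<String>> locate(String path) {
        List<Set<String>> result = new ArrayList<>();
        for (Long id : namespace.getOrDefault(path, Collections.emptyList())) {
            result.add(blockLocations.getOrDefault(id, Collections.emptySet()));
        }
        return result;
    }

    public static void main(String[] args) {
        NamenodeSketch nn = new NamenodeSketch();
        nn.addFile("/logs/a.txt", Arrays.asList(1L, 2L));
        nn.blockReport("dn1", Arrays.asList(1L, 2L));
        nn.blockReport("dn2", Arrays.asList(2L));
        System.out.println(nn.locate("/logs/a.txt")); // [[dn1], [dn1, dn2]]

        nn.restart();
        System.out.println(nn.locate("/logs/a.txt")); // [[], []]
    }
}
```

After the simulated restart the file is still known, but its blocks cannot be located until the datanodes send their block reports again.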

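The secondary Namenode does not solve the single point of failure of a Hadoop cluster either. Despite its name, it is not a hot standby: it only periodically merges the namespace image with the edit log, so its copy of the metadata lags behind that of the Namenode.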

In an upcoming post, I will explain more about the recent work on this issue.

Sunday, November 9, 2014

Let's start with Map-Reduce

As this is my first post on Map-Reduce programming, I will try to focus on the important points of the Map-Reduce framework.

Map-Reduce works by breaking the processing into two phases:
1) The Map phase
2) The Reduce phase
Each phase (i.e. the map phase and the reduce phase) has key-value pairs as input and output.
The types of the key-value pairs are chosen by the programmer as per the requirement.
The programmer also specifies two functions, one for each phase: the map function and the reduce function.
The map function is just the data preparation phase, setting up the data in such a way that the reduce function can do its work on it.
Note : The map function is a good place to drop bad records.
The output from the map function is processed by the Map-Reduce framework before being sent to the reduce function.
Note : This processing sorts and groups the key-value pairs by key.
The Mapper class is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function.
The map() method is also passed an instance of Context, to which it writes its output.
The Reducer class likewise has four formal type parameters that specify the input and output types of the reduce function.
Note : The input types of the reduce function must match the output types of the map function.
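To see the two phases and the sort-and-group step in between, here is a small word-count simulation in plain Java. It deliberately does not use the Hadoop API (no Mapper, Reducer or Context classes); the class MapReduceSketch and its methods are hypothetical, and everything runs in a single process, so treat it as a sketch of the data flow only.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class MapReduceSketch {
    // Map phase: input is (line offset, line text), output is (word, 1) pairs.
    // The offset is unused here; it only mirrors the key-value input shape.
    static List<Map.Entry<String, Integer>> map(long offset, String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        if (line == null || line.trim().isEmpty()) {
            return out; // bad (blank) records are dropped in the map function
        }
        for (String word : line.trim().toLowerCase().split("\\s+")) {
            out.add(new AbstractMap.SimpleEntry<>(word, 1));
        }
        return out;
    }

    // What the framework does between the phases: sort and group by key.
    static SortedMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce phase: input is (word, list of counts), output is the total count.
    // The input types match the output types of map(): String keys, Integer values.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Runs map, shuffle and reduce one after another over the input lines.
    static SortedMap<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            mapped.addAll(map(i, lines.get(i)));
        }
        SortedMap<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(mapped).entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("hello hadoop", "", "hello pig")));
        // prints {hadoop=1, hello=2, pig=1}
    }
}
```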
Job - A Job object forms the specification of the job and gives you control over how the job is run. When we run the job on a Hadoop cluster, we package the code into a JAR file (which Hadoop will distribute around the cluster). Rather than explicitly specifying the name of the JAR file, we can pass a class to the Job's setJarByClass() method, which Hadoop will use to locate the relevant JAR file by looking for the JAR file containing this class.
For the Job object, we specify the input and output paths. An input path is specified by calling the static addInputPath() method on FileInputFormat, and it can be a single file, a directory, or a file pattern.
Note : addInputPath() can be called more than once to use input from multiple paths.
Note : The MultipleInputs class supports Map-Reduce jobs that have multiple input paths with a different InputFormat and Mapper for each path.
The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reduce functions are written. The directory shouldn’t exist before running the job; if it does, Hadoop will complain and not run the job.

Tuesday, November 4, 2014

Hashtable : A most useful Java class for modifying the metadata management of Hadoop

Hashtables are an extremely useful mechanism for storing data. Hashtables work by mapping a key to a value, which is stored in an in-memory data structure. Rather than searching through all elements of the hashtable for a matching key, a hashing function analyses a key, and returns an index number. This index matches a stored value, and the data is then accessed. This is an extremely efficient data structure, and one all programmers should remember.
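As a minimal sketch of the idea of turning a key into an index number: one common approach is to take the key's hashCode(), clear the sign bit, and take the remainder modulo the number of slots. The class name, method name and the capacity of 11 below are assumptions made for illustration; a real hashtable also has to deal with collisions and resizing, which this sketch ignores.

```java
public class BucketIndexSketch {
    // Map a key to a slot index in the range [0, capacity):
    // clear the sign bit of the hash code, then take the remainder.
    static int bucketIndex(Object key, int capacity) {
        return (key.hashCode() & 0x7FFFFFFF) % capacity;
    }

    public static void main(String[] args) {
        int capacity = 11; // assumed number of slots

        // An Integer's hash code is its int value, so these are easy to follow.
        System.out.println(bucketIndex(Integer.valueOf(5), capacity));  // prints 5
        System.out.println(bucketIndex(Integer.valueOf(21), capacity)); // prints 10

        // Any object can be a key; the index always stays within range.
        System.out.println(bucketIndex("hadoop", capacity));
    }
}
```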
Hashtables are supported by Java, in the form of the java.util.Hashtable class. Hashtables accept any Java object as keys and values. You can use a String, for example, as a key, or perhaps a number such as an Integer. However, you can't use a primitive data type, so you'll need to use the wrapper classes instead: Character, Integer, Long, etc.
   // Use an Integer as a wrapper for an int
   Integer integer = Integer.valueOf( i );
   hash.put( integer, data );
Data is placed into a hashtable through the put method, and can be accessed using the get method. It's important to know the key that maps to a value, otherwise it's difficult to get the data back. If you want to process all the elements in a hashtable, you can always ask for an Enumeration of the hashtable's keys. The get method returns an object, which can then be cast back to the original object type (with a generic Hashtable<Integer, String>, the cast is unnecessary).
   // Get all values with an enumeration of the keys
   for (Enumeration e = hash.keys(); e.hasMoreElements();) {
       String str = (String) hash.get( e.nextElement() );
       System.out.println (str);
   }
To demonstrate hashtables, I've written a little demo that adds one hundred strings to a hashtable. Each string is indexed by an Integer, which wraps the int primitive data type.  Individual elements can be returned, or the entire list can be displayed. Note that hashtables don't store keys sequentially, so there is no ordering to the list.

import java.util.*;

public class hash {
  public static void main (String args[]) throws Exception {
    // Start with a capacity of ten; the table grows automatically
    // once it is more than 75% full (the load factor)
    Hashtable<Integer, String> hash = new Hashtable<>(10, 0.75f);

    // Add one hundred strings, each indexed by an Integer
    for (int i = 0; i < 100; i++) {
      Integer integer = Integer.valueOf( i );
      hash.put( integer, "Number : " + i);
    }

    // Get value out again
    System.out.println (hash.get(Integer.valueOf(5)));
    // Get value out again
    System.out.println (hash.get(Integer.valueOf(21)));

    // Get all values (in no particular order)
    for (Enumeration<Integer> e = hash.keys(); e.hasMoreElements();) {
      System.out.println (hash.get(e.nextElement()));
    }
  }
}