Wednesday, December 24, 2014

Hadoop: The Definitive Guide, 3rd Edition

Book Description

With this digital Early Release edition of Hadoop: The Definitive Guide, you get the entire book bundle in its earliest form - the author's raw and unedited content - so you can take advantage of this content long before the book's official release. You'll also receive updates when significant changes are made. Ready to unleash the power of your massive dataset? With the latest edition of this comprehensive resource, you'll learn how to use Apache Hadoop to build and maintain reliable, scalable, distributed systems. It's ideal for programmers looking to analyze datasets of any size, and for administrators who want to set up and run Hadoop clusters.

This third edition covers recent changes to Hadoop, including new material on the new MapReduce API, as well as version 2 of the MapReduce runtime (YARN) and its more flexible execution model. You'll also find illuminating case studies that demonstrate how Hadoop is used to solve specific problems.

source: http://it-ebooks.info/book/635/

Improving Query Performance Using Partitioning & Bucketing in Apache Hive

To improve query performance in Hive, we can use:

1) Partitioning tables

   a) Manual partition
   b) Dynamic partition

2) Bucketing

Manual partition:

In a manual (static) partition, we partition the table on partition columns and name the target partition explicitly on each load. In this dataset we partition by country and city.

create table if not exists empl_part (empid int,ename string,salary double,deptno int)
comment 'manual partition example'
partitioned by (country string,city string)
row format delimited
fields terminated by ',';


-- data for UK

load data local inpath 'uk_edinburgh.csv' into table empl_part partition(country='UK',city='EDIN');
load data local inpath 'uk_london.csv' into table empl_part partition(country='UK',city='LON');

-- data for usa

load data local inpath 'usa_newyork.csv' into table empl_part partition(country='US',city='NY');
load data local inpath 'usa_california.csv' into table empl_part partition(country='US',city='CF');



-- try count(*) or count(1) with a where clause on country and city
-- select * from employee order by empid asc;

-- strict mode stops the CLI from scanning every partition of a partitioned table

set hive.mapred.mode=strict;
-- (the alternative is nonstrict) strict mode rejects queries on a partitioned table that have
-- no partition filter, because they could launch a massive MapReduce job

> select * from empl_part order by empid asc;
-- error FAILED: SemanticException [Error 10041]: No partition predicate found for Alias "employee" Table "employee"


show partitions empl_part; -- lists available partitions

country=UK/city=EDIN
country=UK/city=LON
country=US/city=CF
country=US/city=NY

select * from empl_part where country='UK' order by empid asc;

When I tried manual partitioning, query performance improved significantly compared to querying a normal (unpartitioned) table.
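
One way to picture why the partitioned query is faster: each partition lives in its own directory (country=UK/city=EDIN and so on), so a filter on the partition columns lets Hive skip whole directories. The sketch below is a toy Python model of that pruning, not Hive's actual implementation. The function names parse_partition and prune_partitions are hypothetical.

```python
# Toy model of partition pruning; this is an illustration, not Hive's real code.
# Partition names are copied from the SHOW PARTITIONS output above.
PARTITIONS = [
    "country=UK/city=EDIN",
    "country=UK/city=LON",
    "country=US/city=CF",
    "country=US/city=NY",
]


def parse_partition(path):
    """Turn 'country=UK/city=EDIN' into {'country': 'UK', 'city': 'EDIN'}."""
    return dict(part.split("=", 1) for part in path.split("/"))


def prune_partitions(partitions, **predicate):
    """Keep only the partition directories whose keys match the predicate."""
    kept = []
    for path in partitions:
        spec = parse_partition(path)
        if all(spec.get(key) == value for key, value in predicate.items()):
            kept.append(path)
    return kept


if __name__ == "__main__":
    # WHERE country='UK' only needs to read two of the four directories.
    print(prune_partitions(PARTITIONS, country="UK"))
```

With no predicate at all, every directory is kept, which is the full scan that strict mode refuses to run.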


Dynamic partitioning

Dynamic partition (DP) columns are specified the same way as static partition (SP) columns: in the partition clause. The only difference is that DP columns do not have values, while SP columns do. In the partition clause, we need to specify all partitioning columns, even if all of them are DP columns.
In INSERT ... SELECT ... queries, the dynamic partition columns must be specified last among the columns in the SELECT statement and in the same order in which they appear in the PARTITION() clause.

If all the partition columns are DP columns, the insert is only allowed in nonstrict mode.

Process of dynamic partitioning

1) create a staging table
2) create the production table with partitions
3) load data from staging table into production table


CREATE TABLE IF NOT EXISTS EMPL_STAGE(EMPNO INT,ENAME STRING,SAL INT,DEPTNO INT,COUNTRY STRING,CITY STRING,DATE_JOINING STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA INPATH 'emp' into table EMPL_STAGE;

CREATE TABLE IF NOT EXISTS EMPL_PROD(EMPNO INT,ENAME STRING,SAL INT,COUNTRY STRING,CITY STRING,DATE_JOINING STRING)
PARTITIONED BY (DEPTNO INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';


-- set session properties
set  hive.exec.dynamic.partition=true;
set  hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE EMPL_PROD PARTITION(DEPTNO)
SELECT EMPNO,ENAME,SAL,COUNTRY,CITY,DATE_JOINING,DEPTNO as DEPTNO FROM EMPL_STAGE;

select count(*) from empl_stage where deptno=1;
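
To picture what an INSERT ... PARTITION(DEPTNO) with a dynamic partition column does, here is a rough Python sketch. It assumes a simplified row layout and made-up sample rows, and the function name route_to_partitions is hypothetical. Each row is sent to the partition named by its trailing column, which is why the DP column has to come last in the SELECT.

```python
from collections import defaultdict

# Made-up sample rows in SELECT order: (empno, ename, sal, deptno).
# The partition column (deptno) comes last, as the rule above requires.
ROWS = [
    (1, "asha", 1000, 10),
    (2, "ben", 1500, 20),
    (3, "chen", 1200, 10),
]


def route_to_partitions(rows, num_partition_cols=1):
    """Group rows by the value(s) of their trailing partition column(s)."""
    partitions = defaultdict(list)
    for row in rows:
        data = row[:-num_partition_cols]   # columns stored inside the partition
        keys = row[-num_partition_cols:]   # values that name the partition
        partitions[keys].append(data)
    return dict(partitions)


if __name__ == "__main__":
    for keys, data in sorted(route_to_partitions(ROWS).items()):
        print("deptno=%s -> %s" % (keys[0], data))
```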


Bucketing

Whenever we run a select query on a table, Hive may have to scan the whole table to retrieve the data, which can become a performance issue, and sometimes we may run out of memory. This is where bucketing comes into the picture.

Hive hashes the values of the bucketing column internally and places each row into a bucket according to that hash. As a result, a query on that column can, in principle, fetch its data from the corresponding bucket instead of the whole table.
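
As a rough illustration of the idea, the Python sketch below assigns a symbol to one of 20 buckets by hashing it and taking the remainder. The Java-style string hash is an assumption made for the example, and Hive's exact hash function may differ. The names java_style_hash and bucket_for are hypothetical.

```python
def java_style_hash(text):
    """31-based rolling hash over the characters (assumed, Java String style)."""
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # keep the value within 32 bits
    return h


def bucket_for(symbol, num_buckets=20):
    """Map a symbol to a bucket number in the range 0 .. num_buckets - 1."""
    return (java_style_hash(symbol) & 0x7FFFFFFF) % num_buckets


if __name__ == "__main__":
    for symbol in ["CBC", "IBM", "GE"]:
        print(symbol, "-> bucket", bucket_for(symbol))
```

The same symbol always lands in the same bucket, so a lookup by symbol only needs that one bucket.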

Process of bucketing

1) create a staging table
2) create the production table with buckets
3) load data from staging table into production table



create external table nyse_daily_staging (
exchange string,
symbol string,
date_of_trading string,
open double,
high double,
low double,
close double,
volume int,
adj_close double)
row format delimited
fields terminated by '\t';


load data local inpath 'NYSE_daily' overwrite into table nyse_daily_staging;


create table nyse_daily_production (
exchange string,
symbol string,  
date_of_trading string,
open double,
high double,
low double,
close double,
volume int,
adj_close double)
clustered by (symbol) into 20 buckets
row format delimited
fields terminated by '\t';


-- assuming the number of distinct symbols is greater than the number of buckets

-- to enforce bucketing while loading data, we set a Hive parameter

set hive.enforce.bucketing=true;


from nyse_daily_staging 
insert overwrite table nyse_daily_production
select * ;

Tests:
select * from nyse_daily_production where symbol = 'CBC' and open > 2.9;  -- 160.599 sec
select * from nyse_daily_staging where symbol = 'CBC' and open > 2.9;  -- 220.566 sec


Tuesday, December 23, 2014

SerDe in Apache Hive

What is a SerDe?

The SerDe interface allows you to instruct Hive as to how a record should be processed. A SerDe is a combination of a Serializer and a Deserializer (hence, Ser-De). The Deserializer interface takes a string or binary representation of a record, and translates it into a Java object that Hive can manipulate. The Serializer, however, will take a Java object that Hive has been working with, and turn it into something that Hive can write to HDFS or another supported system. Commonly, Deserializers are used at query time to execute SELECT statements, and Serializers are used when writing data, such as through an INSERT-SELECT statement.
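
The two halves of a SerDe can be sketched in a few lines of Python. This is only a toy model of the idea for a comma-delimited record, not Hive's Java SerDe interface. The class name DelimitedSerDe and the sample record are made up for illustration.

```python
class DelimitedSerDe:
    """Toy serializer/deserializer for one delimited text record."""

    def __init__(self, columns, delimiter=","):
        self.columns = columns      # list of (column name, type caster) pairs
        self.delimiter = delimiter

    def deserialize(self, line):
        """Text record -> in-memory row (used when reading, e.g. SELECT)."""
        fields = line.split(self.delimiter)
        return {name: cast(value)
                for (name, cast), value in zip(self.columns, fields)}

    def serialize(self, row):
        """In-memory row -> text record (used when writing, e.g. INSERT)."""
        return self.delimiter.join(str(row[name]) for name, _ in self.columns)


if __name__ == "__main__":
    serde = DelimitedSerDe(
        [("empid", int), ("ename", str), ("salary", float), ("deptno", int)])
    row = serde.deserialize("100,John,2500.0,10")
    print(row)
    print(serde.serialize(row))
```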

Note: will be updated further !

source:cloudera

Complex Data Types in HIVE

There are three complex types in Hive.

Arrays: An ordered collection of elements. The elements in an array must all be of the same type.

Map: An unordered collection of key-value pairs. Keys must be of primitive types. Values can be of any type.

Struct: A collection of elements of different types.


Examples:

Struct:

If the data pattern is like 100,John$Martin$Doe (customer ID, then the customer's first, middle and last names separated by '$')

create table cust_struct(
custid int,
name struct<fname:string,mname:string,lname:string>
)
row format delimited
fields terminated by ','
collection items terminated by '$';

Loading Data

!hdfs dfs -copyFromLocal cust-struct.dat /user/hive/warehouse/cust_struct;

Extracting data
select name.fname from cust_struct;


Map:

Data pattern:

100, John$Martin$Doe, home#01234$office#00000$mobile#9999

(The last field is a map. A map is also a collection, so its key-value pairs are separated by '$', while '#' separates each key from its value.)

create table cust_struct_map (
custid int,
name struct<fname:string,mname:string,lname:string>,
phone_nos map<string,int>
)
row format delimited
fields terminated by ','
collection items terminated by '$'
map keys terminated by '#';

Loading Data

!hdfs dfs -copyFromLocal cust-struct-map.dat /user/hive/warehouse/cust_struct_map;

Extracting data:

select name.fname,phone_nos["home"] from cust_struct_map;


Arrays:

Data pattern:

100,John$Martin$Doe,home#01234$office#00000$mobile#9999,abc@yahoo.com$doe@ooo.com

create table cust_struct_map_array(
custid int,
name struct<fname:string,mname:string,lname:string>,
phone_nos map<string,int>,
emails array<string>
)
row format delimited
fields terminated by ','
collection items terminated by '$'
map keys terminated by '#';

Loading Data

!hdfs dfs -copyFromLocal cust-struct-map-array.dat /user/hive/warehouse/cust_struct_map_array;

Extracting Data

select custid,emails from cust_struct_map_array;
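
To see how the three delimiters work together, the following Python sketch splits the sample line by hand in the same order: ',' for fields, '$' for collection items, and '#' for map keys. It is only an illustration of the layout, not of how Hive parses the file. The function name parse_customer is hypothetical.

```python
def parse_customer(line):
    """Split one record into custid, name struct, phone map and email array."""
    custid, name, phones, emails = line.split(",")         # fields: ','
    fname, mname, lname = name.split("$")                  # struct items: '$'
    phone_nos = {}
    for pair in phones.split("$"):                         # map entries: '$'
        key, value = pair.split("#")                       # key/value: '#'
        phone_nos[key] = int(value)                        # map<string,int>
    return {
        "custid": int(custid),
        "name": {"fname": fname, "mname": mname, "lname": lname},
        "phone_nos": phone_nos,
        "emails": emails.split("$"),                       # array items: '$'
    }


if __name__ == "__main__":
    line = ("100,John$Martin$Doe,home#01234$office#00000$mobile#9999,"
            "abc@yahoo.com$doe@ooo.com")
    record = parse_customer(line)
    print(record["name"]["fname"], record["phone_nos"]["home"], record["emails"])
```

Note that storing phone numbers as int drops leading zeros (01234 becomes 1234), so a string value type may be a better fit for such data.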

Monday, December 22, 2014

What is the difference between RDBMS and Hive?

Hive resembles a traditional database in that it supports an SQL interface, but it is not a full database.

Hive is better described as a data warehouse than as a database.

Hive enforces the schema at read time, whereas an RDBMS enforces it at write time.

In an RDBMS, a table’s schema is enforced at data load time. If the data being loaded doesn’t conform to the schema, it is rejected. This design is called schema on write. Hive, by contrast, doesn’t verify the data when it is loaded, but rather when it is retrieved. This is called schema on read.

Schema on read makes for a very fast initial load, since the data does not have to be read, parsed, and serialized to disk in the database’s internal format. The load operation is just a file copy or move.
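
A toy Python model of schema on read, assuming a comma-delimited file: loading stores the raw lines untouched, and the schema is applied only when rows are read. Returning None for a value that does not fit the declared type mirrors Hive returning NULL for such a field. The function names load and read_with_schema are hypothetical.

```python
def load(lines):
    """'Load' step: just keep the raw lines, with no parsing or validation."""
    return list(lines)


def read_with_schema(stored_lines, schema):
    """'Read' step: apply the schema now; bad values become None (like NULL)."""
    for line in stored_lines:
        row = []
        for (name, cast), value in zip(schema, line.split(",")):
            try:
                row.append(cast(value))
            except ValueError:
                row.append(None)
        yield tuple(row)


if __name__ == "__main__":
    table = load(["1,abc", "x,def"])          # the bad id 'x' is accepted at load
    schema = [("id", int), ("name", str)]
    print(list(read_with_schema(table, schema)))
```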

Schema on write makes query-time performance faster, since the database can index columns and compress the data, but it takes longer to load data into the database.

Hive is based on the notion of write once, read many times, whereas an RDBMS is designed for reading and writing many times.

In an RDBMS, record-level updates, insertions and deletes, transactions, and indexes are possible. These are not allowed in Hive, because Hive was built to operate over HDFS data using MapReduce, where full-table scans are the norm and a table update is achieved by transforming the data into a new table.

In an RDBMS, the maximum data size is typically in the tens of terabytes, whereas Hive can handle hundreds of petabytes.

As Hadoop is a batch-oriented system, Hive doesn’t support OLTP (Online Transaction Processing). It is closer to OLAP (Online Analytical Processing), but not ideal, since there is significant latency between issuing a query and receiving a reply, due both to the overhead of MapReduce jobs and to the size of the data sets Hadoop was designed to serve.

An RDBMS is best suited to dynamic data analysis where fast responses are expected, whereas Hive is suited to data warehouse applications, where relatively static data is analyzed, fast response times are not required, and the data is not changing rapidly.

To overcome these limitations, HBase is being integrated with Hive to support record-level operations and OLAP.


Hive scales out easily at low cost, whereas an RDBMS is harder to scale, and scaling it up is very costly.

source:http://hadooptutorial.info/

Saturday, December 20, 2014

Overview of IMPALA | IMPALA vs PIG & HIVE

What is IMPALA ?

Impala is a high-performance SQL engine for vast amounts of data.


  • Massively parallel processing
  • Inspired by Google's Dremel project
  • Query latency measured in milliseconds
Impala runs on Hadoop clusters
  • Can query data stored in HDFS or HBase tables
  • Reads and writes data in common Hadoop file formats
Developed by Cloudera
  • 100% open source under the Apache license
Impala supports a subset of SQL-92

Why use IMPALA?

Many benefits are the same as with Hive or Pig
  • More productive than writing MapReduce code
  • Leverages existing knowledge of SQL
One benefit exclusive to Impala is speed
  • Highly optimized for queries
  • Around 5 times faster than Hive or Pig, and often 20 times faster or more
IMPORTANT:
Pig and Hive run as clients (for example, from your laptop), whereas Impala runs on the Hadoop cluster itself, on all the data nodes.

Comparing Impala to Hive & Pig

Similarities:
  • Queries are expressed in high-level languages
  • Alternatives to writing MapReduce code
  • Used to analyze data stored on Hadoop clusters
  • Impala shares the metastore with Hive (tables created in Hive are visible in Impala, and vice versa)
Contrasting Impala to Hive & Pig

  • Hive and Pig answer queries by running MapReduce jobs. MapReduce overhead results in high latency (even a trivial query takes 10 seconds or more).
  • Impala does not use MapReduce. It uses a custom execution engine built specifically for Impala.
  • Queries can complete in a fraction of a second.
  • Hive and Pig are best suited to long-running batch processes (data transformation tasks). Impala is best for interactive/ad hoc queries.
  • Impala can't handle complex data types (Array, Map or Struct).
  • No support for the binary data type.
If a node fails during a Hive or Pig query, the MapReduce tasks are re-run on other nodes and the query still completes. With Impala, if a node fails during a query, the query fails and has to be re-run.

Relational Database vs Impala

Importance of Trash folder in HDFS

HDFS Trash Folder

The Hadoop trash feature helps prevent accidental deletion of files and directories. If trash is enabled and a file or directory is deleted using the Hadoop shell, the file is moved to the .Trash directory in the user's home directory instead of being deleted. Deleted files are initially moved to the Current sub-directory of the .Trash directory, and their original path is preserved. Files in .Trash are permanently removed after a user-configurable time interval. The interval setting also enables trash checkpointing, where the Current directory is periodically renamed using a timestamp. Files and directories in the trash can be restored simply by moving them to a location outside the .Trash directory.

Enabling and Disabling Trash

Go to the HDFS service.
Select Configuration > View and Edit.
Click the Gateway Default Group category.
Check or uncheck the Use Trash checkbox.
Click the Save Changes button.
Restart the HDFS service.

Setting the Trash Interval

Go to the HDFS service.
Select Configuration > View and Edit.
Click the NameNode Default Group category.
Specify the Filesystem Trash Interval property, which controls the number of minutes after which a trash checkpoint directory is deleted and the number of minutes between trash checkpoints. For example, to enable trash so that deleted files are deleted after 24 hours, set the value of the Filesystem Trash Interval property to 1440.

 Note: The trash interval is measured from the point at which the files are moved to trash, not from the last time the files were modified.

Click the Save Changes button.
Restart the HDFS service.
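
The 1440 in the example is simply 24 hours expressed in minutes. As a small sketch of the arithmetic, the Python below converts hours to the interval value and estimates the earliest time a trashed file can be purged, measured from when it was moved to trash. Both function names and the sample timestamp are made up, and the real purge also depends on when checkpoints run.

```python
from datetime import datetime, timedelta


def interval_minutes(hours):
    """Convert a retention period in hours to the trash interval in minutes."""
    return hours * 60


def earliest_purge(moved_to_trash_at, interval):
    """Earliest purge time, counted from the move to trash (not last modified)."""
    return moved_to_trash_at + timedelta(minutes=interval)


if __name__ == "__main__":
    interval = interval_minutes(24)
    print(interval)
    print(earliest_purge(datetime(2014, 12, 20, 9, 0), interval))
```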

Source :Cloudera

Difference between Combiner and Reducer in Hadoop

Think of a Combiner as a function of your map output. This you can primarily use for decreasing the amount of data needed to be processed by Reducers. In some cases, because of the nature of the algorithm you implement, this function can be the same as the Reducer. But in some other cases this function can of course be different.

A combiner will still be implementing the Reducer interface. Combiners can only be used in specific cases which are going to be job dependent.

Difference #1

One constraint that a Combiner will have, unlike a Reducer, is that the input/output key and value types must match the output types of your Mapper.

Difference #2

Combiners can only be used for functions that are commutative (a.b = b.a) and associative (a.(b.c) = (a.b).c). This also means that a combiner may operate on only a subset of your keys and values, or may not execute at all, and the output of the program must still remain the same.

Difference #3

Reducers can get data from multiple Mappers as part of the partitioning process. A Combiner can only get its input from one Mapper.
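
A small simulation can make the three differences concrete. The Python sketch below is a toy word count, not Hadoop code, with made-up input splits and hypothetical function names. The combiner is the same summing function as the reducer, it runs on the output of one mapper at a time, and because addition is commutative and associative the final result is the same whether or not it runs.

```python
from collections import defaultdict

# Made-up input: each inner list is the split handled by one mapper.
SPLITS = [["a b a"], ["b a"]]


def map_words(line):
    """Mapper: emit (word, 1) for every word in the line."""
    return [(word, 1) for word in line.split()]


def sum_values(pairs):
    """Sum the values per key; used as both the combiner and the reducer."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return sorted(totals.items())


def run_job(splits, use_combiner):
    """Run the toy job, optionally combining each mapper's output locally."""
    shuffled = []
    for split in splits:
        mapped = [pair for line in split for pair in map_words(line)]
        # The combiner only ever sees the output of this one mapper.
        shuffled.extend(sum_values(mapped) if use_combiner else mapped)
    # The reducer sees the data from all mappers.
    return sum_values(shuffled)


if __name__ == "__main__":
    print(run_job(SPLITS, use_combiner=False))
    print(run_job(SPLITS, use_combiner=True))
```

The combiner's input and output are both (word, count) pairs, matching the mapper's output types, which is the constraint from Difference #1.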

source: http://blog.optimal.io/3-differences-between-a-mapreduce-combiner-and-reducer/ 

Friday, December 19, 2014

Comparison between MRv1 vs YARN (MRv2)

MRv1 uses the JobTracker to create and assign tasks to data nodes, which can become a resource bottleneck when the cluster scales out far enough (usually around 4,000 nodes). In general, the JobTracker has to manage both the cluster's resources and the applications running on it.

MRv2 (aka YARN, "Yet Another Resource Negotiator") has one Resource Manager per cluster, which handles questions such as: how many slots are available, what happens if a node fails, and what is the capacity? Each data node runs a Node Manager. For each job, one slave node acts as the Application Master, monitoring resources, tasks, and so on.


Thursday, December 18, 2014

How can we import MySQL data into HDFS | Working with Sqoop

Sqoop:


  • Sqoop is a tool used to import data from SQL databases into HDFS. It was developed by Cloudera.
  • It uses JDBC to talk to the database.
  • Sqoop generates the Java code that imports the data into HDFS.
  • After the Java code is generated, a map-only MapReduce job runs to import the data.
  • By default, 4 mappers run, each importing roughly 25% of the data.
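
The "4 mappers with about 25% each" idea can be sketched as splitting the range of a numeric key into equal slices. The Python below is a simplified illustration that assumes an integer key with known minimum and maximum values. Sqoop's real splitter differs in detail, and the function name split_ranges is hypothetical.

```python
def split_ranges(min_id, max_id, num_mappers=4):
    """Divide the inclusive key range [min_id, max_id] among the mappers."""
    total = max_id - min_id + 1
    ranges = []
    start = min_id
    for i in range(num_mappers):
        # Spread any remainder over the first few mappers.
        count = total // num_mappers + (1 if i < total % num_mappers else 0)
        end = start + count - 1
        ranges.append((start, end))
        start = end + 1
    return ranges


if __name__ == "__main__":
    # Each mapper would import the rows whose key falls in its range.
    for mapper, (low, high) in enumerate(split_ranges(1, 1000)):
        print("mapper %d: ids %d..%d" % (mapper, low, high))
```

With -m 1, as in the import command below, the whole range goes to a single mapper.
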
Here are a few commands that list the databases, list the tables, and import a table.

1: sqoop list-databases --connect jdbc:mysql://localhost/training_db --username root --password root
(This will list the available databases in your connection)

2: sqoop list-tables --connect jdbc:mysql://localhost/training_db --username root --password root

3: sqoop import --connect jdbc:mysql://localhost/training_db --table user_log --fields-terminated-by '\t' -m 1 --username root --password root
(This will import the user_log table into HDFS using a single mapper)


Install Hadoop in 10 Steps | pseudo distributed mode |Linux Platform

Step 1: Run the following command to install Hadoop from the yum repository in pseudo-distributed mode

sudo yum install hadoop-0.20-conf-pseudo

Step 2: Verify that the packages are installed properly
rpm -ql hadoop-0.20-conf-pseudo


Step 3: Format the namenode
sudo -u hdfs hdfs namenode -format

Step 4: Stop existing services
$ for service in /etc/init.d/hadoop*
> do
> sudo $service stop
> done

Step 5: Start HDFS
$ for service in /etc/init.d/hadoop-hdfs-*
> do
> sudo $service start
> done

Step 6: Verify if HDFS has started properly (In the browser)
http://localhost:50070

Step 7: Create the /tmp directory
$ sudo -u hdfs hadoop fs -mkdir /tmp
$ sudo -u hdfs hadoop fs -chmod -R 1777 /tmp

Step 8: Create mapreduce specific directories
sudo -u hdfs hadoop fs -mkdir -p /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
sudo -u hdfs hadoop fs -chmod -R 1777 /var/lib/hadoop-hdfs/cache/mapred/mapred/staging
sudo -u hdfs hadoop fs -chown -R mapred /var/lib/hadoop-hdfs/cache/mapred

Step 9: Start MapReduce
$ for service in /etc/init.d/hadoop-0.20-mapreduce-*
> do
> sudo $service start
> done

Step 10: Verify if MapReduce has started properly (In Browser)
http://localhost:50030

Monday, December 15, 2014

How to change the resolution of Ubuntu in VMware Player

1) Go to the terminal
2) Use the command "xrandr" (without quotes)
3) It will list all the available resolutions
4) Set the resolution according to your screen size (ex: $ xrandr -s 1366x768)