Wednesday, December 24, 2014

Improving Query Performance Using Partitioning & Bucketing in Apache Hive

To improve the Query performance we can go for

1)Partitioning tables
 
   a)Manual partition
   b)Dynamic partiton

2)Bucketing

Manual partition:

In Manual partition we are partitioning the table using partition variables. Here in our dataset we are
trying to partition by country and city names.

create table if not exists empl_part (empid int,ename string,salary double,deptno int)
comment 'manual partition example'
partitioned by (country string,city string)
row format delimited
fields terminated by ','


-- data for UK

load data local inpath 'uk_edinburgh.csv' into table empl_part partition(country='UK',city='EDIN');
load data local inpath 'uk_london.csv' into table empl_part partition(country='UK',city='LON');

-- data for usa

load data local inpath 'usa_newyork.csv' into table empl_part partition(country='US',city='NY');
load data local inpath 'usa_california.csv' into table empl_part partition(country='US',city='CF');



-- count(*)  -- count(1) where counrty and city
-- select * from employee order by empid asc;

-- to stop cli from querying the entire database

set hive.mapred.mode=strict;
--nonstrict  -- to make sure that all rows are not selected cos its may cause a massive mapreduce job

> select * from empl_part order by empid asc;
-- error FAILED: SemanticException [Error 10041]: No partition predicate found for Alias "employee" Table "employee"


show partitions empl_part; (Lists available partitions)

country=UK/city=EDIN
country=UK/city=LON
country=US/city=CF
country=US/city=NY

select * from empl_part where country='UK' order by empid asc;

When i tried using manual partition there is significant change in query performance compare to
querying on a normal table.


Dynamic partitioning

DP columns are specified the same way as it is for SP columns – in the partition clause. The only difference is that DP columns do not have values, while SP columns do. In the partition clause, we need to specify all partitioning columns, even if all of them are DP columns.
In INSERT ... SELECT ... queries, the dynamic partition columns must be specified last among the columns in the SELECT statement and in the same order in which they appear in the PARTITION() clause.

all DP columns – only allowed in nonstrict mode.

Process of dynamic partitioning

1) create a staging table
2) create the production table with partitions
3) load data from staging table into production table


CREATE TABLE IF NOT EXISTS EMPL_STAGE(EMPNO INT,ENAME STRING,SAL INT,DEPTNO INT,COUNTRY STRING,CITY STRING,DATE_JOINING STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA INPATH 'emp' into table EMPL_STAGE;

CREATE TABLE IF NOT EXISTS EMPL_PROD(EMPNO INT,ENAME STRING,SAL INT,COUNTRY STRING,CITY STRING,DATE_JOINING STRING)
PARTITIONED BY (DEPTNO INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';


-- set session properties
set  hive.exec.dynamic.partition=true;
set  hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE EMPL_PROD PARTITION(DEPTNO)
SELECT EMPNO,ENAME,SAL,COUNTRY,CITY,DATE_JOINING,DEPTNO as DEPTNO  FROM EMPL_STAGE

select count(*) from empl_stage where deptno=1;


Bucketing

When ever we do a select query on a table it has to go through whole table to retrieve the data.
which may become a performance issue and sometimes we may run out of memory.Here comes bucketing comes into the picture.

Hashing will be done on all the values internally and the values are dumped into buckets. As a result when ever we are firing a query the data will be fetched from a respective bucket.

Process of bucketing

1) create a staging table
2) create the production table with buckets
3) load data from staging table into production table



create external table nyse_daily_staging (
exchange string,
symbol string,
date_of_trading string,
open double,
high double,
low double,
close double,
volume int,
adj_close double)
row format delimited
fields terminated by '\t';


load data local inpath 'NYSE_daily' overwrite into table nyse_daily_staging;


create table nyse_daily_production (
exchange string,
symbol string,  
date_of_trading string,
open double,
high double,
low double,
close double,
volume int,
adj_close double)
clustered by (symbol) into 20 buckets
row format delimited
fields terminated by '\t';


--   assuming number of symbols are greater than the bucketsize

-- we need to enforce bucketing while loading data we set a hive parameter

set hive.enforce.bucketing=true ;


from nyse_daily_staging 
insert overwrite table nyse_daily_production
select * ;

tests
select * from nyse_daily_production where symbol = 'CBC' and open > 2.9;  160.599 sec
select * from nyse_daily_staging where symbol = 'CBC' and open > 2.9; 220.566 sec


1 comment:

  1. It is nice blog Thank you porovide important information and i am searching for same information to save my time Big Data Hadoop Online Course

    ReplyDelete