Data Analysis Using Pig

What is Pig

Apache Pig is a platform for analyzing large data sets. Pig's language, Pig Latin, is a simple query algebra that lets you express data transformations such as merging data sets, filtering them, and applying functions to records or groups of records. Users can create their own functions to do special-purpose processing. Pig Latin queries execute in a distributed fashion on a cluster. The current implementation compiles Pig Latin programs into MapReduce jobs and executes them on a Hadoop cluster.

Design Philosophy

Pig Latin is designed with the following philosophies:

  • Pigs eat anything - Pig can process a wide variety of data formats
  • Pigs live anywhere - Pig runs not only on Hadoop but can target other cluster computing systems as well
  • Pigs are domestic animals - Pig is adaptable and supports a rich set of user-defined functions (UDFs)
  • Pigs fly - Pig processes data fast!

Criticism

While Pig is widely used, critics point out that the Pig Latin scripting language is not reusable: it is understood only by Pig.

Popularity of Pig

With the rise of Apache Spark, Pig's popularity has plateaued. New workloads that would once have suited Pig are increasingly being implemented in Apache Spark instead.

Data Types in Pig

Scalar Types

  • int
  • long
  • float
  • double
  • boolean
  • datetime (a chararray can be converted to a datetime using the ToDate function)
  • chararray
  • bytearray

Complex Types

  • map - a set of key/value pairs. The key is a chararray; the value can be of any type
  • tuple - an ordered collection of fields, similar to a row in an RDBMS
  • bag - an unordered collection of tuples
  • relation - an outer bag of tuples, similar to a table in an RDBMS
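As a sketch of how scalar and complex types appear together in a schema (the file path and field names here are illustrative assumptions, not part of the stocks data set used below):

grunt> TRADES = LOAD '/user/cloudera/trades' USING PigStorage(',')
AS (symbol: chararray,                      -- scalar: chararray
    price: double,                          -- scalar: double
    tags: map[],                            -- map: key/value pairs, values of any type
    quote: tuple(bid: double, ask: double), -- tuple: ordered fields
    fills: bag{t: tuple(qty: long)});       -- bag: unordered collection of tuples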

Launch the grunt shell

$ pig

Create a Pig relation from HDFS data stored in the stocks directory.

grunt> STOCKS = LOAD '/user/cloudera/stocks'
USING PigStorage(',') AS
(date: chararray, 
open: double, 
high: double, 
low: double, 
close: double, 
volume: double, 
adjClose: double, 
symbol: chararray);

In Pig, the default load/store function is PigStorage and the default field delimiter is '\t' (tab).
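For example, a tab-delimited file can be loaded without naming a storage function at all (the path and schema here are assumptions for illustration):

grunt> RAW = LOAD '/user/cloudera/stocks_tsv' AS (date: chararray, close: double, symbol: chararray);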

View relation description

grunt> DESCRIBE STOCKS; -- This is similar to describe <table name> in SQL

View the fields, the data type of each field, and a sample row from a relation.

grunt> ILLUSTRATE STOCKS;

View the computation plan using EXPLAIN. EXPLAIN is an advanced feature that shows the data flow, including the MapReduce jobs to be run.

grunt> EXPLAIN STOCKS;

View the STOCKS relation's data. Caution: DUMP prints all the records, so use it with care on large data sets.

grunt> DUMP STOCKS; -- This is similar to SELECT * FROM stocks;

Create a new relation with the first 5 rows of the STOCKS relation.

grunt> STOCKS_TOP5 = LIMIT STOCKS 5; 
grunt> DUMP STOCKS_TOP5;

Project symbol and volume columns from STOCKS relation. Cast the volume column to long data type.

grunt> SYMBOL_VOLUME = FOREACH STOCKS GENERATE symbol, (long) volume AS volume; -- Similar to SELECT symbol, CAST(volume AS BIGINT) FROM stocks;
grunt> ILLUSTRATE SYMBOL_VOLUME;

Store the SYMBOL_VOLUME relation into HDFS in CSV format.

grunt> STORE SYMBOL_VOLUME INTO '/user/cloudera/stocks.lean' USING PigStorage(',');

grunt> fs -tail /user/cloudera/stocks.lean/part-m-00000;

Download and merge the part files using the getmerge command.

grunt> fs -getmerge /user/cloudera/stocks.lean/part-* stock-volumes.csv;

Create an HCatalog table for the SYMBOL_VOLUME relation.

$ hcat -e "create table stocks_lean(symbol string, volume bigint) row format delimited fields terminated by ':' stored as textfile"
$ hcat -e "show tables"
$ hcat -e "describe stocks_lean"

Load the relation into the HCatalog table.

grunt> STORE SYMBOL_VOLUME INTO 'stocks_lean' USING org.apache.hive.hcatalog.pig.HCatStorer();

Verify the data stored in the HCatalog table by running a Hive query.

$ hive -e "select * from stocks_lean limit 10"

Load a relation from an HCatalog table.

grunt> SYMBOLS = LOAD 'stocks_lean' USING org.apache.hive.hcatalog.pig.HCatLoader();
grunt> ILLUSTRATE SYMBOLS;

Count the total number of symbols.

grunt> SYMBOLS_COUNT = FOREACH (GROUP SYMBOLS ALL) GENERATE COUNT(SYMBOLS); -- The PARALLEL clause sets the number of reducers on keyed groupings, e.g. GROUP SYMBOLS BY symbol PARALLEL 3; note that GROUP ... ALL always collapses to a single reducer
grunt> DUMP SYMBOLS_COUNT;

Find date range from STOCKS relation

grunt> GROUPED = GROUP STOCKS ALL;
grunt> DATE_AGGR = FOREACH GROUPED GENERATE MIN(STOCKS.date) AS min, MAX(STOCKS.date) AS max;
grunt> DUMP DATE_AGGR;

Sample the STOCKS relation, selecting roughly 0.01% of the data.

grunt> SAMPLED = SAMPLE STOCKS 0.0001;
grunt> DUMP SAMPLED;

Find the average volume per symbol for stocks traded in 2016

grunt> STOCKS2016 = FILTER STOCKS BY STARTSWITH(date, '2016');
grunt> GROUPED = GROUP STOCKS2016 BY symbol;
grunt> VOL_AGGR = FOREACH GROUPED GENERATE group AS symbol, AVG(STOCKS2016.volume) AS avg;

grunt> VOL_AGGR_ORDERED = ORDER VOL_AGGR BY avg DESC;

grunt> DUMP VOL_AGGR_ORDERED;

Pig also supports relational operations such as JOIN, UNION, and COGROUP.
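As a sketch, assuming a second data set of company names exists at the path shown (the path, relation names, and schema are illustrative assumptions), a join and a union might look like this:

grunt> COMPANIES = LOAD '/user/cloudera/companies' USING PigStorage(',') AS (symbol: chararray, name: chararray);
grunt> JOINED = JOIN STOCKS BY symbol, COMPANIES BY symbol; -- inner join on the symbol field

grunt> STOCKS_OLD = LOAD '/user/cloudera/stocks_archive' USING PigStorage(',') AS
(date: chararray, open: double, high: double, low: double,
close: double, volume: double, adjClose: double, symbol: chararray);
grunt> ALL_STOCKS = UNION STOCKS, STOCKS_OLD; -- concatenates two relations with matching schemas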