Pig is a high-level platform for data processing on top of Hadoop MapReduce. Pig's language is called Pig Latin. Pig is well suited to computing summary statistics from data.
Pig Latin:
Pig performs data processing as a directed acyclic graph (DAG), where each node represents an operation applied to the data and the result of each operation is bound to an alias. These operations fall into two types: relational operators (e.g. LOAD, FILTER, GROUP) and diagnostic operators (e.g. DUMP, DESCRIBE).

Pig can be executed in two modes:
pig -x local
pig -x mapreduce

There are several ways to run Pig:
Using the interactive shell called grunt
Executing a Pig script from the command line, e.g. pig myscript.pig
Embedding your Pig query into a Java program
Writing Pig scripts to process data:
alias = LOAD 'filename' AS (...);
new_alias = pig_operation(old_alias);
DUMP result_alias;
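Putting the skeleton together, a minimal sketch (the file name ages.txt and its two columns are assumptions, not part of the notes):

```pig
-- load a hypothetical comma-separated file with two columns
people = LOAD 'ages.txt' USING PigStorage(',') AS (name:chararray, age:int);
-- keep only rows where age is at least 18
adults = FILTER people BY age >= 18;
-- print the result to the console
DUMP adults;
```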
Comments in a Pig script are denoted by "--".
-- alias = LOAD filename AS (..);
alias = LOAD 'input.txt' AS (attr1, attr2);
-- comma-separated values
-- mydata = LOAD filename USING PigStorage(',') ..;
mydata = LOAD 'input.txt'
USING PigStorage(',')
AS (attr1, attr2, ..);
If you don't specify the names of the attributes, the fields are referred to positionally as $0, $1, $2, ..
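A sketch of positional references (the file name mydata.txt is an assumption):

```pig
-- no AS clause: fields are addressed by position
mydata = LOAD 'mydata.txt' USING PigStorage(',');
-- project the first and third fields
proj = FOREACH mydata GENERATE $0, $2;
DUMP proj;
```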
Although the syntax of Pig Latin differs from SQL, the workflows of the two are quite similar.
SQL
-- display username and address of those with age>30
SELECT Username, Address
FROM UserTable
WHERE Age > 30;
Pig
-- step 1: load the data
UserTable = LOAD 'usertable.txt' USING PigStorage(',') AS (Username:chararray, Address:chararray, Age:int);
-- step 2: filter the dataset based on age>30
Subset = FILTER UserTable BY Age > 30;
-- step 3: display the output
DUMP Subset;
Operation | Description |
---|---|
LOAD | Load data in a file. Usage: alias = LOAD 'filename' [USING function] [AS schema]; |
LIMIT | Limit the number of tuples in a given alias. Usage: alias = LIMIT old_alias n; |
DUMP | Display the content of an alias. Usage: DUMP alias; |
STORE | Store the content of an alias into a directory. Usage: STORE alias INTO 'directory' [USING function]; |
alias = LOAD 'filename' [USING function] [AS schema];
We have the following input file; the attributes are comma-separated:
mydata = LOAD 'filename' USING PigStorage(',') AS ();
data = LOAD '/user/hduser/wikipedia/wiki_edits.txt'
USING PigStorage(',')
AS (rev, aid, rid, article, ts, uname, uid);
grp = GROUP data BY article; -- output has two columns: group,data
counts = FOREACH grp GENERATE group, COUNT(data);
results = LIMIT counts 4;
DUMP results;
STORE counts INTO 'output.txt' USING PigStorage(',');
Data Type | Description |
---|---|
int | 32-bit signed integer |
long | 64-bit signed integer |
float | 32-bit floating point |
double | 64-bit floating point |
chararray | character array (string) in Unicode UTF-8 |
bytearray | byte array (blob) |
data = LOAD 'input.txt'
USING PigStorage(',')
AS (rev:chararray, aid:int, ..);
If you don't specify the type, the attributes will be of type bytearray, the most generic data type.
Complex Data Type | Description |
---|---|
Tuple | an ordered set of fields, e.g. (1,2,3) |
Bag | a collection of tuples, e.g. {(1,2),(3,4)} |
Map | a set of key-value pairs, e.g. [name#John] |
$ hadoop fs -cat data.txt
(1,2,3) (4,5,6)
(4,5,3) (3,3,2)
grunt> A = LOAD 'data.txt'
AS (t1:tuple(t1a:int, t1b:int, t1c:int),
t2:tuple(t2a:int, t2b:int, t2c:int));
DUMP A;
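For the other complex types, a sketch of a schema combining a bag and a map (the file name complex.txt and its layout are assumptions):

```pig
-- b is a bag of (id:int) tuples; m is a map from chararray keys to int values
B = LOAD 'complex.txt'
    AS (b:bag{t:tuple(id:int)}, m:map[int]);
DUMP B;
```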
Renaming a column:
titles = FOREACH movies GENERATE $1 AS title;
Assume the input is given as
Samsung TV (499.99)
iPhone 6s (650.00)
and we want to extract the item names, i.e. everything from the beginning of the line up to the opening parenthesis "(". This can be done with the built-in function REGEX_EXTRACT.
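A sketch using the built-in REGEX_EXTRACT and TRIM functions (the file name products.txt and the aliases are assumptions):

```pig
-- each input line is read as a single chararray field
products = LOAD 'products.txt' AS (line:chararray);
-- capture group 1 is everything before the opening parenthesis;
-- TRIM removes the trailing space
items = FOREACH products GENERATE TRIM(REGEX_EXTRACT(line, '^(.*?)\\(', 1)) AS item;
DUMP items;
```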
To package reusable code, you can define macros:
DEFINE <macro> (<args>) RETURNS <returnvalue> {
    -- Pig Latin statements for the macro body
}
DEFINE wordcount(text) RETURNS counts {
tokens = FOREACH $text
GENERATE TOKENIZE($0)
AS terms;
wordlist = FOREACH tokens
GENERATE FLATTEN(terms)
AS word, 1 AS freq;
groups = GROUP wordlist BY word;
$counts = FOREACH groups
GENERATE group
AS word, SUM(wordlist.freq) AS freq;
}
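The macro can then be invoked like any other operator; a sketch (the input file books.txt is an assumption):

```pig
-- each line of the file becomes one chararray field
lines = LOAD 'books.txt' AS (text:chararray);
-- expand the macro; $text and $counts are substituted on expansion
wordcounts = wordcount(lines);
DUMP wordcounts;
```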
tokens:
terms : bag |
---|
{(a),(this),(that)} |
{(cat),(is)} |
wordlist:
word | freq |
---|---|
a | 1 |
this | 1 |
that | 1 |
groups:
group | wordlist |
---|---|
a | {(a,1)} |
this | {(this,1)} |
that | {(that,1)} |
To write your own user-defined functions (UDFs), write them in Java. First make sure that the function you intend to write has not already been written by the Pig community; check the piggybank repository.
#### Eval functions
#### Filter functions
#### Store/Load functions
import java.io.IOException;

import org.apache.pig.FilterFunc; /* extend FilterFunc to build a filter UDF */
import org.apache.pig.data.Tuple;

public class PopularTerms extends FilterFunc {
    private static final long THRESHOLD = 10L; // hypothetical cutoff frequency

    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        // this code filters out terms whose frequency is below the threshold
        if (tuple == null || tuple.size() == 0) {
            return false;
        }
        Long freq = (Long) tuple.get(0); // frequency assumed to be field 0
        return freq != null && freq >= THRESHOLD;
    }
}
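After compiling and packaging the class, the UDF can be called from Pig; a sketch (the jar name myudfs.jar and the alias counts are assumptions):

```pig
-- make the compiled UDF visible to Pig
REGISTER myudfs.jar;
-- keep only the terms whose frequency passes the UDF's threshold
popular = FILTER counts BY PopularTerms(freq);
DUMP popular;
```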