Pig Programming for Big Data

Pig is a high-level platform for data processing on top of Hadoop MapReduce. Pig's language is called Pig Latin. Pig is a suitable tool for computing summary statistics from data.

Pig can be executed in two modes:

  • local mode: runs on a single JVM and can only access the local filesystem
  • distributed mode: runs on a Hadoop cluster, accessing HDFS, and translates queries into MapReduce jobs

Pig Latin:

  • Interacting with HDFS
  • Manipulating data that sits on HDFS

Pig performs data processing as a directed acyclic graph (DAG), where each node represents an operation (alias) applied to the data. These operations come in two types:

  1. Relational algebra operations such as join, filter, and project
  2. Functional programming style operations such as map, reduce, etc.

Running Pig

  • Using the interactive shell called grunt

    • Running pig on local mode: pig -x local
    • Running on distributed mode: pig -x mapreduce
  • Execute a pig script from command line E.g. pig myscript.pig

  • Embed your pig query into a Java program

    • Write the java program for your application
    • Compile the program
    • Execute the program

Grunt interactive shell commands

  • grunt> exec myscript.pig: executes the script in a separate forked process
  • grunt> run myscript.pig: runs the script in the same process

Grunt also recognizes UNIX shell commands:

  • grunt> ls

Pig Latin

A high level overview:

  • Read/write from/to HDFS
  • Data types
  • Diagnostics
  • Expressions and functions
  • Relational operations (UNION, JOIN, FILTER, etc)
  • No commands for insert, delete, or update

Pig Latin Workflow

Writing Pig scripts to process data:

  • Load data from local/HDFS into a new alias: alias = LOAD 'filename' AS (...);
  • Manipulate the data using relational operations
    • For each operation/step, create a new alias: new_alias = pig_operation(old_alias);
  • Dump the final alias to display the output, or store it in an HDFS/local directory: DUMP result_alias;

Comments in a Pig script are denoted by "--".

  • Load data into an alias
-- alias = LOAD filename AS (..);
alias = LOAD 'input.txt' AS (attr1, attr2);

-- comma-separated values
-- mydata = LOAD filename USING PigStorage(',') ..;
mydata = LOAD 'input.txt'   
         USING PigStorage(',')   
         AS (attr1, attr2, ..);
  • Manipulate the alias using relational operations

If you don't specify attribute names, the fields can be referenced positionally as $0, $1, $2, ..
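For instance, a file loaded without a schema can still be queried through positional references (file name assumed for illustration):

```pig
-- no schema given, so fields are addressed as $0, $1, ...
mydata = LOAD 'input.txt' USING PigStorage(',');
firstcol = FOREACH mydata GENERATE $0;
DUMP firstcol;
```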

Comparing SQL and Pig

Although the syntax of Pig Latin is different from SQL, the workflows of the two are quite similar.

SQL

-- display username and address of those with age>30
SELECT Username, Address
FROM UserTable
WHERE Age > 30;

Pig

-- step 1: load the data
UserTable = LOAD 'usertable.txt' USING PigStorage(',') AS (Username:chararray, Address:chararray, Age:int);
-- step 2: filter the dataset based on age>30
Subset = FILTER UserTable BY Age > 30;
-- step 3: project the requested columns
Result = FOREACH Subset GENERATE Username, Address;
-- step 4: display the output
DUMP Result;

Pig Read-Write Operations

  • LOAD: Loads data from a file. Usage: alias = LOAD 'filename' [USING function] [AS schema];
  • LIMIT: Limits the number of tuples in a given alias. Usage: alias = LIMIT old_alias n;
  • DUMP: Displays the content of an alias. Usage: DUMP alias;
  • STORE: Stores the content of an alias into a directory. Usage: STORE alias INTO 'directory' [USING function];

Defining Table Schema

alias = LOAD 'filename' [USING function] [AS schema];
  • By default, Pig assumes columns/fields are separated by tabs.
  • Use the built-in function PigStorage() to specify the separator
  • Define the schema, attribute names and their types
Example:

Suppose the attributes in the input file are comma-separated:

mydata = LOAD 'filename' USING PigStorage(',') AS (attr1, attr2, ..);

Example workflow

data = LOAD '/user/hduser/wikipedia/wiki_edits.txt'
       USING PigStorage(',')
       AS (rev, aid, rid, article, ts, uname, uid);

grp = GROUP data BY article; -- output has two columns: group,data

counts = FOREACH grp GENERATE group, COUNT(data);

results = LIMIT counts 4;
DUMP results;

STORE counts INTO 'output.txt' USING PigStorage(',');

Atomic Data Types

Data types:

  • int
  • long
  • float
  • double
  • chararray
  • bytearray

data = LOAD 'input.txt' 
       USING PigStorage(',')
       AS (rev:chararray, aid:int, ..);

If you don't specify a type, the attribute defaults to bytearray, the most generic data type.

More complex data types

Complex data types:

  • Tuple
  • Bag
  • Map

$ hadoop fs -cat data.txt
(1,2,3) (4,5,6)
(4,5,3) (3,3,2)
grunt> A = LOAD 'data.txt' 
           AS (t1:tuple(t1a:int, t1b:int, t1c:int),
               t2:tuple(t2a:int, t2b:int, t2c:int));

DUMP A;

Relational Operations in Pig

  • FOREACH
  • FILTER
  • ORDER BY
  • SPLIT
  • UNION
  • DISTINCT
  • GROUP
  • JOIN

FOREACH

grades = LOAD 'grades.txt'
         USING PigStorage(',')
         AS (name:chararray, hw1:int, hw2:int, hw3:int);

hwtotals = FOREACH grades GENERATE name, hw1+hw2+hw3;

DUMP hwtotals;

The result will be:

(Alex,67)
(John,79)
(Lee,73)

Renaming a column:

titles = FOREACH movies GENERATE $1 AS title;
  • Using Regular Expressions to extract fields:

Assume the input is given as

Samsung TV (499.99)
iPhone 6s (650.00)

and we want to extract the item names, which is everything from the beginning of the line up to the opening parenthesis "(". This can be done with the built-in function REGEX_EXTRACT.
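A minimal sketch of this extraction, assuming the lines above are stored in a file named items.txt; REGEX_EXTRACT(string, regex, n) returns the n-th capture group of the regex:

```pig
items = LOAD 'items.txt' AS (line:chararray);
-- capture everything before the opening parenthesis
names = FOREACH items GENERATE REGEX_EXTRACT(line, '(.*)\\(', 1) AS item;
DUMP names;
```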

FILTER

Equivalent to WHERE clause in SQL:

SELECT columns FROM tablename
WHERE expression;

Pig syntax: FILTER tablename BY expression

filtered_result = FILTER tablename BY expression;

Example:

best = FILTER hwtotals BY $1 > 80;

ORDER BY

Pig syntax:

sorted_result = ORDER tablename BY column1 DESC [, column2 ASC];

Example:

result = ORDER hwtotals BY name ASC;

SPLIT

Splits a table into two or more tables based on conditions.

SQL equivalence: separate SELECT statements with complementary WHERE clauses.

Pig syntax (note that SPLIT does not return an alias; the output tables are named in the INTO clause):

SPLIT tablename
      INTO t1 IF col1>s,
           t2 IF col1<=s;
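As an illustration, the hwtotals alias from the FOREACH example could be split on the homework total (the threshold is chosen arbitrarily):

```pig
SPLIT hwtotals INTO high IF $1 > 70, low IF $1 <= 70;
DUMP high;
DUMP low;
```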

UNION

To obtain the union of two tables:

combined = UNION table1, table2;

DISTINCT

uniqs = DISTINCT dups;

In Pig, DISTINCT is applied to a whole table, not to an expression (as is done in SQL).
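So to deduplicate a single column, project it first with FOREACH and then apply DISTINCT (alias and column names assumed for illustration):

```pig
names = FOREACH data GENERATE name;
uniq_names = DISTINCT names;
```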

GROUP

Syntax:

grp = GROUP tablename BY column1;
Example:
data = LOAD 'data.txt' 
       USING PigStorage(',') 
       AS(name:chararray, score:int);

grps = GROUP data by name;

totalscores = FOREACH grps 
              GENERATE group AS name,
                       SUM(data.$1) AS total;

GROUP ALL

Groups all tuples of a table into a single group, which is useful for computing an aggregate over the whole table.
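A small sketch, reusing the data alias loaded in the GROUP example above:

```pig
allgrp = GROUP data ALL;        -- one group containing every tuple
overall = FOREACH allgrp GENERATE COUNT(data) AS n;
DUMP overall;
```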

JOIN

alias = JOIN table1 BY col1, table2 BY col2;
Example:
grades = LOAD 'grades.txt' USING PigStorage(',') 
         AS (name:chararray, hw1:int, hw2:int, hw3:int);

majors = LOAD 'majors.txt' USING PigStorage(',') 
         AS (name:chararray, dept:chararray);

transcript = JOIN majors BY name, grades BY name;

Built-in Functions in Pig

  • AVG
  • CONCAT
  • COUNT
  • DIFF
  • MAX
  • MIN
  • SIZE
  • SUM
  • TOKENIZE
  • FLATTEN

TOKENIZE

Splits a chararray into a bag of words:

wordsbag = FOREACH docs GENERATE TOKENIZE($0) AS words;

FLATTEN

Un-nests a bag, so that each element of the bag becomes its own tuple/row:

allwords = FOREACH wordsbag GENERATE FLATTEN($0);

Pig Macros

To package reusable code:

DEFINE <macro> (<args>) RETURNS <returnvalue> {
    -- Pig Latin statements defining the macro body
}
Example
DEFINE wordcount(text) RETURNS counts {
    tokens = FOREACH $text 
             GENERATE TOKENIZE($0) 
             AS terms;

    wordlist = FOREACH tokens 
               GENERATE FLATTEN(terms) 
               AS word, 1 AS freq;

    groups = GROUP wordlist BY word;

    $counts = FOREACH groups 
              GENERATE group 
              AS word, SUM(wordlist.freq) AS freq;
}

Tokens:

terms : bag
{(a),(this),(that)}
{(cat),(is)}

wordlist:

word freq
a 1
this 1
that 1

groups:

group : each distinct word, paired with a bag of its (word, freq) tuples from wordlist
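The macro above can then be invoked like an operator (input file name assumed for illustration):

```pig
docs = LOAD 'docs.txt' AS (line:chararray);
wc = wordcount(docs);
DUMP wc;
```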

Pig User Defined Functions

Write your functions in Java.

First, make sure the function you intend to write has not already been written by the Pig community; check the piggybank repository.

Types of functions

  • Eval functions
  • Filter functions
  • Store/Load functions

import java.io.IOException;

import org.apache.pig.FilterFunc; /* customizing FilterFunc */
import org.apache.pig.data.Tuple;


public class PopularTerms extends FilterFunc {
    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        // this code will filter out terms whose frequency is below a threshold
        return true; // placeholder for the actual filtering logic
    }
}
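Once compiled and packaged into a jar (jar name and input schema assumed for illustration), the UDF is registered and then used like any other function, e.g. in a FILTER:

```pig
REGISTER myudfs.jar;
terms = LOAD 'terms.txt' USING PigStorage(',') AS (word:chararray, freq:int);
popular = FILTER terms BY PopularTerms(word, freq);
```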

Piggy bank

A repository of Pig functions shared by the Pig community.

To locate the repository, look for piggybank.jar file:

ls $PIG_HOME/contrib/piggybank/java

Pig Programs Embedded in Java

import java.io.IOException;
import org.apache.pig.PigServer;

public class idmapreduce {
    public static void main(String[] args) throws IOException {
        PigServer pig = new PigServer("local"); // or "mapreduce" for cluster mode
        pig.registerQuery("A = LOAD 'input.txt' AS (id:int);");
        pig.store("A", "output");
    }
}

Compile:

javac -cp $PIG_HOME/pig-0.11.0.jar idmapreduce.java

Running:

java -cp ".." idmapreduce
