Relations (tables) in Pig are held in memory: once you exit the Pig command line, they no longer exist.
1. First start Hadoop's JobHistory Server:
mr-jobhistory-daemon.sh start historyserver
Web console: http://ip:19888/jobhistory
2. Common Pig Latin statements
(*) load: load data and create a table, equivalent to create table
(*) foreach: a loop; processes every row of a table
(*) group ... by: grouping
(*) filter: filtering, equivalent to where
(*) join: joins, for multi-table queries
(*) union, intersect: set operations
(*) generate: project columns, equivalent to select col1, col2, col3, ...
None of the statements above triggers computation immediately; only the statements below launch a MapReduce job right away:
(*) dump: print to the screen
(*) store: write to a file
This mirrors Spark, where operators come in two kinds:
(1) Transformation: lazy evaluation
(2) Action: triggers computation
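As an analogy only (not part of Pig itself), Java's Stream API shows the same lazy/eager split: intermediate operations such as filter do nothing until a terminal operation, like Pig statements before a dump or store.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LazyDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // filter is an intermediate (lazy) operation -- like a Pig statement before dump/store
        Stream<Integer> pipeline = List.of(1, 2, 3, 4).stream()
                .filter(n -> { calls.incrementAndGet(); return n % 2 == 0; });
        System.out.println("after filter, calls = " + calls.get());          // 0: nothing ran yet
        // count() is a terminal operation -- like dump/store, it triggers the computation
        long evens = pipeline.count();
        System.out.println("evens = " + evens + ", calls = " + calls.get()); // evens = 2, calls = 4
    }
}
```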
3. Analyzing data with Pig Latin, using the files emp.csv and dept.csv. A sample row of emp.csv:
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
(1) Create the employee table
emp = load '/scott/emp.csv';
Check the table schema:
describe emp; ---> Schema for emp unknown.
Because we did not specify a schema when creating the table, the schema of emp shows as unknown.
(2) Create the employee table with a schema (default data type: bytearray)
emp = load '/scott/emp.csv' as(empno,ename,job,mgr,hiredate,sal,comm,deptno);
Since no data type was specified for the columns, every column defaults to bytearray.
Now use dump to inspect the table's data.
The output is full of commas: we never specified a field delimiter for the CSV data when creating the table, so Pig used its default delimiter (a tab) and loaded each whole line as a single field.
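That behavior is easy to reproduce outside Pig: splitting a comma-separated record on a tab leaves the whole line as one field, while splitting on a comma yields the eight columns. A small illustrative Java sketch:

```java
public class DelimiterDemo {
    public static void main(String[] args) {
        String line = "7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30";
        // PigStorage's default delimiter is a tab: the comma-separated line stays one field
        System.out.println(line.split("\t").length);  // 1
        // PigStorage(',') splits on commas: eight separate fields
        System.out.println(line.split(",").length);   // 8
    }
}
```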
Create the table with a schema, column types, and an explicit delimiter:
emp = load '/scott/emp.csv' using PigStorage(',') as(empno:int,ename:chararray,job:chararray,mgr:int,hiredate:chararray,sal:int,comm:int,deptno:int);
Create the department table:
dept = load '/scott/dept.csv' using PigStorage(',') as(deptno:int,dname:chararray,loc:chararray);
(3) join: query employee info: employee name and department name
SQL:select ename,dname
from emp,dept
where emp.deptno=dept.deptno;
PL: t31 = join dept by deptno, emp by deptno; ---> does not trigger computation immediately
Data in emp:
Data in dept:
Data in t31:
(10,ACCOUNTING,NEW YORK,7934,MILLER,CLERK,7782,1982/1/23,1300,0,10)
(10,ACCOUNTING,NEW YORK,7839,KING,PRESIDENT,-1,1981/11/17,5000,0,10)
(10,ACCOUNTING,NEW YORK,7782,CLARK,MANAGER,7839,1981/6/9,2450,0,10)
(20,RESEARCH,DALLAS,7876,ADAMS,CLERK,7788,1987/5/23,1100,0,20)
(20,RESEARCH,DALLAS,7788,SCOTT,ANALYST,7566,1987/4/19,3000,0,20)
(20,RESEARCH,DALLAS,7369,SMITH,CLERK,7902,1980/12/17,800,0,20)
(20,RESEARCH,DALLAS,7566,JONES,MANAGER,7839,1981/4/2,2975,0,20)
(20,RESEARCH,DALLAS,7902,FORD,ANALYST,7566,1981/12/3,3000,0,20)
(30,SALES,CHICAGO,7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30)
(30,SALES,CHICAGO,7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30)
(30,SALES,CHICAGO,7698,BLAKE,MANAGER,7839,1981/5/1,2850,0,30)
(30,SALES,CHICAGO,7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30)
(30,SALES,CHICAGO,7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30)
(30,SALES,CHICAGO,7900,JAMES,CLERK,7698,1981/12/3,950,0,30)
t32 = foreach t31 generate dept::dname, emp::ename; ---> does not trigger computation
dump t32; -----> triggers computation immediately
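Conceptually, the join followed by generate is a lookup of each employee's deptno in dept plus a projection of two columns. A plain-Java sketch, illustrative only, with a few hard-coded rows:

```java
import java.util.List;
import java.util.Map;

public class JoinDemo {
    record Emp(String ename, int deptno) {}

    public static void main(String[] args) {
        // dept.csv reduced to a deptno -> dname lookup
        Map<Integer, String> dept = Map.of(10, "ACCOUNTING", 20, "RESEARCH", 30, "SALES");
        List<Emp> emp = List.of(new Emp("KING", 10), new Emp("SMITH", 20), new Emp("BLAKE", 30));
        // join dept by deptno, emp by deptno; then generate dname, ename
        for (Emp e : emp) {
            System.out.println(dept.get(e.deptno()) + "\t" + e.ename());
        }
    }
}
```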
(4) Query employee info: employee number, name, and salary
SQL: select empno,ename,sal from emp;
PL: emp4 = foreach emp generate empno,ename,sal; ---> does not trigger computation
dump emp4; -----> triggers computation immediately
(5) Query employee info, sorted by salary
SQL: select * from emp order by sal;
PL: emp5 = order emp by sal; ---> does not trigger computation (lazy evaluation)
dump emp5; -----> triggers computation immediately
(6) Grouping: find the maximum salary in each department
SQL:select deptno,max(sal) from emp group by deptno;
PL: Step 1: group
emp61 = group emp by deptno;
Schema:
emp61: {group: int,
emp: {(empno: int,ename: chararray,job: chararray,mgr: int,hiredate: chararray,sal: int,comm: int,deptno: int)}}
Data: dump emp61;
(10,{(7934,MILLER,CLERK,7782,1982/1/23,1300,0,10),
(7839,KING,PRESIDENT,-1,1981/11/17,5000,0,10),
(7782,CLARK,MANAGER,7839,1981/6/9,2450,0,10)})
(20,{(7876,ADAMS,CLERK,7788,1987/5/23,1100,0,20),
(7788,SCOTT,ANALYST,7566,1987/4/19,3000,0,20),
(7369,SMITH,CLERK,7902,1980/12/17,800,0,20),
(7566,JONES,MANAGER,7839,1981/4/2,2975,0,20),
(7902,FORD,ANALYST,7566,1981/12/3,3000,0,20)})
(30,{(7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30),
(7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30),
(7698,BLAKE,MANAGER,7839,1981/5/1,2850,0,30),
(7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30),
(7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30),
(7900,JAMES,CLERK,7698,1981/12/3,950,0,30)})
Step 2: maximum salary
emp62 = foreach emp61 generate group,MAX(emp.sal);
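What the group plus MAX combination computes can be sketched in plain Java (illustrative only; the (deptno, sal) pairs below are a subset of emp.csv):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupMaxDemo {
    record Emp(int deptno, int sal) {}

    public static void main(String[] args) {
        List<Emp> emp = List.of(
            new Emp(10, 1300), new Emp(10, 5000), new Emp(10, 2450),
            new Emp(20, 800),  new Emp(20, 3000),
            new Emp(30, 1600), new Emp(30, 950));
        // "group emp by deptno" then "MAX(emp.sal)" per group
        Map<Integer, Integer> maxSal = emp.stream().collect(
            Collectors.groupingBy(Emp::deptno, TreeMap::new,
                Collectors.reducing(0, Emp::sal, Integer::max)));
        System.out.println(maxSal); // {10=5000, 20=3000, 30=1600}
    }
}
```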
(7) Run WordCount
① Load the data ----> lazy
mydata = load '/input/data.txt' as (line:chararray);
② Split each line into words ----> lazy
words = foreach mydata generate flatten(TOKENIZE(line)) as word;
③ Group the words ----> lazy
grpd = group words by word;
④ Count the words in each group ----> lazy
cntd = foreach grpd generate group,COUNT(words);
⑤ Print the result ----> triggers computation
dump cntd;
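The five steps above amount to the classic word count. For comparison, the same logic as a plain-Java sketch (illustrative; the input line is hard-coded):

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountDemo {
    public static void main(String[] args) {
        String data = "I love Beijing I love China";
        // TOKENIZE + flatten: split the line into words;
        // group by word + COUNT: tally occurrences per word
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : data.split(" ")) {
            counts.merge(word, 1, Integer::sum);
        }
        System.out.println(counts); // {Beijing=1, China=1, I=2, love=2}
    }
}
```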
Pig user-defined functions: filter functions, eval functions, load functions
Required jar files:
$PIG_HOME/pig-0.17.0-core-h2.jar
$PIG_HOME/lib
$PIG_HOME/lib/h2
$HADOOP_HOME/share/hadoop/common
$HADOOP_HOME/share/hadoop/common/lib
1. Custom filter function: equivalent to a where clause
Example: find employees whose salary is greater than 3000
package demo;

import java.io.IOException;
import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

public class IsSalaryTooHigh extends FilterFunc {
    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        // The first (and only) argument is the salary column
        int sal = (Integer) tuple.get(0);
        return sal > 3000;
    }
}
2. Custom eval function: evaluate an expression
Example: determine a grade from an employee's salary
sal <= 1000: Grade A
1000 < sal <= 3000: Grade B
sal > 3000: Grade C
package demo;

import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class CheckSalaryGrade extends EvalFunc<String> {
    @Override
    public String exec(Tuple tuple) throws IOException {
        int sal = (Integer) tuple.get(0);
        /* sal <= 1000: Grade A
           1000 < sal <= 3000: Grade B
           sal > 3000: Grade C */
        if (sal > 3000) {
            return "Grade C";
        } else if (sal > 1000) {
            return "Grade B";
        } else {
            return "Grade A";
        }
    }
}
3. Custom load function
This also requires the MapReduce jar files:
$HADOOP_HOME/share/hadoop/mapreduce
$HADOOP_HOME/share/hadoop/mapreduce/lib
package demo;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class MyloadFunction extends LoadFunc {
    private RecordReader reader;

    @Override
    public InputFormat getInputFormat() throws IOException {
        return new TextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            if (!reader.nextKeyValue()) {
                return null;  // no more input
            }
            // Create the tuple to return
            Tuple tuple = TupleFactory.getInstance().newTuple();
            // Read one line, e.g.: I love Beijing
            Text value = (Text) reader.getCurrentValue();
            String data = value.toString();
            // Split the line into words
            String[] words = data.split(" ");
            // Create a bag holding one single-field tuple per word
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            for (String w : words) {
                Tuple one = TupleFactory.getInstance().newTuple();
                one.append(w);  // put the word into a tuple
                bag.add(one);   // put the tuple into the bag
            }
            // Finally, put the bag into the result tuple
            tuple.append(bag);
            return tuple;
        } catch (InterruptedException e) {
            // Do not swallow the exception; surface it to Pig
            throw new IOException(e);
        }
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        this.reader = reader;
    }

    @Override
    public void setLocation(String path, Job job) throws IOException {
        FileInputFormat.setInputPaths(job, path);
    }
}
Once these classes are written, register the jar with Pig's register command:
register /root/training/pigdemo.jar
You can also use the define command to give a function an alias; we will test that last.
Now test the custom functions:
emp1 = filter emp by demo.IsSalaryTooHigh(sal);
emp2 = foreach emp generate ename, demo.CheckSalaryGrade(sal);
mydata = load '/input/data.txt' using demo.MyloadFunction();
Finally, test the define command:
define isSTH demo.IsSalaryTooHigh();
emp1 = filter emp by isSTH(sal);
The result is the same.