scala学习-scala读取Hbase表中数据并且做join连接查询

2023-11-05

在这里插入图片描述

1。业务需求:sparkSQL on hbase ,sparkSQL直接读取Hbase中的两个表,进行连接查询。
2。图示
这里写图片描述
绿色的线
上图中绿色的线是做过测试的,直接在hive中建表,然后load数据进去,数据文件是存储在HDFS上的。
(1)建表

create table mycase(
c_code string,
c_rcode string,
c_region string,
c_cate string,
c_start string,
c_end string,
c_start_m bigint,
c_end_m bigint,
c_name string,
c_mark string) 
row format delimited fields terminated by ',' stored as textfile; 

load data local inpath '/opt/moudles/spark-2.2.0-bin-hadoop2.7.data/data100/mycase.txt' overwrite into table mycase; 

create table p_case(
p_code string,  
p_status string,
p_isend int
)
row format delimited fields terminated by ',' stored as textfile;

load data local inpath '/opt/moudles/spark-2.2.0-bin-hadoop2.7.data/data100/p_case.txt' overwrite into table p_case; 

create table crime_man(
m_acode string,  
m_pcode string)
row format delimited fields terminated by ',' stored as textfile;

load data local inpath '/opt/moudles/spark-2.2.0-bin-hadoop2.7.data/data100/crime_man.txt' overwrite into table crime_man; 

create table wb(
w_id bigint,
w_region string,
w_wname string,
w_address string,
w_uname string,
w_code string,
w_start string,
w_end string,
w_start_m bigint,
w_end_m bigint
) 
row format delimited fields terminated by ',' stored as textfile; 

load data local inpath '/opt/moudles/spark-2.2.0-bin-hadoop2.7.data/data100/wbfile.txt' overwrite into table wb;  

create table hotel(
h_id bigint,
h_region string,
h_hname string,
h_address string,
h_uname string,
h_code string,
h_start string,
h_end string,
h_start_m bigint,
h_end_m bigint,
h_homecode string) 
row format delimited fields terminated by ',' stored as textfile; 

load data local inpath '/opt/moudles/spark-2.2.0-bin-hadoop2.7.data/data100/hotelfile.txt' overwrite into table hotel;  

(2)添加数据
mycase.txt

A0,7,杭州市萧山区,杀人案件,2006/06/23 00:00:00,2006/06/23 21:00:00,1150992000000,1151067600000,案件名称0,暂无
A1,0,杭州市其他区,刑事案件,2006/06/25 00:00:00,2006/06/25 09:00:00,1151164800000,1151197200000,案件名称1,暂无
A2,1,杭州市上城区,强奸案件,2006/06/28 00:00:00,2006/06/28 10:00:00,1151424000000,1151460000000,案件名称2,暂无
A3,7,杭州市萧山区,杀人案件,2006/07/02 00:00:00,2006/07/02 01:00:00,1151769600000,1151773200000,案件名称3,暂无
A4,0,杭州市其他区,盗窃案件,2006/07/05 00:00:00,2006/07/05 16:00:00,1152028800000,1152086400000,案件名称4,暂无
A5,5,杭州市西湖区,强奸案件,2006/07/06 00:00:00,2006/07/06 21:00:00,1152115200000,1152190800000,案件名称5,暂无
A6,3,杭州市拱墅区,杀人案件,2006/07/06 00:00:00,2006/07/06 16:00:00,1152115200000,1152172800000,案件名称6,暂无
A7,3,杭州市拱墅区,杀人案件,2006/07/08 00:00:00,2006/07/08 10:00:00,1152288000000,1152324000000,案件名称7,暂无
A8,3,杭州市拱墅区,盗窃案件,2006/07/10 00:00:00,2006/07/10 02:00:00,1152460800000,1152468000000,案件名称8,暂无
A9,4,杭州市江干区,盗窃案件,2006/07/14 00:00:00,2006/07/14 13:00:00,1152806400000,1152853200000,案件名称9,暂无
A10,4,杭州市江干区,强奸案件,2006/07/17 00:00:00,2006/07/17 00:00:00,1153065600000,1153065600000,案件名称10,暂无
A11,1,杭州市上城区,杀人案件,2006/07/21 00:00:00,2006/07/21 21:00:00,1153411200000,1153486800000,案件名称11,暂无
A12,3,杭州市拱墅区,强奸案件,2006/07/21 00:00:00,2006/07/21 16:00:00,1153411200000,1153468800000,案件名称12,暂无
A13,7,杭州市萧山区,杀人案件,2006/07/21 00:00:00,2006/07/21 21:00:00,1153411200000,1153486800000,案件名称13,暂无
A14,4,杭州市江干区,盗窃案件,2006/07/23 00:00:00,2006/07/23 08:00:00,1153584000000,1153612800000,案件名称14,暂无
A15,2,杭州市下城区,盗窃案件,2006/07/26 00:00:00,2006/07/26 01:00:00,1153843200000,1153846800000,案件名称15,暂无
A16,3,杭州市拱墅区,刑事案件,2006/07/28 00:00:00,2006/07/28 10:00:00,1154016000000,1154052000000,案件名称16,暂无
A17,0,杭州市其他区,杀人案件,2006/07/28 00:00:00,2006/07/28 06:00:00,1154016000000,1154037600000,案件名称17,暂无
A18,0,杭州市其他区,刑事案件,2006/08/01 00:00:00,2006/08/01 15:00:00,1154361600000,1154415600000,案件名称18,暂无
A19,4,杭州市江干区,盗窃案件,2006/08/01 00:00:00,2006/08/01 20:00:00,1154361600000,1154433600000,案件名称19,暂无
A20,8,杭州市余杭区,杀人案件,2006/08/04 00:00:00,2006/08/04 06:00:00,1154620800000,1154642400000,案件名称20,暂无

p_case.txt

A0,移送起诉
A1,破案状态
A2,移送起诉
A3,破案状态
A4,移送起诉
A5,破案状态
A6,移送起诉
A7,移送起诉
A8,破案状态
A9,侦查终结
A10,侦查终结
A11,破案状态
A12,侦查终结
A13,破案状态
A14,移送起诉
A15,破案状态
A16,破案状态
A17,侦查终结
A18,移送起诉
A19,破案状态
A20,侦查终结

crime_man.txt

A0,U0
A0,U1
A1,U0
A1,U1
A1,U2
A1,U3
A1,U4
A1,U5
A1,U6
A1,U7
A1,U8
A2,U0
A2,U1
A2,U2
A2,U3
A2,U4
A2,U5
A2,U6
A3,U0
A3,U1
A4,U0
A4,U1
A4,U2
A4,U3
A5,U0
A6,U0
A6,U1
A6,U2
A6,U3
A6,U4
A6,U5
A6,U6
A7,U0
A8,U0
A8,U1
A8,U2
A8,U3
A8,U4
A8,U5
A9,U0
A9,U1
A10,U0
A10,U1
A10,U2
A10,U3
A10,U4
A10,U5
A10,U6
A11,U0
A11,U1
A11,U2
A11,U3
A12,U0
A13,U0
A13,U1
A13,U2
A13,U3
A13,U4
A13,U5
A13,U6
A13,U7
A13,U8
A14,U0
A14,U1
A14,U2
A14,U3
A14,U4
A14,U5
A14,U6
A14,U7
A15,U0
A15,U1
A15,U2
A15,U3
A16,U0
A16,U1
A17,U0
A17,U1
A17,U2
A17,U3
A17,U4
A18,U0
A18,U1
A19,U0
A19,U1
A19,U2
A19,U3
A19,U4
A19,U5
A19,U6
A20,U0
A20,U1
A20,U2
A20,U3
A20,U4
A20,U5
A20,U6
A20,U7
A20,U8

wbfile.txt

0,1,网吧583,杭州市上城区xx670路280号,姓名58,U86,2006/06/23 00:00:00,2006/06/23 19:00:00,1150992000000,1151060400000
1,0,网吧757,杭州市其他区xx570路266号,姓名55,U636,2006/06/23 00:00:00,2006/06/23 19:00:00,1150992000000,1151060400000
2,0,网吧283,杭州市其他区xx332路89号,姓名30,U793,2006/06/24 00:00:00,2006/06/24 19:00:00,1151078400000,1151146800000
3,3,网吧129,杭州市拱墅区xx662路713号,姓名33,U570,2006/06/27 00:00:00,2006/06/27 04:00:00,1151337600000,1151352000000
4,8,网吧434,杭州市余杭区xx975路721号,姓名59,U766,2006/06/29 00:00:00,2006/06/29 18:00:00,1151510400000,1151575200000
5,4,网吧80,杭州市江干区xx959路481号,姓名80,U318,2006/07/01 00:00:00,2006/07/01 18:00:00,1151683200000,1151748000000
6,6,网吧611,杭州市滨江区xx853路84号,姓名18,U220,2006/07/03 00:00:00,2006/07/03 19:00:00,1151856000000,1151924400000
7,1,网吧913,杭州市上城区xx560路157号,姓名56,U5,2006/07/03 00:00:00,2006/07/03 06:00:00,1151856000000,1151877600000
8,7,网吧684,杭州市萧山区xx754路827号,姓名34,U233,2006/07/07 00:00:00,2006/07/07 16:00:00,1152201600000,1152259200000
9,4,网吧545,杭州市江干区xx765路502号,姓名66,U167,2006/07/09 00:00:00,2006/07/09 21:00:00,1152374400000,1152450000000
10,2,网吧661,杭州市下城区xx690路657号,姓名96,U380,2006/07/09 00:00:00,2006/07/09 04:00:00,1152374400000,1152388800000
11,8,网吧928,杭州市余杭区xx61路688号,姓名90,U386,2006/07/12 00:00:00,2006/07/12 23:00:00,1152633600000,1152716400000
12,0,网吧979,杭州市其他区xx618路41号,姓名40,U378,2006/07/13 00:00:00,2006/07/13 09:00:00,1152720000000,1152752400000
13,1,网吧139,杭州市上城区xx666路869号,姓名97,U685,2006/07/13 00:00:00,2006/07/13 07:00:00,1152720000000,1152745200000
14,7,网吧109,杭州市萧山区xx558路485号,姓名32,U884,2006/07/15 00:00:00,2006/07/15 02:00:00,1152892800000,1152900000000
15,3,网吧866,杭州市拱墅区xx738路6号,姓名51,U629,2006/07/18 00:00:00,2006/07/18 09:00:00,1153152000000,1153184400000
16,0,网吧330,杭州市其他区xx251路887号,姓名79,U239,2006/07/22 00:00:00,2006/07/22 17:00:00,1153497600000,1153558800000
17,7,网吧138,杭州市萧山区xx385路448号,姓名57,U690,2006/07/22 00:00:00,2006/07/22 14:00:00,1153497600000,1153548000000
18,0,网吧816,杭州市其他区xx61路99号,姓名62,U137,2006/07/26 00:00:00,2006/07/26 01:00:00,1153843200000,1153846800000
19,5,网吧147,杭州市西湖区xx612路924号,姓名40,U569,2006/07/28 00:00:00,2006/07/28 17:00:00,1154016000000,1154077200000
20,0,网吧509,杭州市其他区xx569路234号,姓名54,U361,2006/07/30 00:00:00,2006/07/30 12:00:00,1154188800000,1154232000000

hotelfile.txt

1,5,宾馆598,杭州市西湖区xx268路894号,姓名38,U225,2006/06/24 00:00:00,2006/06/24 00:19:00,1151078400000,1151079540000,13
2,3,宾馆758,杭州市拱墅区xx480路729号,姓名92,U651,2006/06/25 00:00:00,2006/06/25 00:01:00,1151164800000,1151164860000,227
3,7,宾馆499,杭州市萧山区xx173路827号,姓名18,U329,2006/06/26 00:00:00,2006/06/26 00:04:00,1151251200000,1151251440000,794
4,7,宾馆478,杭州市萧山区xx620路622号,姓名57,U314,2006/06/27 00:00:00,2006/06/27 00:11:00,1151337600000,1151338260000,65
5,3,宾馆692,杭州市拱墅区xx165路624号,姓名15,U399,2006/06/28 00:00:00,2006/06/28 00:07:00,1151424000000,1151424420000,895
6,2,宾馆31,杭州市下城区xx635路833号,姓名60,U606,2006/06/29 00:00:00,2006/06/29 00:07:00,1151510400000,1151510820000,174
7,4,宾馆198,杭州市江干区xx622路536号,姓名71,U158,2006/06/29 00:00:00,2006/06/29 00:00:00,1151510400000,1151510400000,517
8,8,宾馆390,杭州市余杭区xx328路848号,姓名36,U27,2006/06/30 00:00:00,2006/06/30 00:11:00,1151596800000,1151597460000,670
9,4,宾馆398,杭州市江干区xx53路761号,姓名59,U624,2006/06/30 00:00:00,2006/06/30 00:01:00,1151596800000,1151596860000,878
10,0,宾馆1,杭州市其他区xx715路756号,姓名3,U703,2006/07/01 00:00:00,2006/07/01 00:00:00,1151683200000,1151683200000,898
11,4,宾馆53,杭州市江干区xx813路302号,姓名24,U226,2006/07/01 00:00:00,2006/07/01 00:10:00,1151683200000,1151683800000,983
12,8,宾馆718,杭州市余杭区xx911路813号,姓名1,U548,2006/07/01 00:00:00,2006/07/01 00:20:00,1151683200000,1151684400000,575
13,5,宾馆553,杭州市西湖区xx641路69号,姓名33,U265,2006/07/01 00:00:00,2006/07/01 00:06:00,1151683200000,1151683560000,122
14,4,宾馆179,杭州市江干区xx661路224号,姓名34,U262,2006/07/01 00:00:00,2006/07/01 00:17:00,1151683200000,1151684220000,131
15,4,宾馆582,杭州市江干区xx417路704号,姓名19,U813,2006/07/01 00:00:00,2006/07/01 00:23:00,1151683200000,1151684580000,0
16,8,宾馆895,杭州市余杭区xx527路341号,姓名80,U362,2006/07/02 00:00:00,2006/07/02 00:15:00,1151769600000,1151770500000,11
17,1,宾馆6,杭州市上城区xx62路637号,姓名35,U434,2006/07/02 00:00:00,2006/07/02 00:07:00,1151769600000,1151770020000,939
18,0,宾馆889,杭州市其他区xx943路239号,姓名46,U614,2006/07/02 00:00:00,2006/07/02 00:16:00,1151769600000,1151770560000,565
19,6,宾馆322,杭州市滨江区xx430路162号,姓名71,U911,2006/07/02 00:00:00,2006/07/02 00:10:00,1151769600000,1151770200000,542
20,4,宾馆491,杭州市江干区xx529路615号,姓名63,U911,2006/07/03 00:00:00,2006/07/03 00:09:00,1151856000000,1151856540000,385

(3)执行hive元数据命令

[root@bigdata01 ~]# hive --service metastore

(4)执行sparkSQL命令行

bin/spark-sql --master yarn-client --executor-memory 80g --conf spark.sql.warehouse.dir=hdfs://bigdata01.hzjs.co:8020/user/sparksql --conf spark.driver.maxResultSize=10g

(5)测试sql语句

案件发生区域内 2017年 盗窃案件 区域为3的 同时出现在网吧和宾馆的人 时间在两天内的
select c_rcode,c_code,c_name,c_region,p_status,h_region,h_hname,h_uname,h_code,w_region,w_wname,w_uname,w_code from mycase  left join p_case on mycase.c_code=p_case.p_code  left join hotel on mycase.c_rcode=hotel.h_region  left join wb on mycase.c_rcode=wb.w_region  where p_status !='破案状态' and c_cate='盗窃案件' and c_rcode = '3' and  3200000000 < c_start_m and c_start_m < 1514736000000  and h_code=w_code  and    ( c_start_m - 86400000 * 10 )< w_start_m  and   w_end_m < ( c_start_m + 86400000 * 10 ) and   ( c_start_m - 86400000 * 10 )< h_start_m  and   h_end_m < ( c_start_m + 86400000 * 10 ) ;

(6)执行结果

Time taken: 25.288 seconds, Fetched 25 row(s)

蓝色的线
蓝色的线只需要在建表的时候在hive里建立外部表,表指向Hbase中的一个表就可以了

create external table test_lcc_person (rowkey string,'name' string,'sex' string,'age' string) row format delimited fields terminated by '\t' STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,lcc_liezu:name,lcc_liezu:sex,lcc_liezu:age") TBLPROPERTIES ("hbase.table.name" = "test_lcc_person");

test_lcc_person两处的名字要相同,该命令在hive命令行中执行

3。思路:你读出来的数据hbaseRDD通过transform转成dataframe,然后register 成table,再join,再save不就行呢?

4。做java的不会,只能读取 一个表,还不会转换

package com.lcc.spark.hbase.test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class SparkOnHbase {

	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		
		System.setProperty("hadoop.home.dir", "E:\\02-hadoop\\hadoop-2.7.3\\");
	    System.setProperty("HADOOP_USER_NAME", "root"); 
	    
	    System.setProperty("HADOOP_USER_NAME", "root"); 
		
	   // System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
	    
		SparkConf conf = new SparkConf();
		conf.setAppName("LG_CALCULATE");
		conf.setMaster("local");

		JavaSparkContext context = new JavaSparkContext(conf);
		
		
		Configuration configuration = HBaseConfiguration.create();  
        configuration.set("hbase.zookeeper.property.clientPort", "2181");  
        configuration.set("hbase.zookeeper.quorum", "192.168.10.82");  
        //configuration.set("hbase.master", "192.168.10.82:60000");  
        
        Scan scan = new Scan();
        String tableName = "test_lcc_person";
        configuration.set(TableInputFormat.INPUT_TABLE, tableName);
        
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String ScanToString = Base64.encodeBytes(proto.toByteArray());
        
        configuration.set(TableInputFormat.SCAN, ScanToString);
        
        JavaPairRDD<ImmutableBytesWritable, Result> myRDD = context.newAPIHadoopRDD(configuration,TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        
        System.out.println(myRDD.count());
       
        
        
        myRDD.foreach(new VoidFunction<Tuple2<ImmutableBytesWritable,Result>>(){

			@Override
			public void call(Tuple2<ImmutableBytesWritable, Result> tuple)
					throws Exception {
				Result result = tuple._2();
				String rowkey = Bytes.toString(result.getRow());
	            String name = Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("name")));
	            String sex = Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("sex")));
	            String age = Bytes.toString(result.getValue(Bytes.toBytes("lcc_liezu"), Bytes.toBytes("age")));
	        	System.out.print(rowkey);
	            System.out.print("\t");
	            System.out.print(name);
	            System.out.print("\t");
	            System.out.print(sex);
	            System.out.print("\t");
	            System.out.print(age);
	            System.out.println("\t");
				
			}
        	
        });

	}

}

5。采用scala学习

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}


object Test {
  
  def main(args: Array[String]): Unit = {
        // 本地模式运行,便于测试
        val sparkConf = new SparkConf().setMaster("local").setAppName("HBaseTest")

        // 创建hbase configuration
        val hBaseConf = HBaseConfiguration.create()
        hBaseConf.set("hbase.zookeeper.property.clientPort", "2181");  
        hBaseConf.set("hbase.zookeeper.quorum", "192.168.10.82"); 
        hBaseConf.set(TableInputFormat.INPUT_TABLE,"test_lcc_person")

        // 创建 spark context
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // 从数据源获取数据
        val hbaseRDD = sc.newAPIHadoopRDD(hBaseConf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

        // 将数据映射为表  也就是将 RDD转化为 dataframe schema
        val shop = hbaseRDD.map(r=>(
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezu"),Bytes.toBytes("name"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezu"),Bytes.toBytes("sex"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezu"),Bytes.toBytes("age")))
        )).toDF("name","sex","age")

        shop.registerTempTable("shop")

        // 测试
        val df2 = sqlContext.sql("SELECT * FROM shop")
        println(df2.count())
        df2.collect().foreach(print(_))
        //df2.foreach(println)
  }
  
}

输出结果:

[梁川川1,男,12][梁川川2,男,12][梁川川3,男,12][梁川川4,男,12][梁川川5,男,12][梁川川6,男,12][梁川川7,男,17]

证明 读出来的数据hbaseRDD通过transform转成dataframe,然后register 成table 这个想法是正确的。

6。试试两个表读取试试

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}


object Test {
  
  def main(args: Array[String]): Unit = {
        // 本地模式运行,便于测试
        val sparkConf = new SparkConf().setMaster("local").setAppName("HBaseTest")

        // 创建hbase configuration
        val hBaseConf = HBaseConfiguration.create()
        hBaseConf.set("hbase.zookeeper.property.clientPort", "2181");  
        hBaseConf.set("hbase.zookeeper.quorum", "192.168.10.82"); 
        hBaseConf.set(TableInputFormat.INPUT_TABLE,"test_lcc_person")
        
        
        
        // 创建 spark context
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // 从数据源获取数据
        val hbaseRDD = sc.newAPIHadoopRDD(hBaseConf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

        // 将数据映射为表  也就是将 RDD转化为 dataframe schema
        val shop = hbaseRDD.map(r=>(
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezu"),Bytes.toBytes("name"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezu"),Bytes.toBytes("sex"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezu"),Bytes.toBytes("age")))
        )).toDF("name","sex","age")

        shop.registerTempTable("shop")

        // 测试
        val df2 = sqlContext.sql("SELECT * FROM shop")
        println(df2.count())
        df2.collect().foreach(print(_))
        //df2.foreach(println)
        
        
        
        
        
        
        
        
        
         // 创建hbase configuration
        val hBaseConf2 = HBaseConfiguration.create()
        hBaseConf2.set("hbase.zookeeper.property.clientPort", "2181");  
        hBaseConf2.set("hbase.zookeeper.quorum", "192.168.10.82"); 
        hBaseConf2.set(TableInputFormat.INPUT_TABLE,"test_lcc_card")
        
        
        
        // 创建 spark context
        val sc2 = new SparkContext(sparkConf)
        val sqlContext2 = new SQLContext(sc2)
        import sqlContext.implicits._

        // 从数据源获取数据
        val hbaseRDD2 = sc2.newAPIHadoopRDD(hBaseConf2,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

        
        
        
        // 将数据映射为表  也就是将 RDD转化为 dataframe schema
        val card = hbaseRDD.map(r=>(
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezus"),Bytes.toBytes("code"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezus"),Bytes.toBytes("money"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezus"),Bytes.toBytes("time")))
        )).toDF("code","money","time")

        card.registerTempTable("mycard")
        
        // 测试
        val df3 = sqlContext.sql("SELECT * FROM mycard")
        println(df3.count())
        df3.collect().foreach(print(_))
        
  }
  
}

但是结果报错

[梁川川1,男,12][梁川川2,男,12][梁川川3,男,12][梁川川4,男,12][梁川川5,男,12][梁川川6,男,12][梁川川7,男,17]Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext.<init>(SparkContext.scala:76)
Test$.main(Test.scala:25)
Test.main(Test.scala)
	at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2285)

只能有一个SparkContext 存在。

7。改进以下程序

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}


object Test {
  
  def main(args: Array[String]): Unit = {
        // 本地模式运行,便于测试
        val sparkConf = new SparkConf().setMaster("local").setAppName("HBaseTest")

        // 创建hbase configuration
        val hBaseConf = HBaseConfiguration.create()
        hBaseConf.set("hbase.zookeeper.property.clientPort", "2181");  
        hBaseConf.set("hbase.zookeeper.quorum", "192.168.10.82"); 
        //var con = ConnectionFactory.createConnection(hBaseConf)
        
        //var table = con.getTable(TableName.valueOf(""))
        
       hBaseConf.set(TableInputFormat.INPUT_TABLE,"test_lcc_person")
      
        
        
        // 创建 spark context
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // 从数据源获取数据
        var hbaseRDD = sc.newAPIHadoopRDD(hBaseConf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

        // 将数据映射为表  也就是将 RDD转化为 dataframe schema
        val shop = hbaseRDD.map(r=>(
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezu"),Bytes.toBytes("id"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezu"),Bytes.toBytes("name"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezu"),Bytes.toBytes("sex"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezu"),Bytes.toBytes("age")))
        )).toDF("id","name","sex","age")

        shop.registerTempTable("shop")

        // 测试
        val df2 = sqlContext.sql("SELECT * FROM shop")
        println(df2.count())
        df2.collect().foreach(print(_))
        //df2.foreach(println)
        
        
        hBaseConf.set(TableInputFormat.INPUT_TABLE,"test_lcc_card")
        hbaseRDD = sc.newAPIHadoopRDD(hBaseConf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
        // 将数据映射为表  也就是将 RDD转化为 dataframe schema
        val card = hbaseRDD.map(r=>(
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezus"),Bytes.toBytes("ids"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezus"),Bytes.toBytes("code"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezus"),Bytes.toBytes("money"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("lcc_liezus"),Bytes.toBytes("time")))
        )).toDF("ids","code","money","time")

        card.registerTempTable("mycard")
        
        // 测试
        val df3 = sqlContext.sql("SELECT * FROM mycard")
        println(df3.count())
        df3.collect().foreach(print(_))
        
        
        
         // 测试
        val df4 = sqlContext.sql("SELECT * FROM shop inner join mycard on id=ids")
        println(df4.count())
        df4.collect().foreach(println(_))
        
  }
  
}

测试结果

[7,梁川川7,男,17,7,7777,7777,2015-10-11]
[3,梁川川3,男,12,3,3333,333,2015-10-11]
[5,梁川川5,男,12,5,55,55,2015-10-11]
[6,梁川川6,男,12,6,6666,6666,2015-10-11]
[1,梁川川1,男,12,1,1111111111,1111111,2015-10-11]
[4,梁川川4,男,12,4,444,444,2015-10-11]
[2,梁川川2,男,12,2,22222,22222,2015-10-11]

测试成功

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

scala学习-scala读取Hbase表中数据并且做join连接查询 的相关文章

  • boost::algorithm::join 的一个很好的例子

    我最近想用提升 算法 加入 http www boost org doc libs 1 41 0 doc html string algo reference html header boost algorithm string join
  • 从 HList 获取元素

    我尝试了 HList 并按预期进行了以下工作 val hl 1 foo HNil val i Int hl 0 val s String hl 1 但是 我无法让以下代码正常工作 让我们暂时假设对列表进行随机访问是一个聪明的主意 class
  • SQL中如何合并多个表的数据

    我想我的处境很复杂 这是场景 我在 SQL Server 中有 3 个表 注册 学生 课程 仅供参考 没有外键 表的列是 Student 学生号 学生名 Course 课程 ID 课程名称 注册 注册 ID 学生 ID 课程 ID 课程结果
  • Scala:什么是 CompactBuffer?

    我试图弄清楚 CompactBuffer 的含义 和迭代器一样吗 请解释其中的差异 根据 Spark 的文档 它是 ArrayBuffer 的替代方案 可以提供更好的性能 因为它分配的内存更少 以下是 CompactBuffer 类文档的摘
  • 显式与隐式 SQL 连接

    显式内连接与隐式内连接之间有效率差异吗 例如 SELECT FROM table a INNER JOIN table b ON a id b id vs SELECT a b FROM table a table b WHERE a id
  • 理解 Scala FP 库

    只是为了让那些想要开始使用 Scala FP 库 在纯 FP 方面变得更好的人快速清晰地了解 有人能澄清猫和猫效应 猫效应 IO 之间的区别 关系吗 最重要的是 齐奥和莫尼克斯对此有何看法 最后 与 ScalaZ 7 8 有何关系 到目前为
  • 如何在映射中将字符串转换为 Seq[String]

    我有一个Map String String 以及需要的第三方功能Map String Seq String 有没有一种简单的方法来转换它 以便我可以将地图传递给函数 original mapValues Seq 注意mapValues返回地
  • 在 Scala 中将元素追加到列表末尾

    我无法添加 type 元素T到一个列表中List T 我尝试过myList myElement但它似乎创建了一个奇怪的对象并访问myList last始终返回放入列表中的第一个元素 我怎么解决这个问题 List 1 2 3 4 Result
  • 如何在超时的情况下在单独的调度程序上运行 Akka Streams 图?

    这个问题是基于我做过的一个宠物项目 这个SO https stackoverflow com questions 34641861 akka http blocking in a future blocks the server 34645
  • 具有继承类型的 Aux 模式推理失败

    我有一个复杂的玩具算法 我希望纯粹在类型级别上表示 根据饮食要求选择当天菜肴的修改 对卷积表示歉意 但我认为我们需要每一层才能达到我想要使用的最终界面 我的代码有一个问题 如果我们表达一个类型约束Aux 模式生成的类型基于另一个泛型类型 它
  • sql join 告诉我 ID 是否存在于其他表中

    我有 2 张桌子 A B ID FKID 1 3 2 3 3 4 4 4 我需要一个 select 语句 它显示 A 的所有内容 其中一个字段告诉我表 B 是否有任何与该 ID 匹配的 id Desired Result ID hasB 1
  • Akka-Http 2.4.9 抛出 java.lang.NoClassDefFoundError: akka/actor/ActorRefFactory 异常

    我正在尝试使用 Akka http 构建一个简单的 Web 服务 我遵循了这个指南 http doc akka io docs akka 2 4 9 scala http low level server side api html htt
  • Spark scala 模拟 Spark.implicits 用于单元测试

    当尝试使用 Spark 和 Scala 简化单元测试时 我使用 scala test 和mockito scala 以及mockito Sugar 这只是让你做这样的事情 val sparkSessionMock mock SparkSes
  • IntelliJ IDEA 不会从 SBT 项目加载 Lift 库

    我通过创建了一个空白项目sbt使用最基本的指南 具体来说 gt cd xyz gt sbt here we create a new project w Scala 2 8 1 gt lift is org lifty lifty 1 6
  • 使用 Shapeless 记录组合任意数量的状态更改函数

    我正在尝试移植combineReducers从 Redux 到 Scala 这个想法是每个函数控制它的一小部分状态并且combineReducers创建一个控制整个状态的函数 我无法找出应该像这样工作的函数所需的签名 sealed trai
  • 正确使用术语 Monoid

    从下面的例子来看 我认为这样的说法是正确的String在串联运算下定义了一个幺半群 因为它是关联二元运算 并且String碰巧有一个身份元素 它是一个空字符串 scala gt Jane Doe Jane Doe res0 Boolean
  • 附加两个具有相同列、不同顺序的数据框

    我有两个熊猫数据框 noclickDF DataFrame 0 123 321 0 1543 432 columns click id location clickDF DataFrame 1 123 421 1 1543 436 colu
  • 使用 Akka 1.3 的 actor 时,我需要注意生产者-消费者速率匹配吗?

    使用 Akka 1 3 时 我是否需要担心当生成消息的 Actor 生成消息的速度比使用消息的 Actor 的处理速度快时会发生什么 如果没有任何机制 在长时间运行的进程中 队列大小将增大以消耗所有可用内存 The doc http doc
  • 使用 scalapb 在 Spark Streaming 中解码 Proto Buf 消息时出错

    这是一个 Spark Streaming 应用程序 它使用编码的 Kafka 消息Proto Buf Using scalapb图书馆 我收到以下错误 请帮忙 gt com google protobuf InvalidProtocolBu
  • MySQL 左连接 WHERE table2.field = "X"

    我有以下表格 pages Field Type Null Key Default Extra page id int 11 NO PRI NULL auto increment type varchar 20 NO NULL

随机推荐

  • SublimeText如何快速设置代码自动补全?

    关于SublimeText如何快速设置代码自动补全 有一定的参考价值 有需要的朋友可以参考一下 希望对你有所帮助 推荐课程 SublimeText使用教程 实现效果 大家也可以通过以下链接下载所需要的版本 SublimeText3汉化版 h
  • nginx+keepalived 的安装

    安装nginx 源码编译安装 一般系统中已经装了了 make 和 g 无须再装 1 安装 make yum y install autoconf automake make 2 安装 g yum y install gcc gcc c 3
  • vsCode中安装vim插件之后无法使用ctrl+c、ctrl+v快捷键

    相信很多小伙伴在使用vscode时安装vim插件之后在编辑模式下都无法使用ctrl c和ctrl v快捷键 但是从你安装vim插件那一刻 你就拥有了成为开发大佬的潜质 打开vocode设置 相信所有人都知道怎么打开 搜索vim ctrl 找
  • SQL实验总结

    文章目录 SQL实验总结 1视图操作 2触发器 3创建外键约束 4用户权限 SQL实验总结 1视图操作 1 创建视图时SELECT语句的使用 CREATE VIEW lt 视图名 gt AS
  • spark安装时的版本兼容问题

    之前在安装hbase时 被版本兼容问题坑了很长时间 所以在这次spark的安装时谨慎了些 先去官网 查看不同版本关于JDK Hadoop的兼容性 找不同版本spark的源码 如下图 下载源码 解压缩 找到文件夹中pom xml文件 查看里面
  • 内网虚拟服务器怎么固定域名,如何让自己的局域网用(虚拟的)域名访问?

    IIS组建Intranet完全手册 如何在建立内部的电子邮件系统 如何在局域网中发布主页 以及是否可以在局域网中利用AceFTP CuteFTP来上传个人主页到单位的 Intranet 服务器等问题 主要是 WWW FTP服务器的建立及DN
  • Java heap space 异常解决思路

    java lang OutOfMemoryError Java heap space at java util HashMap resize HashMap java 704 na 1 8 0 181 at java util HashMa
  • 华为OD机试真题- 异常的打卡记录【2023Q1】【JAVA、Python、C++】

    题目描述 考勤记录是分析和考核职工工作时间利用情况的原始依据 也是计算职工工资的原始依据 为了正确地计算职工工资和监督工资基金使用情况 公司决定对员工的手机打卡记录进行异常排查 如果出现以下两种情况 则认为打卡异常 1 实际设备号与注册设备
  • 前端03——HTML字体标签、超链接以及图片标签

    本系列文章是是博主自己的学习前端笔记 所有笔记参照 Github 字体标签 font b u u b font
  • selenium 自动化测试工具(一)环境搭建

    支持多语音及跨平台 定制化高 不支持C S 只支持浏览器自动化 自动化测试适用于周期长 业务流程复杂 趋于稳定的项目 不支持基于flash的项目 selenuim会无法定位 selenium1 基于javascript 通过执行js脚本来操
  • 需求实现记录

    一 获取预设时间范围内的活跃会员数
  • MongoDB 7.0 来了 3分钟介绍,更自由,更阳光 ,更有意思

    开头还是介绍一下群 如果感兴趣polardb mongodb mysql postgresql redis oceanbase 等有问题 有需求都可以加群群内有各大数据库行业大咖 CTO 可以解决你的问题 加群请加 liuaustin3微信
  • Ubuntu本地快速搭建web小游戏网站,公网用户远程访问【内网穿透】

    文章目录 前言 1 本地环境服务搭建 2 局域网测试访问 3 内网穿透 3 1 ubuntu本地安装cpolar内网穿透 3 2 创建隧道 3 3 测试公网访问 4 配置固定二级子域名 4 1 保留一个二级子域名 4 2 配置二级子域名 4
  • 关于edge浏览器的卡顿问题

    近日惠普电脑暗影精灵系列 edge浏览器出现卡顿不流畅问题 这属于惠普的一大特色 bug 解决方案 1 打开任务管理器 找到进程为omen 的 进行关闭 此时电脑edge问题就会得到解决 2 如果不想总是开机后 要关闭进程来解决问题 治标治
  • avalon ms-repeat avalon1

    工作原因要用到avalon二次开发 但是看了下以前的avalon版本是1 现在大多数都是2版本了吧 所以很多文档不好找 但是大多数还是好用的 ms repeat 循环当前赋值的 ms repeat 加载需要循环显示的子元素上 默认el ms
  • ftp工具无法连接到Linux服务器

    ftp工具无法连接Linux服务器 文件无法上传 是因为你的ftp服务器未搭建 或未启动 许久没有登录腾讯云 今天想用xshell的xftp工具上传文件 却突然出现连接不上 用22端口 可以正常登录并访问 但是ftp工具的21端口无法连上远
  • 万字长文解读计算机视觉中的注意力机制(附论文和代码链接)

    文中论文和代码已经整理 如果需要 点击下方公号关注 领取 持续传达瓜货 所向披靡的张大刀 注意力机制是机器学习中嵌入的一个网络结构 主要用来学习输入数据对输出数据贡献 注意力机制在NLP和CV中均有使用 本文从注意力机制的起源和演进开始 并
  • centos登录root账户

    su root 然后输入密码 回车
  • Element 入门教程

    Element 入门 Element 布局 业务 element 美化页面 表格 表单 对话框 表单 分页工具条 页面 Element 入门 官网 https element eleme cn zh CN 引入 Element 的 css
  • scala学习-scala读取Hbase表中数据并且做join连接查询

    1 业务需求 sparkSQL on hbase sparkSQL直接读取Hbase中的两个表 进行连接查询 2 图示 绿色的线 上图中绿色的线是做过测试的 直接在hive中建表 然后load数据进去 数据文件是存储在HDFS上的 1 建表