1. Introduction
As is well known, Spark is a distributed compute engine that spreads data across nodes for computation. In practice, though, business logic is rarely that simple: a daily scheduled run usually consists of many jobs rather than one, while the engine executes them serially. Less experienced developers also tend to pack an entire business into a single Scala file with thousands of lines, which quickly becomes unmaintainable. This article therefore presents a simple Spark framework. The framework also runs jobs on multiple threads, which improves throughput when several jobs execute together.
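The multi-threaded job execution mentioned above can be sketched with a plain JVM thread pool. This is a minimal sketch only: `JobRunner`, the pool size, and the job signature are illustrative, not the framework's actual API.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}
import scala.collection.JavaConverters._

object JobRunner {
  // Run several independent jobs on a fixed-size pool instead of serially.
  // invokeAll blocks until every task finishes and returns futures in
  // submission order, so the result list lines up with the input jobs.
  def runAll(jobs: Seq[() => String]): Seq[String] = {
    val pool = Executors.newFixedThreadPool(4)
    try {
      val tasks = jobs.map { job =>
        new Callable[String] { override def call(): String = job() }
      }
      pool.invokeAll(tasks.asJava).asScala.map(_.get()).toList
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }
}
```

Each job would wrap one warehouse-calc-storage pipeline; independent pipelines can then overlap instead of waiting for each other.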
2. How it works
The framework borrows ideas from Java's Spring: it uses a simple bean manager that provides three bean types.
- Warehouse bean: implements the DataWarehouseService interface and carries the @WarehouseService annotation, which describes the data warehouse it reads from.
- Calculation bean: implements the DataCalcService interface and carries the @CalcService annotation, which declares the calculation type; one warehouse bean often feeds several calculation beans.
- Storage bean: implements the DataStorageService interface and carries the @StorageService annotation, which declares the storage target, such as S3 or a DB.
The flow is as follows: a warehouse bean loads the raw data, the calculation beans attached to it transform that data, and a storage bean persists each result.
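Putting the three bean types together, the driver loop could look roughly like this. This is a sketch only: `BeanManager` and its lookup methods are assumed names, while `loadWarehouse`, `dataCalc`, and `storageData` match the interfaces shown in the usage section.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical dispatch loop; BeanManager, calcBeansFor, storageFor and
// tableNameOf are illustrative, not the framework's real API.
def runAllJobs(spark: SparkSession): Unit = {
  for (warehouse <- BeanManager.warehouseBeans) {
    val df = warehouse.loadWarehouse(spark)          // 1. warehouse bean loads raw data
    if (df != null) {
      for (calc <- BeanManager.calcBeansFor(warehouse)) {
        val result = calc.dataCalc(df)               // 2. calculation bean transforms it
        val storage = BeanManager.storageFor(calc)   // 3. storage picked via @CalcService.storageType
        storage.storageData(result.getResult, BeanManager.tableNameOf(calc))
      }
    }
  }
}
```

Because every calculation bean under a warehouse receives the same DataFrame, the raw data is loaded once and reused across jobs.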
3. Usage
All warehouse, calculation, and storage beans must be placed under the service package for the framework to pick them up automatically; the scan finds them no matter how deeply they are nested below service.
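Under the hood, this kind of package scan is typically done with classpath reflection. A minimal sketch using the org.reflections library (an assumption here; the framework may implement the scan differently):

```scala
import org.reflections.Reflections
import com.moon.core.annots.{CalcService, StorageService, WarehouseService}

// Scan every class below com.moon.service, however deep the sub-packages go,
// and collect the classes carrying each bean annotation.
val reflections = new Reflections("com.moon.service")
val warehouseBeans = reflections.getTypesAnnotatedWith(classOf[WarehouseService])
val calcBeans     = reflections.getTypesAnnotatedWith(classOf[CalcService])
val storageBeans  = reflections.getTypesAnnotatedWith(classOf[StorageService])
```

For this to work the annotations must be retained at runtime (in Java terms, `@Retention(RetentionPolicy.RUNTIME)`).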
1. Warehouse bean
```scala
package com.moon.service.warehouse

import com.moon.core.DataWarehouseService
import com.moon.core.annots.WarehouseService
import com.moon.core.enums.DataWarehouseTypeE
import com.moon.utils.CommonUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory

/**
 * device data warehouse
 */
@WarehouseService(dataWarehouseType = DataWarehouseTypeE.DEVICE_DATA, serviceDesc = "read device data from s3")
class DeviceDataWarehouseServiceImpl extends DataWarehouseService {

  val log = LoggerFactory.getLogger(getClass)

  /**
   * load data from s3 (may return null when nothing is found)
   */
  def loadWarehouse(sparkSession: SparkSession): DataFrame = {
    CommonUtils.readFromS3(sparkSession, "s3://xxxx")
  }
}
```
2. Calculation bean
```scala
package com.moon.service.calculate.device

import com.moon.core.DataCalcService
import com.moon.core.annots.CalcService
import com.moon.core.enums.{DataWarehouseTypeE, StorageTypeE}
import com.moon.domain.CalcResult
import com.moon.utils.DateUtils
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.slf4j.LoggerFactory

@CalcService(dataWarehouseType = DataWarehouseTypeE.DEVICE_DATA, storageType = StorageTypeE.DB,
  tableName = "test_table_name", serviceDesc = "save device detail data")
class ProductDeviceDetailServiceImpl extends DataCalcService {

  val log = LoggerFactory.getLogger(getClass)

  /**
   * @param df raw data for yesterday
   */
  override def dataCalc(df: DataFrame): CalcResult = {
    log.info("start device detail service...")
    // keep distinct rows whose product id and mac address are both present
    val ret = df.select("productId", "productName", "macAddress")
      .filter(col("productId").isNotNull)
      .filter(col("macAddress").isNotNull)
      .distinct()
    val newRet = ret.withColumn("executeDay", lit(DateUtils.yesterday()))
    val calcResult = new CalcResult
    calcResult.setResult(newRet)
    log.info("finished device detail service...")
    calcResult
  }
}
```
3. Storage bean
```scala
package com.moon.service.storage

import com.moon.core.DataStorageService
import com.moon.core.annots.StorageService
import com.moon.core.enums.StorageTypeE
import org.apache.spark.sql.{DataFrame, SaveMode}
import java.time.{LocalDate, Period}

@StorageService(storageType = StorageTypeE.S3, serviceDesc = "save data to s3")
class S3StorageServiceImpl extends DataStorageService {

  def storageData(df: DataFrame, tableName: String): Unit = {
    val startTime = System.currentTimeMillis()
    // yesterday's partition values (year/month feed the real output path)
    val yesterday = LocalDate.now().minus(Period.ofDays(1))
    val year = yesterday.getYear
    val month = f"${yesterday.getMonthValue}%02d"
    df.write.mode(SaveMode.Append).parquet("s3://xxxxx")
    val endTime = System.currentTimeMillis()
    println(s"save $tableName spend time is : ${endTime - startTime}")
  }
}
```
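Since the calculation bean above declares `storageType = StorageTypeE.DB`, a DB counterpart to the S3 bean would look roughly like this. This is a hypothetical sketch: the class name, JDBC URL, and credentials are placeholders, not part of the framework's source.

```scala
package com.moon.service.storage

import java.util.Properties
import com.moon.core.DataStorageService
import com.moon.core.annots.StorageService
import com.moon.core.enums.StorageTypeE
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical DB storage bean, selected when a calculation bean declares
// StorageTypeE.DB; connection details below are placeholders.
@StorageService(storageType = StorageTypeE.DB, serviceDesc = "save data to db")
class DbStorageServiceImpl extends DataStorageService {

  def storageData(df: DataFrame, tableName: String): Unit = {
    val props = new Properties()
    props.put("user", "xxxx")
    props.put("password", "xxxx")
    // Append so daily runs accumulate; SaveMode.Overwrite would drop history
    df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://xxxx:3306/xxxx", tableName, props)
  }
}
```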
The full source code has been uploaded as a resource; it uses Scala 2.11.
Click here to download the source code.