spark boot封装,多线程高效执行

2023-11-13

1.简介

众所周知,spark是一个分布式计算引擎,可以将计算数据分不到不同的节点进行计算,但是往往我们的业务都是比较复杂,每天定时跑的时候不只是一个job,可能是有很多的job,但是引擎本身是串行化的,而且对于经验不深的同学,一个业务可能在一个scala文件写上上前行代码,这样就很难维护,所以这里为大家提供一个简易的spark框架。框架同时采用多线程的方式,可以提高多个job的执行效率。

2.原理

框架借助java spring的思想,使用了简易的bean管理器,一共提供了三种bean类型。

  1. 仓库bean:
  2. 计算bean:
  3. 存储bean:

仓库bean需要实现DataWarehouseService接口,并且加上注解@WarehouseService进行描述读取的数据仓库的作用。

计算bean需要实现DataCalcService接口,并且加上注解@CalcService表明计算的类型,往往一个仓库bean下面可能会有多个计算bean。

存储bean需要实现DataStorageService接口,并且加上注解@StorageService表明存储的类型,如S3、DB

流程如下:

3.使用介绍

 所有仓库bean、计算bean、存储bean都需要写到service包下面,框架才能自动读取,无论service包下的层级多深,都能够扫描到。


1.仓库bean

package com.moon.service.warehouse

import com.moon.core.DataWarehouseService
import com.moon.core.annots.WarehouseService
import com.moon.core.enums.DataWarehouseTypeE
import com.moon.service.storage.S3StorageServiceImpl
import com.moon.utils.CommonUtils
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory

/**
 * device data house
 */
@WarehouseService(dataWarehouseType = DataWarehouseTypeE.DEVICE_DATA, serviceDesc = "read device data from s3")
class DeviceDataWarehouseServiceImpl extends DataWarehouseService {

  val log = LoggerFactory.getLogger(getClass)

  val s3Service = new S3StorageServiceImpl()

  /**
   * load data from s3
   *
   * @return
   */
  def loadWarehouse(sparkSession: SparkSession): DataFrame = {
    var df = CommonUtils.readFromS3(sparkSession, "s3://xxxx")
    if (df == null) return null

    df
  }
}

 2.计算bean

package com.moon.service.calculate.device

import com.moon.core.DataCalcService
import com.moon.core.annots.CalcService
import com.moon.core.enums.{DataWarehouseTypeE, StorageTypeE}
import com.moon.domain.CalcResult
import com.moon.utils.{DateUtils, UUIDUtil}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, lit, udf}
import org.slf4j.LoggerFactory

@CalcService(dataWarehouseType=DataWarehouseTypeE.DEVICE_DATA,storageType=StorageTypeE.DB,
  tableName = "test_table_name",serviceDesc="save device detail data")
class ProductDeviceDetailServiceImpl extends DataCalcService{

  val log = LoggerFactory.getLogger(getClass)

  /**
   *
   * @param df raw data for yesterday
   * @return
   */
  override def dataCalc(df: DataFrame): CalcResult = {
    log.info("start device detail service...")

    val ret = df.select("productId","productName","macAddress")
      .filter(col("productId").isNotNull).distinct()
      .filter(col("macAddress").isNotNull).distinct()

    val newRet = ret.withColumn("executeDay", lit(DateUtils.yesterday()))
    val calcResult : CalcResult = new CalcResult

    log.info("finished device detail service...")

    calcResult.setResult(newRet)
    calcResult
  }
}

3.存储bean

package com.moon.service.storage

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.moon.core.DataStorageService
import com.moon.core.annots.StorageService
import com.moon.core.enums.StorageTypeE
import org.apache.spark.sql.{DataFrame, SaveMode}

import java.time.{LocalDate, Period}

@StorageService(storageType = StorageTypeE.S3, serviceDesc = "save data to s3")
class S3StorageServiceImpl extends DataStorageService {

  val threadFactory = new ThreadFactoryBuilder().setNameFormat("test-pool-%d").build()

  def storageData(df: DataFrame, tableName: String): Unit = {
    val startTime = System.currentTimeMillis()
    val today = LocalDate.now()
    val yesterday = today.minus(Period.ofDays(1))
    val year = yesterday.getYear
    val month = if (yesterday.getMonthValue.toString.length == 1) s"0${yesterday.getMonthValue}" else yesterday.getMonthValue.toString


    df.write.mode(SaveMode.Append).option("encoding", "UTF-8").option("useSSL","false").parquet("s3://xxxxx")
    val endTime = System.currentTimeMillis()
    println("save " + tableName + " spend time is : " + (endTime - startTime))
  }

}

整个源代码已经上传到资源,scala版本使用:2.11

点击下载源码

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spark boot封装,多线程高效执行 的相关文章

  • 如何将变量的全部内容发送/导出到文本文件/xml 文件/剪贴板?

    我想将实例的内容 最好以树形形式 发送给某人 打印屏幕是不行的 因为类太复杂了 您需要将输出转回实例吗 在这种情况下 其他答案都是正确的 如果您只想手动检查实例的内容 理想情况下您的类都将实现toString 你可以将其重定向到一个文件 如
  • Quarkus 不以编程方式选择 bean

    我试图以编程方式选择 bean 但 quarkus 不会注入 bean 并引发异常 不支持吗 public enum ReportType ONE TWO Qualifier Retention RUNTIME Target METHOD
  • 在 Java 中从 SOAPMessage 获取原始 XML

    我已经在 J AX WS 中设置了 SOAP WebServiceProvider 但我无法弄清楚如何从 SOAPMessage 或任何 Node 对象获取原始 XML 下面是我现在获得的代码示例 以及我试图获取 XML 的位置 WebSe
  • 如何将现有的 SQLite3 数据库导入 Room?

    好吧 我在桌面上使用 SQLite3 创建了一个只需要读取的某些信息的数据库 我正在制作的应用程序不需要在此表中插入或删除信息 我在 Room 数据库层上做了相当多的谷歌搜索 所有文档都需要在构建应用程序时在 Room 中创建一个新的数据库
  • 如何为小程序提供对文件系统写入的访问权限

    我在设置小程序的策略文件时遇到问题 我是第一次这样做 不知道如何在java中设置小程序的策略文件 实际上我想授予小程序在文件系统上写入的权限 为此我必须向小程序授予文件权限 所以我创建了一个名为 java policy 的文件 并将以下代码
  • 检查 IPv4 地址是否在私有范围内

    在 Python 中 使用 IPy 模块您可以执行以下操作 gt gt gt ip iptype PRIVATE 有没有一个库或简单的方法可以在 Java 中执行相同的操作 似乎不完全是但是InetAddress有一些 isXX 方法 例如
  • Android WebView文件上传

    我正在开发一个 Android 应用程序 基本上它是一个WebView和一个进度条 Facebook 的移动网站 m facebook com 已加载到WebView 当我单击 选择文件 按钮上传图像时 没有任何反应 我已经尝试了所有的解决
  • Kafka Java Consumer 已关闭

    我刚刚开始使用卡夫卡 我面临着消费者的一个小问题 我用Java写了一个消费者 我收到此异常 IllegalStateException 此消费者已关闭 我在以下行中遇到异常 ConsumerRecords
  • 使用 JAX-WS 的 WebLogic 中没有模式导入的单个 WSDL

    如何使用 JAX WS 配置由 WebLogic 10 3 6 生成的 Web 服务 以将对象架构包含在单个 WSDL 文件声明 而不是导入声明 中 示例代码 界面 import javax ejb Local Local public i
  • 需要正则表达式帮助

    我正在尝试替换两次或多次出现的 br like br br br 标签与两个一起 br br 具有以下模式 Pattern brTagPattern Pattern compile lt s br s s gt s 2 Pattern CA
  • 当 JMS Prod 位于辅助 POJO 类中时,如何在事务中包含 JMS Producer

    简短的问题 有没有办法强制无状态 EJB 调用的 POJO 存在于 EJB 的上下文中 以便事务和资源注入可以在 POJO 中工作 具体来说 在我想要做的事情的上下文中 如何在 EJB 的事务中包含 POJO JMS 生产者 该生产者在调用
  • 在 Junit 测试中使用 ReflectionTestUtils.setField()

    我是 JUnittesting 的新手 所以我有一个问题 谁能告诉我为什么我们使用ReflectionTestUtils setField 在我们的 Junit 测试示例中 正如评论中提到的 java 文档很好地解释了用法 但我还想给你们举
  • Android volley使用RequestFuture.get()时出现超时异常

    在我的片段中 我尝试使用 TMDB 的开放电影数据库来获取有关 正在播放 电影的详细信息 如果我使用 RequestFuture get time TimeUnit 方法来执行此齐射请求 我总是会收到超时错误 如果我在 Safari 中手动
  • Android 解析 JSON 卡在 get 任务上

    我正在尝试解析一些 JSON 数据 我的代码工作了一段时间 我不确定我改变了什么突然破坏了代码 当我运行代码时 我没有收到任何运行时错误或警告 我创建一个新的 AsyncTask 并执行它 当我打电话时 get 在这个新任务中 调试器在此行
  • MongoDB java 驱动程序 3.0 在身份验证时无法捕获异常

    我超级卡住o 0 在尝试通过 Java 驱动程序进行身份验证时 存在捕获异常的问题 正如你可能会看到的Throwable类不工作 private MongoClient mongoClient private MongoDatabase m
  • JAXB 编组器无参数默认构造函数

    我想从 java 库中编组一个 java 对象 当使用 JAXB marschaller 编组 java 对象时 我遇到了一个问题 A 类没有无参数默认构造函数 我使用Java Decompiler来检查类的实现 它是这样的 public
  • 在循环中按名称访问变量

    我正在开发一个 Android 项目 并且有很多可绘制对象 这些绘图的名称都类似于icon 0 png icon 1 png icon 100 png 我想将这些可绘制对象的所有资源 ID 添加到整数 ArrayList 中 对于那些不了解
  • Java:一个函数有多种返回类型...可以使用泛型吗?

    为了简单起见 我有一些程序 如下所示 public String fetchValueAsString String key public DateTime fetchValueAsDateTime String key 我想要类似的东西
  • 传递 Android DialogFragment 参数时,onCreateDialog 捆绑参数意外为 null

    我正在尝试使用 DialogFragment 在 Android 中显示一个基本对话框 并使用对话框消息的参数 如中所述StackOverflow线程 https stackoverflow com questions 15459209 p
  • java.lang.ClassCastException:com.sun.proxy.$Proxy8 无法转换为 org.openqa.selenium.internal.WrapsDriver

    我有以下切入点和 AspectJ 中给出的建议 Pointcut call org openqa selenium WebElement sendKeys public void onWebElementAction After onWeb

随机推荐

  • JAVA开发环境JDK安装及配置

    一 安装JDK 获取JDK的安装包 1 通过官网下载 2 打开安装包 开始安装JDK和JRE 1 打开JDK安装包 2 点击下一步开始JDK安装 3 更改安装路径 接下来以我的电脑为例安装到E盘 其他盘同理 4 将文件夹路径改到E盘新建的文
  • 用js动态创建svg

    吃水不忘挖井人 svg基础教程https www bilibili com video BV1Pt411y7V6 p 1 要实现的效果 svg文件的写法
  • 【LSTM回归】基于粒子群优化注意力机制的长短时记忆神经网络PSO-attention-LSTM实现数据回归预测附matlab代码

    作者简介 热爱科研的Matlab仿真开发者 修心和技术同步精进 matlab项目合作可私信 个人主页 Matlab科研工作室 个人信条 格物致知 更多Matlab完整代码及仿真定制内容点击 智能优化算法 神经网络预测 雷达通信 无线传感器
  • MySQL之分表分库分区

    数据库分表可以解决单表海量数据的查询性能问题 分库可以解决单台数据库的并发访问压力问题 分表 分表分为水平分表和垂直分表 水平分表原理 分表策略通常是用户ID取模 如果不是整数 可以首先将其进行hash获取到整 水平分表遇到的问题 1 跨表
  • Hadoop序列化案例

    Hadoop序列化案例 统计每一个手机号耗费的总上行流量 总下行流量 总流量 数据 1 13736230513 192 196 100 1 www baidu com 2481 24681 200 2 13846544121 192 196
  • Kafka3.0.0版本——Leader Partition自动平衡

    目录 一 Leader Partition自动平衡的概述 二 Leader Partition自动平衡的相关配置参数 三 Leader Partition自动平衡的示例 一 Leader Partition自动平衡的概述 正常情况下 Kaf
  • 代码审计练习题

    代码审计练习题 源码 方法 简单记录一下姿势 源码 判断var1和var2是否为对象 用弱不等号判断 分别判断md5
  • 将CSDN文章导出为.md、HTML、pdf格式

    将CSDN文章导出为 md HTML pdf格式 一 将CSDN文章导出为 md文件 二 将CSDN文章导出为HTML文件 三 把 md文件转换为pdf格式 一 将CSDN文章导出为 md文件 1 打开一篇CSDN文章 点击上方的 导出 按
  • 后端(五):JVM

    目录 JVM 中的内存区域划分 JVM 的类加载机制 1 加载 2 验证 3 准备 4 解析 5 初始化 JVM 中的垃圾回收策略 找 确认垃圾 1 引用计数 2 可达行分析 释放 垃圾 对象 1 标记清除 2 复制算法 3 标记整理 分代
  • Ubuntu18.04搭建VSCode编译环境

    确认ubuntu 18 04 uname a 添加root帐户密码 sudo passwd root 第一步 配置C 编译环境 安装gcc 和 g gcc v g v sudo apt install gcc sudo apt instal
  • 【maven】maven settings.xml 中 mirror 和 repository 的区别

    一 概述 maven的settings xml文件里面有proxy server repository mirror的配置 在配置仓库地址的时候容易混淆 proxy是服务器不能直接访问外网时需要设置的代理服务 不常用 server是服务器要
  • [论文阅读] (13)英文论文模型设计(Model Design)如何撰写及精句摘抄——以入侵检测系统(IDS)为例

    娜璋带你读论文 系列主要是督促自己阅读优秀论文及听取学术讲座 并分享给大家 希望您喜欢 由于作者的英文水平和学术能力不高 需要不断提升 所以还请大家批评指正 非常欢迎大家给我留言评论 学术路上期待与您前行 加油 前一篇从个人角度介绍英文论文
  • 一个对前端程序员比较友好的mock数据工具网址

    由前大搜车公司出品的mock网站 如下 https www easy mock com login 妈妈再也不用担心我从网上找不到假数据了 更多 如何写一个自己的小程序并上线 Github搭建个人博客 2019最新版 亲测 qq加油小程序
  • FreeRTOS操作系统的学习(一)

    操作系统的定义 管理和控制计算机硬件与软件资源的计算机程序 直接运行在 裸机 上的最基本的系统软件 任何其他软件都必须在操作系统的支持下才能运行 其介于APP和硬件之间 2 为什么要使用操作系统 1 与裸机相比 大大提高了CPU的灵活性 2
  • SpringBoot如何将项目打成jar包,并运行jar包呢?

    转自 SpringBoot如何将项目打成jar包 并运行jar包呢 下文笔者讲述springboot将项目打成jar包的方法分享及运行jar包的方法分享 如下所示 实现思路 1 pom中进行相应的build配置 2 运行maven inst
  • 位运算说明

    文章目录 参考文档 表格 来自百度百科 按位与运算符 按位或运算符 异或运算符 取反运算符 左移运算符 lt lt 右移运算符 gt gt 无符号右移运算符 gt gt gt 复合赋值运算符 不同长度的数据进行位运算 参考文档 百度百科 h
  • Windows记事本编码反汇编分析

    转载自 liam page 网上有一个流传多年的段子 这个段子大致是说 若你在简体中文版本的 Windows 系统下 用系统自带的记事本程序 以默认的 ANSI 编码保存 联通 两个字 那么重新打开后 联通 二字就消失了 如果我没记错的话
  • 【JDBC】idea添加mysql-jar包(很轻松)

    添加jar包 官网下载jar包 idea导入jar包 检查 官网下载jar包 官网地址 MySQL Download Connector J 下载完之后解压 打开文件夹 直到见到我们需要的jar包 idea导入jar包 我们复制刚才下载好的
  • 【ERROR】AssertionError: The NVIDIA driver on your system is too old (found version). Please upd

    错误信息 AssertionError The NVIDIA driver on your system is too old found version 10000 Please update your GPU driver by dow
  • spark boot封装,多线程高效执行

    1 简介 众所周知 spark是一个分布式计算引擎 可以将计算数据分不到不同的节点进行计算 但是往往我们的业务都是比较复杂 每天定时跑的时候不只是一个job 可能是有很多的job 但是引擎本身是串行化的 而且对于经验不深的同学 一个业务可能