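Requirements:
Use Spark for data ETL:

Cleaning:
- Drop deprecated fields from the JSON data
- Filter out dirty records whose JSON is malformed
- Filter out log records in which both account and deviceid are empty
- Filter out log records that lack a key field (properties, eventid, and sessionid are all required)
- Filter out log records that fall outside the expected time range (apps may report logs late, so some data arrives after a delay)
- For web logs, filter out crawler requests (identified by analyzing the useragent)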
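The record-level rules above can be sketched as a single predicate. This is a minimal illustration in plain Java, not the author's Spark job: it assumes each log line has already been parsed into a `Map`, and the field names `timestamp` and `useragent`, the `BOT_MARKERS` list, and the `LogCleaner` class itself are hypothetical. Malformed-JSON filtering and deprecated-field removal would happen earlier, at parse time, and are not shown.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

final class LogCleaner {
    // Hypothetical crawler markers (an assumption); a real list would be longer.
    private static final String[] BOT_MARKERS = {"bot", "spider", "crawler"};

    static boolean isBlank(Object v) {
        return v == null || v.toString().trim().isEmpty();
    }

    /** Returns true when the parsed record passes all record-level cleaning rules. */
    static boolean isValid(Map<String, Object> rec, long startMs, long endMs) {
        if (rec == null) return false;
        // account and deviceid must not both be empty
        if (isBlank(rec.get("account")) && isBlank(rec.get("deviceid"))) return false;
        // properties, eventid, and sessionid are all required
        if (rec.get("properties") == null
                || isBlank(rec.get("eventid"))
                || isBlank(rec.get("sessionid"))) return false;
        // the event time must fall inside [startMs, endMs)
        Object ts = rec.get("timestamp");
        if (!(ts instanceof Number)) return false;
        long t = ((Number) ts).longValue();
        if (t < startMs || t >= endMs) return false;
        // drop requests whose user agent looks like a crawler
        Object ua = rec.get("useragent");
        if (ua != null) {
            String lower = ua.toString().toLowerCase(Locale.ROOT);
            for (String marker : BOT_MARKERS) {
                if (lower.contains(marker)) return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> rec = new HashMap<>();
        rec.put("deviceid", "d1");
        rec.put("eventid", "pageView");
        rec.put("sessionid", "s1");
        rec.put("properties", new HashMap<String, Object>());
        rec.put("timestamp", 1_000L);
        System.out.println(isValid(rec, 0L, 2_000L)); // true
        rec.put("useragent", "Googlebot/2.1");
        System.out.println(isValid(rec, 0L, 2_000L)); // false
    }
}
```

In a Spark job the same predicate would typically sit inside a `filter` over the parsed records.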
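
Transformation:
- Flatten the JSON into a flat structure, mainly the nested JSON of the event log
- Session splitting:
  - Web logs already have natural sessions and need no processing
  - App logs use a keep-me-logged-in mechanism, so an app that returns to the foreground after a long time in the background still reports the same session. This does not match the definition used in session analysis, so sessions must be split by the gap between events (industry convention: 30 minutes)
  - WeChat mini-program (wx) logs behave like app logs: sessions stay valid for a long time and must also be split by the gap between events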
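The gap-based split for app and mini-program sessions can be sketched as follows. This is an illustration under stated assumptions rather than the author's implementation: the input is the event timestamps (in milliseconds) of one original session, already sorted ascending, and the `SessionSplitter` class and the `originalId-index` naming scheme for the new session ids are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionSplitter {
    // 30-minute gap, the convention quoted above
    static final long GAP_MS = 30 * 60 * 1000L;

    /**
     * Assigns a new session id to every event of one original session.
     * A new sub-session starts whenever the gap to the previous event
     * is strictly greater than GAP_MS.
     */
    static List<String> split(String sessionId, long[] sortedTs) {
        List<String> ids = new ArrayList<>();
        int index = 0;
        for (int i = 0; i < sortedTs.length; i++) {
            if (i > 0 && sortedTs[i] - sortedTs[i - 1] > GAP_MS) {
                index++;
            }
            ids.add(sessionId + "-" + index);
        }
        return ids;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        // events at 0, 10, 50, and 55 minutes: the 40-minute gap starts a new session
        System.out.println(split("s1", new long[]{0, 10 * min, 50 * min, 55 * min}));
        // prints [s1-0, s1-0, s1-1, s1-1]
    }
}
```

In Spark the same logic is usually expressed per original session, for example with a window ordered by event time.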
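- Data normalization:
  - Boolean fields: some data encodes them as 1/0/-1 and some as true/false; unify them as Y/N/U
  - String fields: the data contains both empty strings and null values; unify them as null (this matters, because it affects later computations)
  - Date formats: 2020/9/2, 2020-9-2, 20200902, and similar are all converted to yyyy-MM-dd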
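The three normalization rules can be sketched with the standard library alone. The `FieldNormalizer` class is hypothetical, and the mapping of 1/true to Y, 0/false to N, and everything else (including -1) to U is an assumption inferred from the order in which the values are listed above. Only the three date formats named above are handled; unparseable dates become null.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Locale;

final class FieldNormalizer {
    // Accepted input formats: 2020/9/2, 2020-9-2, 20200902
    private static final DateTimeFormatter[] INPUT_FORMATS = {
        DateTimeFormatter.ofPattern("yyyy/M/d"),
        DateTimeFormatter.ofPattern("yyyy-M-d"),
        DateTimeFormatter.BASIC_ISO_DATE
    };

    /** Maps 1/true to Y, 0/false to N, anything else (including -1 and null) to U. */
    static String normalizeBoolean(String raw) {
        if (raw == null) return "U";
        switch (raw.trim().toLowerCase(Locale.ROOT)) {
            case "1":
            case "true":
                return "Y";
            case "0":
            case "false":
                return "N";
            default:
                return "U";
        }
    }

    /** Collapses empty or whitespace-only strings to null. */
    static String normalizeString(String raw) {
        return (raw == null || raw.trim().isEmpty()) ? null : raw;
    }

    /** Returns the date as yyyy-MM-dd, or null if no known format matches. */
    static String normalizeDate(String raw) {
        if (raw == null) return null;
        for (DateTimeFormatter format : INPUT_FORMATS) {
            try {
                return LocalDate.parse(raw.trim(), format)
                        .format(DateTimeFormatter.ISO_LOCAL_DATE);
            } catch (DateTimeParseException e) {
                // not this format; try the next one
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(normalizeDate("2020/9/2"));  // 2020-09-02
        System.out.println(normalizeDate("20200902"));  // 2020-09-02
        System.out.println(normalizeBoolean("-1"));     // U
        System.out.println(normalizeString(""));        // null
    }
}
```

Note the lowercase `yyyy`: in Java date patterns, uppercase `YYYY` means week-based year and gives wrong results around the new year.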
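
Integration:
- Resolve GPS coordinates to province, city, and county, so that later analysis can use a geographic dimension
- If no region can be found from the GPS coordinates, resolve it from the IP address instead
- id_mapping: generate a globally unique identifier for each user (anonymous visits are bound to a single id; funnel, retention, and session analysis depend on it)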
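One possible id_mapping rule is sketched below; the source does not spell out the rule, so everything here is an assumption, including the `IdMapper` class. Logged-in events use the account as the global id and remember which device it was seen on; anonymous events reuse the account bound to their device, and fall back to the deviceid when no binding exists yet.

```java
import java.util.HashMap;
import java.util.Map;

final class IdMapper {
    // deviceid -> account most recently seen logged in on that device (hypothetical binding table)
    private final Map<String, String> deviceToAccount = new HashMap<>();

    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    /** Returns the global id for one event; events must be fed in time order. */
    String resolve(String account, String deviceId) {
        if (!isEmpty(account)) {
            if (!isEmpty(deviceId)) {
                deviceToAccount.put(deviceId, account); // remember the binding
            }
            return account;
        }
        // anonymous event: reuse the bound account if any, otherwise keep the deviceid
        String bound = deviceToAccount.get(deviceId);
        return bound != null ? bound : deviceId;
    }

    public static void main(String[] args) {
        IdMapper mapper = new IdMapper();
        System.out.println(mapper.resolve(null, "d1"));    // d1 (no binding yet)
        System.out.println(mapper.resolve("alice", "d1")); // alice
        System.out.println(mapper.resolve("", "d1"));      // alice (bound via d1)
    }
}
```

This single-pass version leaves anonymous events that precede the first login under the deviceid; a batch job could backfill them from a binding table built on earlier days.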

Implementation:
1. Build: one parent project and two modules: datahouse (the data warehouse system) and usertag (user profiling)
Parent project Maven POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>pub.ryan</groupId>
    <artifactId>tm_sensys</artifactId>
    <packaging>pom</packaging>
    <version>1.01</version>
    <modules>
        <module>datahouse</module>
        <module>usertag</module>
    </modules>
    <properties>
        <spark.version>3.0.1</spark.version>
        <lang3.version>3.10</lang3.version>
        <fastjson.version>1.2.68</fastjson.version>
    </properties>
    <!-- Dependencies declared in the parent project are inherited by every child module -->
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${lang3.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
    </dependencies>
    <!-- Dependency management does not actually pull in a dependency; it only constrains the version used when a child module declares it -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>${spark.version}</version>
                <!--<exclusions>
                    <exclusion>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-client</artifactId>
                    </exclusion>
                </exclusions>-->
            </dependency>
        </dependencies>
    </dependencyManagement>
    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <layout>default</layout>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>ali-plugin</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
        </pluginRepository>
    </pluginRepositories>
    <build>
        <plugins>
            <!-- Plugin that compiles Java sources -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Plugin that compiles Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Bundle the classes from the dependency jars into the project's own jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>pub.ryan.dw.etl.DeviceIdAccountBind</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <!-- Bind the assembly to the package phase so that mvn package builds it; without this execution the assembly goal has to be invoked explicitly (for example mvn assembly:single) -->
                <executions>
                    <execution>
                        <id>make-assemble</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
datahouse (data warehouse) module POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>tm_sensys</artifactId>
        <groupId>pub.ryan</groupId>
        <version>1.01</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>datahouse</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.openx.data</groupId>
            <artifactId>json-serde</artifactId>
            <version>1.3.8</version>
        </dependency>
        <dependency>
            <groupId>ch.hsr</groupId>
            <artifactId>geohash</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.lionsoul</groupId>
            <artifactId>ip2region</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>
</project>
Testing: