Spark Streaming实战对论坛网站动态行为pv,uv,注册人数,跳出率的多维度分析,实时统计每天pv,uv的sparkStreaming结合redis结果存入mysql供前端展示

2023-10-26

论坛数据运行代码自动生成,该生成的数据会作为Producer的方式发送给Kafka,然后SparkStreaming程序会从Kafka中在线Pull到论坛或者网站的用户在线行为信息,进而进行多维度的在线分析
数据格式如下:
date:日期,格式为yyyy-MM-dd
timestamp:时间戳
userID:用户ID
pageID:页面ID
chanelID:板块的ID
action:点击和注册

生成的用户点击模拟数据如下:

product:2017-06-20      1497948113817   1397    91      ML      View
product:2017-06-20      1497948113819   149     1941    ML      Register
product:2017-06-20      1497948113820   null    335     Spark   Register
product:2017-06-20      1497948113821   1724    1038    ML      View
product:2017-06-20      1497948113822   282     494     Flink   View
product:2017-06-20      1497948113823   null    1619    ML      View
product:2017-06-20      1497948113823   991     1950    ML      View
product:2017-06-20      1497948113824   686     1347    Kafka   Register
product:2017-06-20      1497948113825   1982    1145    Hive    View
product:2017-06-20      1497948113826   211     1097    Storm   View
product:2017-06-20      1497948113827   633     1345    Hive    View
product:2017-06-20      1497948113828   957     1381    Hadoop  Register
product:2017-06-20      1497948113831   300     1781    Spark   View
product:2017-06-20      1497948113832   1244    1076    Hadoop  Register
product:2017-06-20      1497948113833   1958    634     ML      View

生成模拟数据代码:

package org.apache.spark.examples.streaming;
 
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
/**
 * 这里产生数据,就会发送给kafka,kafka那边启动消费者,就会接收到数据,这一步是用来测试生成数据和消费数据没有问题的,确定没问题后要关闭消费者,
 * 启动OnlineBBSUserLogss.java的类作为消费者,就会按pv,uv等方式处理这些数据。
 * 因为一个topic只能有一个消费者,所以启动程序前必须关闭kafka方式启动的消费者(我这里没有关闭关闭kafka方式启动的消费者也没正常啊) 
 */
public class SparkStreamingDataManuallyProducerForKafkas extends Thread{
    
    //具体的论坛频道
    static String[] channelNames = new  String[]{
        "Spark","Scala","Kafka","Flink","Hadoop","Storm",
        "Hive","Impala","HBase","ML"
    };
    //用户的两种行为模式
    static String[] actionNames = new String[]{"View", "Register"};
    private static Producer<String, String> producerForKafka;
    private static String dateToday;
    private static Random random;
    
    //2、作为线程而言,要复写run方法,先写业务逻辑,再写控制
    @Override
    public void run() {
        int counter = 0;//搞500条
        while(true){//模拟实际情况,不断循环,异步过程,不可能是同步过程
           counter++;
          String userLog = userlogs();
          System.out.println("product:"+userLog);
          //"test"为topic
          producerForKafka.send(new KeyedMessage<String, String>("test", userLog));
          if(0 == counter%500){
                counter = 0;
                try {
                   Thread.sleep(1000);
                } catch (InterruptedException e) {
                   // TODO Auto-generated catch block
                   e.printStackTrace();
                }
            }
        }
    }
        
    private static String userlogs() {
        StringBuffer userLogBuffer = new StringBuffer("");
        int[] unregisteredUsers = new int[]{1, 2, 3, 4, 5, 6, 7, 8};
        long timestamp = new Date().getTime();
            Long userID = 0L;
            long pageID = 0L;
            //随机生成的用户ID 
            if(unregisteredUsers[random.nextInt(8)] == 1) {
               userID = null;
            } else {
               userID = (long) random.nextInt((int) 2000);
            }
            //随机生成的页面ID
            pageID =  random.nextInt((int) 2000);          
            //随机生成Channel
            String channel = channelNames[random.nextInt(10)];
            //随机生成action行为
            String action = actionNames[random.nextInt(2)];
            
            userLogBuffer.append(dateToday)
                        .append("\t")
                        .append(timestamp)
                        .append("\t")
                        .append(userID)
                        .append("\t")
                        .append(pageID)
                        .append("\t")
                        .append(channel)
                        .append("\t")
                        .append(action);   //这里不要加\n换行符,因为kafka自己会换行,再append一个换行符,消费者那边就会处理不出数据
        return userLogBuffer.toString();
    }
    
    public static void main(String[] args) throws Exception {
      dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
      random = new Random();
        Properties props = new Properties();
        props.put("zk.connect", "h71:2181,h72:2181,h73:2181");
        props.put("metadata.broker.list","h71:9092,h72:9092,h73:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);
        producerForKafka = new Producer<String, String>(config);
        new SparkStreamingDataManuallyProducerForKafkas().start(); 
    }
}

pv,uv,注册人数,跳出率的多维度分析代码:

package org.apache.spark.examples.streaming;
 
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
 
import kafka.serializer.StringDecoder;
 
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
 
import scala.Tuple2;
 
/*
 *消费者消费SparkStreamingDataManuallyProducerForKafka类中逻辑级别产生的数据,这里是计算pv,uv,注册人数,跳出率的方式
 */
public class OnlineBBSUserLogss {
 
   public static void main(String[] args) {
       /**
         * 第一步:配置SparkConf:
         * 1,至少2条线程:因为Spark Streaming应用程序在运行的时候,至少有一条
         * 线程用于不断的循环接收数据,并且至少有一条线程用于处理接受的数据(否则的话无法
         * 有线程用于处理数据,随着时间的推移,内存和磁盘都会不堪重负);
         * 2,对于集群而言,每个Executor一般肯定不止一个Thread,那对于处理Spark Streaming的
         * 应用程序而言,每个Executor一般分配多少Core比较合适?根据我们过去的经验,5个左右的
         * Core是最佳的(一个段子分配为奇数个Core表现最佳,例如3个、5个、7个Core等);
         */
//      SparkConf conf = new SparkConf().setMaster("spark://h71:7077").setAppName("OnlineBBSUserLogs");
      SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]");
      /**
         * 第二步:创建SparkStreamingContext:
         * 1,这个是SparkStreaming应用程序所有功能的起始点和程序调度的核心
         * SparkStreamingContext的构建可以基于SparkConf参数,也可基于持久化的SparkStreamingContext的内容
         * 来恢复过来(典型的场景是Driver崩溃后重新启动,由于Spark Streaming具有连续7*24小时不间断运行的特征,
         * 所有需要在Driver重新启动后继续上次的状态,此时的状态恢复需要基于曾经的Checkpoint);
         * 2,在一个Spark Streaming应用程序中可以创建若干个SparkStreamingContext对象,使用下一个SparkStreamingContext
         * 之前需要把前面正在运行的SparkStreamingContext对象关闭掉,由此,我们获得一个重大的启发SparkStreaming框架也只是
         * Spark Core上的一个应用程序而已,只不过Spark Streaming框架箱运行的话需要Spark工程师写业务逻辑处理代码;
         */
      JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
 
      /**
         * 第三步:创建Spark Streaming输入数据来源input Stream:
         * 1,数据输入来源可以基于File、HDFS、Flume、Kafka、Socket等
         * 2, 在这里我们指定数据来源于网络Socket端口,Spark Streaming连接上该端口并在运行的时候一直监听该端口
         *         的数据(当然该端口服务首先必须存在),并且在后续会根据业务需要不断的有数据产生(当然对于Spark Streaming
         *         应用程序的运行而言,有无数据其处理流程都是一样的); 
         * 3,如果经常在每间隔5秒钟没有数据的话不断的启动空的Job其实是会造成调度资源的浪费,因为并没有数据需要发生计算,所以
         *         实例的企业级生成环境的代码在具体提交Job前会判断是否有数据,如果没有的话就不再提交Job;
         */
      Map<String, String> kafkaParameters = new HashMap<String, String>();
      kafkaParameters.put("metadata.broker.list","h71:9092,h72:9092,h73:9092");
      Set topics = new HashSet<String>();
      topics.add("test");
      JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                jsc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParameters, 
                topics);
      //在线PV计算
      onlinePagePV(lines);
      //在线UV计算
//      onlineUV(lines);
      //在线计算注册人数
//      onlineRegistered(lines);
      //在线计算跳出率
//      onlineJumped(lines);
      //在线不同模块的PV
//      onlineChannelPV(lines);
      
      /*
       * Spark Streaming执行引擎也就是Driver开始运行,Driver启动的时候是位于一条新的线程中的,当然其内部有消息循环体,用于
       * 接受应用程序本身或者Executor中的消息;
       */
      jsc.start();
      jsc.awaitTermination();
      jsc.close();
   }
 
   private static void onlineChannelPV(JavaPairInputDStream<String, String> lines) {
      lines.mapToPair(new PairFunction<Tuple2<String,String>, String, Long>() {
         @Override
         public Tuple2<String, Long> call(Tuple2<String,String> t) throws Exception {
            String[] logs = t._2.split("\t");
            String channelID =logs[4];
            return new Tuple2<String,Long>(channelID, 1L);
         }
      }).reduceByKey(new Function2<Long, Long, Long>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
         @Override
         public Long call(Long v1, Long v2) throws Exception {
            return v1 + v2;
         }
      }).print();
   }
 
   private static void onlineJumped(JavaPairInputDStream<String, String> lines) {
      lines.filter(new Function<Tuple2<String,String>, Boolean>() {
         @Override
         public Boolean call(Tuple2<String, String> v1) throws Exception {
            String[] logs = v1._2.split("\t");
            String action = logs[5];
            if("View".equals(action)){
               return true;
            } else {
               return false;
            }
         }
      }).mapToPair(new PairFunction<Tuple2<String,String>, Long, Long>() {
         @Override
         public Tuple2<Long, Long> call(Tuple2<String,String> t) throws Exception {
            String[] logs = t._2.split("\t");
         // Long usrID = Long.valueOf(logs[2] != null ? logs[2] : "-1"); 这个有错
            Long usrID = Long.valueOf("null".equals(logs[2])  ? "-1" : logs[2]);
            return new Tuple2<Long,Long>(usrID, 1L);
         }
      }).reduceByKey(new Function2<Long, Long, Long>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
         @Override
         public Long call(Long v1, Long v2) throws Exception {
            return v1 + v2;
         }
      }).filter(new Function<Tuple2<Long,Long>, Boolean>() {
         @Override
         public Boolean call(Tuple2<Long, Long> v1) throws Exception {
            if(1 == v1._2){
               return true;
            } else {
               return false;
            }
         }
      }).count().print();
   }
 
   private static void onlineRegistered(JavaPairInputDStream<String, String> lines) {
      lines.filter(new Function<Tuple2<String,String>, Boolean>() {
         @Override
         public Boolean call(Tuple2<String, String> v1) throws Exception {
            String[] logs = v1._2.split("\t");
            String action = logs[5];
            if("Register".equals(action)){
               return true;
            } else {
               return false;
            }
         }
      }).count().print();
   }
 
   /**
    * 因为要计算UV,所以需要获得同样的Page的不同的User,这个时候就需要去重操作,DStreamzhong有distinct吗?当然没有(截止到Spark 1.6.1的时候还没有该Api)
    * 此时我们就需要求助于DStream魔术般的方法tranform,在该方法内部直接对RDD进行distinct操作,这样就是实现了用户UserID的去重,进而就可以计算出UV了。
    * @param lines
    */
   private static void onlineUV(JavaPairInputDStream<String, String> lines) {
      /*
       * 第四步:接下来就像对于RDD编程一样基于DStream进行编程!!!原因是DStream是RDD产生的模板(或者说类),在Spark Streaming具体
       * 发生计算前,其实质是把每个Batch的DStream的操作翻译成为对RDD的操作!!!
       * 对初始的DStream进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算
       */
      JavaPairDStream<String, String> logsDStream = lines.filter(new Function<Tuple2<String,String>, Boolean>() {
         @Override
         public Boolean call(Tuple2<String, String> v1) throws Exception {
            String[] logs = v1._2.split("\t");
            String action = logs[5];
            if("View".equals(action)){
               return true;
            } else {
               return false;
            }
         }
      });
      
      //在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1)
      logsDStream.map(new Function<Tuple2<String,String>,String>(){
         @Override
         public String call(Tuple2<String, String> v1) throws Exception {
            String[] logs =v1._2.split("\t");
            String usrID = String.valueOf(logs[2] != null ? logs[2] : "-1" );
            //原文是Long usrID = Long.valueOf(logs[2] != null ? logs[2] : "-1" );
            //报错:java.lang.NumberFormatException: For input string: "null"
            Long pageID = Long.valueOf(logs[3]);
            return pageID+"_"+usrID;
         }
      }).transform(new Function<JavaRDD<String>,JavaRDD<String>>(){
         @Override
         public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
            // TODO Auto-generated method stub
            return v1.distinct();
         }
      }).mapToPair(new PairFunction<String, Long, Long>() {
         @Override
         public Tuple2<Long, Long> call(String t) throws Exception {
            String[] logs = t.split("_");
            Long pageId = Long.valueOf(logs[0]);
            return new Tuple2<Long,Long>(pageId, 1L);
         }
      }).reduceByKey(new Function2<Long, Long, Long>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
         @Override
         public Long call(Long v1, Long v2) throws Exception {
            return v1 + v2;
         }
      }).print();
   }
 
   private static void onlinePagePV(JavaPairInputDStream<String, String> lines) {
      /*
       * 第四步:接下来就像对于RDD编程一样基于DStream进行编程!!!原因是DStream是RDD产生的模板(或者说类),在Spark Streaming具体
       * 发生计算前,其实质是把每个Batch的DStream的操作翻译成为对RDD的操作!!!
       * 对初始的DStream进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算
       */
      JavaPairDStream<String, String> logsDStream = lines.filter(new Function<Tuple2<String,String>, Boolean>() {
         @Override
         public Boolean call(Tuple2<String, String> v1) throws Exception {
            String[] logs = v1._2.split("\t");
            String action = logs[5];
            if("View".equals(action)){
               return true;
            } else {
               return false;
            }
         }
      });
      
      //在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1)
      JavaPairDStream<Long, Long> pairs = logsDStream.mapToPair(new PairFunction<Tuple2<String,String>, Long, Long>() {
         @Override
         public Tuple2<Long, Long> call(Tuple2<String, String> t) throws Exception {
            String[] logs = t._2.split("\t");
            Long pageId = Long.valueOf(logs[3]);
            return new Tuple2<Long,Long>(pageId, 1L);
         }
      });
      //在单词实例计数为1基础上,统计每个单词在文件中出现的总次数
      JavaPairDStream<Long, Long> wordsCount = pairs.reduceByKey(new Function2<Long, Long, Long>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
        //对相同的key,进行Value的累加(包括Local和Reducer级别同时Reduce)
         @Override
         public Long call(Long v1, Long v2) throws Exception {
            return v1 + v2;
         }
      });
      
      /*
       * 此处的print并不会直接出发Job的执行,因为现在的一切都是在Spark Streaming框架的控制之下的,对于Spark Streaming
       * 而言具体是否触发真正的Job运行是基于设置的Duration时间间隔的
       * 
       * 诸位一定要注意的是Spark Streaming应用程序要想执行具体的Job,对Dtream就必须有output Stream操作,
       * output Stream有很多类型的函数触发,类print、saveAsTextFile、saveAsHadoopFiles等,最为重要的一个
       * 方法是foraeachRDD,因为Spark Streaming处理的结果一般都会放在Redis、DB、DashBoard等上面,foreachRDD
       * 主要就是用用来完成这些功能的,而且可以随意的自定义具体数据到底放在哪里!!!
       *
       * 在企業生產環境下,一般會把計算的數據放入Redis或者DB中,采用J2EE等技术进行趋势的绘制等,这就像动态更新的股票交易一下来实现
       * 在线的监控等;
       */
      wordsCount.print();
   }
}

启动hadoop、spark、zookeeper、kafka集群(启动过程就不多言了)这里把我使用的版本列出:
hadoop         hadoop-2.6.0-cdh5.5.2
kafka              kafka_2.10-0.8.2.0
spark             spark-1.3.1-bin-hadoop2.6(后来我又装了spark-1.6.0-bin-hadoop2.6也行)
zookeeper     zookeeper-3.4.5-cdh5.5.2

java                 jdk1.7.0_25

在myeclipse中创建项目:

(这里我吐槽一下,在myeclipse-8.5和myeclipse-10.7.1版本中只能识别spark-1.3.1-bin-hadoop2.6的jar包却无法识别spark-1.6.0-bin-hadoop2.6的jar包,虽然用spark-1.3.1-bin-hadoop2.6的jar包也能正常运行不影响什么,但有强迫症的我咋能忍,无奈我下载了个myeclipse-pro-2014-GA版本(你下载最新的版本应该也可以吧)才这两个版本spark的jar包都识别,我尼玛也是醉了。。。)

将该项目打成streaming.jar包上从本地上传到虚拟机上,我这里是上传到了/home/hadoop/spark-1.3.1-bin-hadoop2.6目录中

第一步:kafka建立topic

[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-topics.sh --create --zookeeper h71:2181 --replication-factor 2 --partitions 2 --topic test

(如果不创建该topic的话,也倒无妨,因为你如果先直接运行SparkStreamingDataManuallyProducerForKafkas.java的时候会自动创建topic,如果是先运行的OnlineBBSUserLogss.java的话虽然第一次会报错:Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.SparkException: Couldn't find leader offsets for Set(),但是它已经为你创建了该topic,再运行的话则不会报错了,只不过他们创建的该topic都默认分区和副本都为1)

第二步:运行SparkStreamingDataManuallyProducerForKafka

[hadoop@h71 spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.SparkStreamingDataManuallyProducerForKafkas --executor-memory 500m --total-executor-cores 2 streaming.jar

会报错:

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/producer/ProducerConfig
        at com.spark.study.streaming.SparkStreamingDataManuallyProducerForKafkas.main(SparkStreamingDataManuallyProducerForKafkas.java:102)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.producer.ProducerConfig
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 10 more

解决:
第一种方法:

在spark-env.sh中添加如下内容:

[hadoop@h71 spark-1.3.1-bin-hadoop2.6]$ vi conf/spark-env.sh
export SPARK_HOME=/home/hadoop/spark-1.3.1-bin-hadoop2.6
export SPARK_CLASSPATH=$SPARK_HOME/lib/*

再运行SparkStreamingDataManuallyProducerForKafka

[hadoop@h71 spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.SparkStreamingDataManuallyProducerForKafkas --executor-memory 500m --total-executor-cores 2 streaming.jar

但是这种方法不是很好,因为再运行OnlineBBSUserLogss的时候会显示如下内容但不影响运行:

[hadoop@h71 spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.OnlineBBSUserLogss --executor-memory 500m --total-executor-cores 2 streaming.jar

17/06/21 22:49:46 WARN spark.SparkConf: 
SPARK_CLASSPATH was detected (set to '/home/hadoop/spark-1.3.1-bin-hadoop2.6/lib/*').
This is deprecated in Spark 1.0+.
 
Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath

第二种方法:(推荐使用这种)
上面不是都已经提示了嘛,Please instead use: - ./spark-submit with --driver-class-path to augment the driver classpath

所以运行如下命令:

[hadoop@h71 spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.SparkStreamingDataManuallyProducerForKafkas --executor-memory 500m --total-executor-cores 2 streaming.jar --driver-class-path /home/hadoop/spark-1.3.1-bin-hadoop2.6/lib/spark-examples-1.3.1-hadoop2.6.0.jar

运行该命令后会产生数据写入到kafka中,再执行

[hadoop@h71 spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.streaming.OnlineBBSUserLogss --executor-memory 500m --total-executor-cores 2 streaming.jar --driver-class-path /home/hadoop/spark-1.3.1-bin-hadoop2.6/lib/spark-examples-1.3.1-hadoop2.6.0.jar

注意:在spark-1.6.0-bin-hadoop2.6版本中--driver-class-path的位置还不能放在最后,否则无法识别,运行命令为

[hadoop@h71 spark-1.6.0-bin-hadoop2.6]$ bin/spark-submit --master spark://h71:7077 --name JavaWordCountByHQ --driver-class-path /home/hadoop/spark-1.6.0-bin-hadoop2.6/lib/spark-examples-1.6.0-hadoop2.6.0.jar --class org.apache.spark.examples.streaming.OnlineBBSUserLogss --executor-memory 500m --total-executor-cores 2 streaming.jar

OnlineBBSUserLogs成功消费数据,并统计出数值,实验成功

.......
 
16/05/08 19:00:33 INFO scheduler.DAGScheduler: Job 2 finished: print at OnlineBBSUserLogs.java:113, took 0.385315 s
-------------------------------------------
Time: 1462705200000 ms
-------------------------------------------
(Flink,89)
(Storm,99)
(Scala,97)
(HBase,107)
(Spark,91)
(Hadoop,108)
(Hive,129)
(Impala,82)
(Kafka,101)
(ML,97)
...

知识点:
1、创建kafka的createDirectStream,返回JavaPairInputDStream类型的line值
org.apache.spark.streaming.kafka.createDirectStream 源代码
package org.apache.spark.streaming.kafka
 
/**
   * Create an input stream that directly pulls messages from Kafka Brokers
   * without using any receiver. This stream can guarantee that each message
   * from Kafka is included in transformations exactly once (see points below).
   *
   * Points to note:
   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
   *    You can access the offsets used in each batch from the generated RDDs (see
   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
   *    in the [[StreamingContext]]. The information on consumed offset can be
   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
   *  - End-to-end semantics: This stream ensures that every records is effectively received and
   *    transformed exactly once, but gives no guarantees on whether the transformed data are
   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
   *    that the output operation is idempotent, or use transactions to output records atomically.
   *    See the programming guide for more details.
   *
   * @param jssc JavaStreamingContext object
   * @param keyClass Class of the keys in the Kafka records
   * @param valueClass Class of the values in the Kafka records
   * @param keyDecoderClass Class of the key decoder
   * @param valueDecoderClass Class type of the value decoder
   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
   *   host1:port1,host2:port2 form.
   *   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
   *   to determine where the stream starts (defaults to "largest")
   * @param topics Names of the topics to consume
   * @tparam K type of Kafka message key
   * @tparam V type of Kafka message value
   * @tparam KD type of Kafka message key decoder
   * @tparam VD type of Kafka message value decoder
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V]](
      jssc: JavaStreamingContext,
      keyClass: Class[K],
      valueClass: Class[V],
      keyDecoderClass: Class[KD],
      valueDecoderClass: Class[VD],
      kafkaParams: JMap[String, String],
      topics: JSet[String]
    ): JavaPairInputDStream[K, V] = {
    implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
    implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
    implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
    implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
    createDirectStream[K, V, KD, VD](
      jssc.ssc,
      Map(kafkaParams.asScala.toSeq: _*),
      Set(topics.asScala.toSeq: _*)
    )
  }
}

2、读取kafka的数据流的值以后,进行相关mapToPair、reduceByKey的操作
mapToPair-reduceByKey-PairFunction-Function2的源代码
package org.apache.spark.api.java.function.PairFunction 
/**
 * A function that returns key-value pairs (Tuple2<K, V>), and can be used to
 * construct PairRDDs.
 */
public interface PairFunction<T, K, V> extends Serializable {
  public Tuple2<K, V> call(T t) throws Exception;
}
 
 
 
package org.apache.spark.api.java.function.Function2
/**
 * A two-argument function that takes arguments of type T1 and T2 and returns an R.
 */
public interface Function2<T1, T2, R> extends Serializable {
  public R call(T1 v1, T2 v2) throws Exception;
}
 
 
 
package org.apache.spark.streaming.api.java.reduceByKey
/**
   * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
   * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
   * with Spark's default number of partitions.
   */
  def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
    dstream.reduceByKey(func)
 
 
package org.apache.spark.streaming.api.java.mapToPair
 
 /** Return a new DStream by applying a function to all elements of this DStream. */
  def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
    def cm: ClassTag[(K2, V2)] = fakeClassTag
    new JavaPairDStream(dstream.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])
  }

 

 

-----------------------------------------------------------------------------------------------------------------------------------------------------

最近有个需求,实时统计pv,uv,结果按照date,hour,pv,uv来展示,按天统计,第二天重新统计,当然了实际还需要按照类型字段分类统计pv,uv,比如按照date,hour,pv,uv,type来展示。这里介绍最基本的pv,uv的展示。

id    uv    pv    date    hour
1    155599    306053    2018-07-27    18
关于什么是pv,uv,可以参见这篇博客https://blog.csdn.net/petermsh/article/details/78652246

1、项目流程

日志数据从flume采集过来,落到hdfs供其它离线业务使用,也会sink到kafka,sparkStreaming从kafka拉数据过来,计算pv,uv,uv是用的redis的set集合去重,最后把结果写入mysql数据库,供前端展示使用。

2、具体过程
1)pv的计算
拉取数据有两种方式,基于received和direct方式,这里用direct直拉的方式,用的mapWithState算子保存状态,这个算子与updateStateByKey一样,并且性能更好。当然了实际中数据过来需要经过清洗,过滤,才能使用。

定义一个状态函数

// 实时流量状态更新函数
  val mapFunction = (datehour:String, pv:Option[Long], state:State[Long]) => {
    val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)
    val output = (datehour,accuSum)
    state.update(accuSum)
    output
  }
1
2
3
4
5
6
7
 计算pv
 val stateSpec = StateSpec.function(mapFunction)
 val helper_count_all = helper_data.map(x => (x._1,1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)
1
2
3
这样就很容易的把pv计算出来了。

2)uv的计算
uv是要全天去重的,每次进来一个batch的数据,如果用原生的reduceByKey或者groupByKey对配置要求太高,在配置较低情况下,我们申请了一个93G的redis用来去重,原理是每进来一条数据,将date作为key,guid加入set集合,20秒刷新一次,也就是将set集合的尺寸取出来,更新一下数据库即可。

helper_data.foreachRDD(rdd => {
        rdd.foreachPartition(eachPartition => {
        // 获取redis连接
          val jedis = getJedis
          eachPartition.foreach(x => {
            val date:String = x._1.split(":")(0)
            val key = date
            // 将date作为key,guid(x._2)加入set集合
            jedis.sadd(key,x._2)
            // 设置存储每天的数据的set过期时间,防止超过redis容量,这样每天的set集合,定期会被自动删除
            jedis.expire(key,ConfigFactory.rediskeyexists)
          })
          // 关闭连接
          closeJedis(jedis)
        })
      })
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
3)结果保存到数据库
结果保存到mysql,数据库,20秒刷新一次数据库,前端展示刷新一次,就会重新查询一次数据库,做到实时统计展示pv,uv的目的。

/**
    * 插入数据
    * @param data (addTab(datehour)+helperversion)
    * @param tbName
    * @param colNames
    */
  def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {
    data.foreachRDD(rdd => {
      val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)
      if (!rdd.isEmpty()) {
        val hour_now = tmp_rdd.max() // 获取当前结果中最大的时间,在数据恢复中可以起作用
        rdd.foreachPartition(eachPartition => {
          try {
            val jedis = getJedis
            val conn = MysqlPoolUtil.getConnection()
            conn.setAutoCommit(false)
            val stmt = conn.createStatement()
            eachPartition.foreach(x => {
              val datehour = x._1.split("\t")(0)
              val helperversion = x._1.split("\t")(1)
              val date_hour = datehour.split(":")
              val date = date_hour(0)
              val hour = date_hour(1).toInt

              val colName0 = colNames(0) // date
              val colName1 = colNames(1) // hour
              val colName2 = colNames(2) // count_all
              val colName3 = colNames(3) // count
              val colName4 = colNames(4) // helperversion
              val colName5 = colNames(5) // datehour
              val colName6 = colNames(6) // dh

              val colValue0 = addYin(date)
              val colValue1 = hour
              val colValue2 = x._2.toInt
              val colValue3 = jedis.scard(date + "_" + helperversion) // // 2018-07-08_10.0.1.22
              val colValue4 = addYin(helperversion)
              var colValue5 = if (hour < 10) "'" + date + " 0" + hour + ":00 " + helperversion + "'" else "'" + date + " " + hour + ":00 " + helperversion + "'"
              val colValue6 = if(hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"

              var sql = ""
              if (hour == hour_now) { // uv只对现在更新
                sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4},${colName5}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4},${colValue5}) on duplicate key update ${colName2} =  ${colValue2},${colName3} = ${colValue3}"
              } else {
                sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName4},${colName5}) values(${colValue0},${colValue1},${colValue2},${colValue4},${colValue5}) on duplicate key update ${colName2} =  ${colValue2}"
              }
              stmt.addBatch(sql)
            })
            closeJedis(jedis)
            stmt.executeBatch() // 批量执行sql语句
            conn.commit()
            conn.close()
          } catch {
            case e: Exception => {
              logger.error(e)
              logger2.error(HelperHandle.getClass.getSimpleName + e)
            }
          }
        })
      }
    })
  }

// 计算当前时间距离次日零点的时长(毫秒)
def resetTime = {
    val now = new Date()
    val todayEnd = Calendar.getInstance
    todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR 12小时制
    todayEnd.set(Calendar.MINUTE, 59)
    todayEnd.set(Calendar.SECOND, 59)
    todayEnd.set(Calendar.MILLISECOND, 999)
    todayEnd.getTimeInMillis - now.getTime
 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
4)数据容错
流处理消费kafka都会考虑到数据丢失问题,一般可以保存到任何存储系统,包括mysql,hdfs,hbase,redis,zookeeper等到。这里用SparkStreaming自带的checkpoint机制来实现应用重启时数据恢复。

checkpoint
这里采用的是checkpoint机制,在重启或者失败后重启可以直接读取上次没有完成的任务,从kafka对应offset读取数据。

// 初始化配置文件
ConfigFactory.initConfig()

val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)
conf.set("spark.streaming.stopGracefullyOnShutdown","true")
conf.set("spark.streaming.kafka.maxRatePerPartition",consumeRate)
conf.set("spark.default.parallelism","24")
val sc = new SparkContext(conf)

while (true){
    val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0),getStreamingContext _ )
    ssc.start()
    ssc.awaitTerminationOrTimeout(resetTime)
    ssc.stop(false,true)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
checkpoint是每天一个目录,在第二天凌晨定时销毁StreamingContext对象,重新统计计算pv,uv。

注意
ssc.stop(false,true)表示优雅地销毁StreamingContext对象,不能销毁SparkContext对象,ssc.stop(true,true)会停掉SparkContext对象,程序就直接停了。

应用迁移或者程序升级
在这个过程中,我们把应用升级了一下,比如说某个功能写的不够完善,或者有逻辑错误,这时候都是需要修改代码,重新打jar包的,这时候如果把程序停了,新的应用还是会读取老的checkpoint,可能会有两个问题:

执行的还是上一次的程序,因为checkpoint里面也有序列化的代码;
直接执行失败,反序列化失败;
其实有时候,修改代码后不用删除checkpoint也是可以直接生效,经过很多测试,我发现如果对数据的过滤操作导致数据过滤逻辑改变,还有状态操作保存修改,也会导致重启失败,只有删除checkpoint才行,可是实际中一旦删除checkpoint,就会导致上一次未完成的任务和消费kafka的offset丢失,直接导致数据丢失,这种情况下我一般这么做。

这种情况一般是在另外一个集群,或者把checkpoint目录修改下,我们是代码与配置文件分离,所以修改配置文件checkpoint的位置还是很方便的。然后两个程序一起跑,除了checkpoint目录不一样,会重新建,都插入同一个数据库,跑一段时间后,把旧的程序停掉就好。以前看官网这么说,只能记住不能清楚明了,只有自己做时才会想一下办法去保证数据准确。

5)日志
日志用的log4j2,本地保存一份,ERROR级别的日志会通过邮件发送到手机。

val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)
  // 邮件level=error日志
  val logger2 = LogManager.getLogger("email")
 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Streaming实战对论坛网站动态行为pv,uv,注册人数,跳出率的多维度分析,实时统计每天pv,uv的sparkStreaming结合redis结果存入mysql供前端展示 的相关文章

  • Caffe各层的参数设置(在prototxt文件中设置)

    Caffe各层的参数设置 在prototxt文件中设置 这些参数定义在include caffe proto caffe proto文件中 如果要查看某种层的参数 请展开本文的目录 点击相应的条目来跳转 syntax proto2 pack
  • CSS字体、文本属性

    CSS字体 文本属性 字体系列 font family设置字体 字体大小 字体粗细 文字样式 字体复合属性 字体属性总结 文本系列 文本属性 文本颜色 对齐文本 装饰文本 文本缩进 行间距 字体系列 font family设置字体 cs使用

随机推荐

  • C++ 调试 The breakpoint will not currently be hit. No symbols have been loaded for this document.

    C debug调试 The breakpoint will not currently be hit No symbols have been loaded for this document 1 编译 以Test为例 右键Properti
  • Nginx解决“no resolver defined to resolve xxx.xxx”

    1 2 3 4 5 6 7 8 9 10
  • AI制作ICON展示

    作者 陈石军 撰写时间 2019年4月7日 我先做了个背景色 这个背景色我用了三种颜色 它们分别为白色 fdfdfd 蓝色 94cfe2 绿色 72c190 背景色是由一个矩形和俩个形状图形组成的 接下来就是排版了 排版有好几种 分别是靠左
  • Scratch第一讲:scratch编程软件介绍

    喜欢编程的各位小朋友们你们好呀 欢迎来到scratch小课堂 从今天起 我们要从0开始学习scratch编程 那么有的同学要问了 什么是scratch Scratch是由麻省理工学院 MIT 设计开发的一款面向少年的简易编程工具 它的功能非
  • Linux线程性能分析和CPU亲和力

    一 线程迁移和负载均衡 Linux系统在多核CPU和SMP系统上有完善的负载均衡支持 在SMP系统中 每个CPU的核都有一个迁移线程守护程序migration 一般是系统最高优先级139 实时99 以实现执行资源平衡作业 当我们调用sche
  • 5g信号云端服务器,5G基站已有11W 国内云游戏迎来春天

    目前有报道称全国已经开通了11 3W个5G网络基站 已有87万户5G签约用户 这意味着在全国范围 有关需要网络的IT产品和生活产品都将迎来春天 其中包括网络连接使用的云游戏 进入到2019年 国内5G商用全面启动 华为 小米 OPPO等手机
  • QString : 类型转换,不留神就留坑?

    QString作为Qt中内置的数据类型 功能强大且使用方便 绝对是在Qt开发过程中出场率最高的数据类型 本篇我们只重点探讨下QString转换成其他数据类型的注意事项 short toShort bool ok nullptr int ba
  • gg修改器修改数值没有用怎么办_gg修改器修改游戏数值教程_gg修改器怎么修改数值_3DM手游...

    GG修改器是很多玩家都在用的一款游戏辅助工具 使用这款软件 能够对多种游戏的数值进行随意的修改 调整成你所需要的数值 让你玩游戏玩的更爽 今天3DM小编为大家带来的是GG修改器修改游戏数值的教程 有需要的小伙伴们可以来一起了解下 GG修改器
  • Android事件分发机制及设计思路,熬了整整30天

    前言 想要成为一名优秀的Android开发 你需要一份完备的知识体系 在这里 让我们一起成长为自己所想的那样 此篇文章是初中高级工程师学习文章 知识体系较为完整 有如下特点 1 知识结构全面 2 跟随当下技术潮流实时更新 3 可用于面试 学
  • mybatis

    mybatis 起步1 之前的mybatis写法 起步2 接口式编程写法 mybatis的配置 properties settings mapUnderscoreToCamelCase typeAliases mappers 这里项目结构发
  • (三)系统与架构级低功耗设计

    前面讲解了使用EDA工具 主要是power compiler 进行功耗分析的流程 这里我们将介绍在数字IC中进行低功耗设计的方法 同时也结合EDA工具 主要是Design Compiler 如何实现 我们的讲解的低功耗设计主要是自顶向下的设
  • 笔录Flutter(十一) FloatingActionButton

    Flutter练习Demo FloatingActionButton也是经常用的 除了常见的悬浮在右下角的一个按钮 还可以利用floatingActionButtonLocation属性 控制位置的展示 floatingActionButt
  • Python:使用爬虫抓取网页中的视频并下载(完整源码)

    Python 使用爬虫抓取网页中的视频并下载 完整源码 在今天的程序开发世界中 网站是不可或缺的一部分 人们使用网站来获取有用的信息 购买商品和娱乐自己 这些网站的内容通常包含了各种类型的文件 其中最常见的就是视频 对于有经验的程序开发者来
  • 黑马JVM总结(八)

    1 StringTable面试题 1 8 1 6时 2 StringTable的位置 jvm1 6时StringTable是常量池的一部分 它随着常量池存储在永久代当中 在1 7 1 8中从永久代变成了堆中 为什么做这个更改呢 因为永久代的
  • 关于javascript md5 函数介绍

    转自 微点阅读 https www weidianyuedu com var hexcase 1 var b64pad var chrsz 8 var mode 16 模式选择 16为16位的加密 32 为32位的加密 function p
  • Eureka的常用配置讲解

    1 关闭自我保护 保护模式主要用于一组客户端和Eureka Server之间存在网络分区场景时 一旦进入保护模式 Eureka Server将会尝试保护其服务的注册表中的信息 不在删除服务注册表中的数据 当网络故障恢复后 Eureka Se
  • 外包四年太差劲,才幡然醒悟要跳槽

    前几天有个读者过来说 程序猿 外包干了四年太差劲了 感觉和外界差距有点大 现在被动醒悟 希望你能帮我制定一下学习路线 如果不是女朋友和我提分手 我估计现在还没醒悟 大专生 18年通过校招进入湖南某软件公司 干了3年多的CRUD 今年年初 感
  • VS--屏蔽编译warning警告设置

    VS 屏蔽编译warning警告设置 在 项目 gt 属性 gt 配置属性 gt C C gt 高级 的 禁用特定警告 中添加相应的警告编号 如4819
  • 机器学习-前期知识储备

    1 什么是机器学习 机器学习 利用数学模型来理解数据 发现数据中的规律 用作数据的分析和预测 数据通常由一组向量组成 这组向量中的每个向量都是一个样本 我们用 x i x i xi 来表示一个样本 其中 i
  • Spark Streaming实战对论坛网站动态行为pv,uv,注册人数,跳出率的多维度分析,实时统计每天pv,uv的sparkStreaming结合redis结果存入mysql供前端展示

    论坛数据运行代码自动生成 该生成的数据会作为Producer的方式发送给Kafka 然后SparkStreaming程序会从Kafka中在线Pull到论坛或者网站的用户在线行为信息 进而进行多维度的在线分析 数据格式如下 date 日期 格