国内最全IT社区平台 联系我们 | 收藏本站
华晨云阿里云优惠2
您当前位置:首页 > 服务器 > Flume+Hadoop+Hive的离线分析系统基本架构

Flume+Hadoop+Hive的离线分析系统基本架构

来源:程序员人生   发布时间:2016-06-21 11:39:00 阅读次数:3900次
      PS:历史缘由作者账号名为:ymh198816,但事实上作者的生日其实不是1988年1月6日偷笑

      最近在学习大数据的离线分析技术,所以在这里通过做1个简单的网站点击流数据分析离线系统来和大家1起梳理1下离线分析系统的架构模型。固然这个架构模型只能是离线分析技术的1个简单的入门级架构,实际生产环境中的大数据离线分析技术还触及到很多细节的处理和高可用的架构。这篇文章的目的只是带大家入个门,让大家对离线分析技术有1个简单的认识,并和大家1起做学习交换。

离线分析系统的结构图
    

      全部离线分析的整体架构就是使用FlumeFTP服务器上收集日志文件,并存储在Hadoop HDFS文件系统上,再接着用Hadoopmapreduce清洗日志文件,最后使用HIVE构建数据仓库做离线分析。任务的调度使用Shell脚本完成,固然大家也能够尝试1些自动化的任务调度工具,比如说AZKABANOOZIE等。
      分析所使用的点击流日志文件主要来自Nginxaccess.log日志文件,需要注意的是在这里其实不是用Flume直接去生产环境上拉取nginx的日志文件,而是多设置了1层FTP服务器来缓冲所有的日志文件,然后再用Flume监听FTP服务器上指定的目录并拉取目录里的日志文件到HDFS服务器上(具体缘由下面分析)。从生产环境推送日志文件到FTP服务器的操作可以通过Shell脚本配合Crontab定时器来实现。

网站点击流数据

       
         
         图片来源:http://webdataanalysis.net/data-collection-and-preprocessing/weblog-to-clickstream/#comments

      1般在WEB系统中,用户对站点的页面的访问阅读,点击行动等1系列的数据都会记录在日志中,每条日志记录就代表着上图中的1个数据点;而点击流数据关注的就是所有这些点连起来后的1个完全的网站阅读行动记录,可以认为是1个用户对网站的阅读session。比如说用户从哪个外站进入到当前的网站,用户接下来阅读了当前网站的哪些页面,点击了哪些图片链接按钮等1系列的行动记录,这1个整体的信息就称为是该用户的点击流记录。这篇文章中设计的离线分析系统就是搜集WEB系统中产生的这些数据日志,并清洗日志内容存储散布式的HDFS文件存储系统上,接着使用离线分析工具HIVE去统计所有用户的点击流信息。

      本系统中我们采取Nginx的access.log来做点击流分析的日志文件。access.log日志文件的格式以下:
      样例数据格式:
      124.42.13.230 - - [18/Sep/2013:06:57:50 +0000] "GET /shoppingMall?ver=1.2.1 HTTP/1.1" 200 7200 "http://www.baidu.com.cn" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)"
        格式分析: 
        1、 访客ip地址:124.42.13.230
        2、访客用户信息: - -
          3、要求时间:[18/Sep/2013:06:57:50 +0000]
        4、要求方式:GET
          5、要求的url/shoppingMall?ver=1.10.2
        6、要求所用协议:HTTP/1.1
          7、响应码:200
        8、返回的数据流量:7200
        9、访客的来源urlhttp://www.baidu.com.cn
        10、访客所用阅读器:Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; BTRS101170; InfoPath.2; .NET4.0C; .NET4.0E; .NET CLR 2.0.50727)

 搜集用户数据
      网站会通过前端JS代码或服务器真个后台代码搜集用户阅读数据并存储在网站服务器中。1般运维人员会在离线分析系统和真实生产环境之间部署FTP服务器,并将生产环境上的用户数据每天定时发送到FTP服务器上,离线分析系统就会从FTP服务上收集数据而不会影响到生产环境。
      
收集数据的方式有多种,1种是通过自己编写shell脚本或Java编程收集数据,但是工作量大,不方便保护,另外一种就是直接使用第3方框架去进行日志的收集,1般第3方框架的硬朗性,容错性和易用性都做得很好也易于保护。本文彩用第3方框架Flume进行日志收集,Flume是1个散布式的高效的日志收集系统,它能把散布在不同服务器上的海量日志文件数据统1搜集到1个集中的存储资源中,FlumeApache的1个顶级项目,与Hadoop也有很好的兼容性。不过需要注意的是Flume其实不是1个高可用的框架,这方面的优化得用户自己去保护。
        Flume
agent是运行在JVM上的,所以各个服务器上的JVM环境必不可少。每个Flume agent部署在1台服务器上,Flume会搜集web server 产生的日志数据,并封装成1个个的事件发送给Flume AgentSourceFlume Agent Source会消费这些搜集来的数据事件并放在Flume Agent ChannelFlume Agent Sink会从Channel中搜集这些收集过来的数据,要末存储在本地的文件系统中要末作为1个消费资源分发给下1个装在散布式系统中其它服务器上的Flume进行处理。Flume提供了点对点的高可用的保障,某个服务器上的Flume Agent Channel中的数据只有确保传输到了另外一个服务器上的Flume Agent Channel里或正确保存到了本地的文件存储系统中,才会被移除。
本系统中每个FTP服务器Hadoopname node服务器上都要部署1个Flume AgentFTPFlume Agent收集Web Server的日志并汇总到name node服务器上的Flume Agent,最后由hadoop name node服务器将所有的日志数据下沉到散布式的文件存储系统HDFS上面。
      需要注意的是Flume的Source在本文的系统当选择的是Spooling Directory Source,而没有选择Exec Source,由于当Flume服务down掉的时候Spooling Directory Source能记录上1次读取到的位置,而Exec Source则没有,需要用户自己去处理,当重启Flume服务器的时候如果处理不好就会有重复数据的问题。固然Spooling Directory Source也是有缺点的,会对读取过的文件重命名,所以多架1层FTP服务器也是为了不Flume“污染”生产环境。Spooling Directory Source另外1个比较大的缺点就是没法做到灵活监听某个文件夹底下所有子文件夹里的所有文件里新追加的内容。关于这些问题的解决方案也有很多,比如选择其它的日志收集工具,像logstash等。
       FTP
服务器上的Flume配置文件以下:   
agent.channels = memorychannel agent.sinks = target agent.sources.origin.type = spooldir agent.sources.origin.spoolDir = /export/data/trivial/weblogs agent.sources.origin.channels = memorychannel agent.sources.origin.deserializer.maxLineLength = 2048 agent.sources.origin.interceptors = i2 agent.sources.origin.interceptors.i2.type = host agent.sources.origin.interceptors.i2.hostHeader = hostname agent.sinks.loggerSink.type = logger agent.sinks.loggerSink.channel = memorychannel agent.channels.memorychannel.type = memory agent.channels.memorychannel.capacity = 10000 agent.sinks.target.type = avro agent.sinks.target.channel = memorychannel agent.sinks.target.hostname = 172.16.124.130 agent.sinks.target.port = 4545
     这里有几个参数需要说明,Flume Agent Source可以通过配置deserializer.maxLineLength这个属性来指定每一个Event的大小,默许是每一个Event2048byteFlume Agent Channel的大小默许等于于本地服务器JVM所获得到的内存的80%,用户可以通过byteCapacityBufferPercentagebyteCapacity两个参数去进行优化。
     
需要特别注意的是FTP上放入Flume监听的文件夹中的日志文件不能同名,不然Flume会报错并停止工作,最好的解决方案就是为每份日志文件拼上时间戳。

     Hadoop服务器上的配置文件以下:   
agent.sources = origin agent.channels = memorychannel agent.sinks = target agent.sources.origin.type = avro agent.sources.origin.channels = memorychannel agent.sources.origin.bind = 0.0.0.0 agent.sources.origin.port = 4545 #agent.sources.origin.interceptors = i1 i2 #agent.sources.origin.interceptors.i1.type = timestamp #agent.sources.origin.interceptors.i2.type = host #agent.sources.origin.interceptors.i2.hostHeader = hostname agent.sinks.loggerSink.type = logger agent.sinks.loggerSink.channel = memorychannel agent.channels.memorychannel.type = memory agent.channels.memorychannel.capacity = 5000000 agent.channels.memorychannel.transactionCapacity = 1000000 agent.sinks.target.type = hdfs agent.sinks.target.channel = memorychannel agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S agent.sinks.target.hdfs.filePrefix = data-%{hostname} agent.sinks.target.hdfs.rollInterval = 60 agent.sinks.target.hdfs.rollSize = 1073741824 agent.sinks.target.hdfs.rollCount = 1000000 agent.sinks.target.hdfs.round = true agent.sinks.target.hdfs.roundValue = 10 agent.sinks.target.hdfs.roundUnit = minute agent.sinks.target.hdfs.useLocalTimeStamp = true agent.sinks.target.hdfs.minBlockReplicas=1 agent.sinks.target.hdfs.writeFormat=Text agent.sinks.target.hdfs.fileType=DataStream

round, roundValue,roundUnit3个参数是用来配置每10分钟在hdfs里生成1个文件夹保存从FTP服务器上拉取下来的数据。

    Troubleshooting 
       使用Flume拉取文件到HDFS中会遇到将文件分散成多个1KB⑸KB的小文件的问题   
       需要注意的是如果遇到Flume会将拉取过来的文件分成很多份1KB⑸KB的小文件存储到HDFS上,那末极可能是HDFS Sink的配置不正确,致使系统使用了默许配置。spooldir类型的source是将指定目录中的文件的每行封装成1个event放入到channel中,默许每行最大读取1024个字符。在HDFS Sink端主要是通过rollInterval(默许30秒), rollSize(默许1KB), rollCount(默许10个event)3个属性来决定写进HDFS的分片文件的大小。rollInterval表示经过量少秒后就将当前.tmp文件(写入的是从channel中过来的events)下沉到HDFS文件系统中,rollSize表示1旦.tmp文件到达1定的size后,就下沉到HDFS文件系统中,rollCount表示.tmp文件1旦写入了指定数量的events就下沉到HDFS文件系统中。

       使用Flume拉取到HDFS中的文件格式错乱
       这是由于HDFS Sink的配置中,hdfs.writeFormat属性默许为“Writable”会将本来的文件的内容序列化成HDFS的格式,应当手动设置成hdfs.writeFormat=“text”; 并且hdfs.fileType默许是“SequenceFile”类型的,是将所有event拼成1行,应当该手动设置成hdfs.fileType=“DataStream”,这样就能够是1行1个event,与原文件格式保持1致

使用Mapreduce清洗日志文件
当把日志文件中的数据拉取到HDFS文件系统后,使用Mapreduce程序去进行日志清洗
第1步,先用Mapreduce过滤掉无效的数据
package com.guludada.clickstream; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.StringTokenizer; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.guludada.dataparser.WebLogParser; public class logClean { public static class cleanMap extends Mapper<Object,Text,Text,NullWritable> { private NullWritable v = NullWritable.get(); private Text word = new Text(); WebLogParser webLogParser = new WebLogParser(); public void map(Object key,Text value,Context context) { //将1行内容转成string String line = value.toString(); String cleanContent = webLogParser.parser(line); if(cleanContent != "") { word.set(cleanContent); try { context.write(word,v); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000"); Job job = Job.getInstance(conf); job.setJarByClass(logClean.class); //指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(cleanMap.class); //指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //指定job的输入原始文件所在目录 Date curDate = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd"); String dateStr = sdf.format(curDate); FileInputFormat.setInputPaths(job, new Path("/flume/events/" + dateStr + "/*/*")); //指定job的输出结果所在目录 FileOutputFormat.setOutputPath(job, new Path("/clickstream/cleandata/"+dateStr+"/")); //将job中配置的相干参数,和job所用的java类所在的jar包,提交给yarn去运行 boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }

package com.guludada.dataparser; import java.io.IOException; import java.util.regex.Matcher; import java.util.regex.Pattern; import com.guludada.javabean.WebLogBean; /** * 用正则表达式匹配出合法的日志记录 * * */ public class WebLogParser { public String parser(String weblog_origin) { WebLogBean weblogbean = new WebLogBean(); // 获得IP地址 Pattern IPPattern = Pattern.compile("\\d+.\\d+.\\d+.\\d+"); Matcher IPMatcher = IPPattern.matcher(weblog_origin); if(IPMatcher.find()) { String IPAddr = IPMatcher.group(0); weblogbean.setIP_addr(IPAddr); } else { return "" } // 获得时间信息 Pattern TimePattern = Pattern.compile("\\[(.+)\\]"); Matcher TimeMatcher = TimePattern.matcher(weblog_origin); if(TimeMatcher.find()) { String time = TimeMatcher.group(1); String[] cleanTime = time.split(" "); weblogbean.setTime(cleanTime[0]); } else { return ""; } //获得其余要求信息 Pattern InfoPattern = Pattern.compile( "(\\\"[POST|GET].+?\\\") (\\d+) (\\d+).+?(\\\".+?\\\") (\\\".+?\\\")"); Matcher InfoMatcher = InfoPattern.matcher(weblog_origin); if(InfoMatcher.find()) { String requestInfo = InfoMatcher.group(1).replace('\"',' ').trim(); String[] requestInfoArry = requestInfo.split(" "); weblogbean.setMethod(requestInfoArry[0]); weblogbean.setRequest_URL(requestInfoArry[1]); weblogbean.setRequest_protocol(requestInfoArry[2]); String status_code = InfoMatcher.group(2); weblogbean.setRespond_code(status_code); String respond_data = InfoMatcher.group(3); weblogbean.setRespond_data(respond_data); String request_come_from = InfoMatcher.group(4).replace('\"',' ').trim(); weblogbean.setRequst_come_from(request_come_from); String browserInfo = InfoMatcher.group(5).replace('\"',' ').trim(); weblogbean.setBrowser(browserInfo); } else { return ""; } return weblogbean.toString(); } }
package com.guludada.javabean; public class WebLogBean { String IP_addr; String time; String method; String request_URL; String request_protocol; String respond_code; String respond_data; String requst_come_from; String browser; public String getIP_addr() { return IP_addr; } public void setIP_addr(String iP_addr) { IP_addr = iP_addr; } public String getTime() { return time; } public void setTime(String time) { this.time = time; } public String getMethod() { return method; } public void setMethod(String method) { this.method = method; } public String getRequest_URL() { return request_URL; } public void setRequest_URL(String request_URL) { this.request_URL = request_URL; } public String getRequest_protocol() { return request_protocol; } public void setRequest_protocol(String request_protocol) { this.request_protocol = request_protocol; } public String getRespond_code() { return respond_code; } public void setRespond_code(String respond_code) { this.respond_code = respond_code; } public String getRespond_data() { return respond_data; } public void setRespond_data(String respond_data) { this.respond_data = respond_data; } public String getRequst_come_from() { return requst_come_from; } public void setRequst_come_from(String requst_come_from) { this.requst_come_from = requst_come_from; } public String getBrowser() { return browser; } public void setBrowser(String browser) { this.browser = browser; } @Override public String toString() { return IP_addr + " " + time + " " + method + " " + request_URL + " " + request_protocol + " " + respond_code + " " + respond_data + " " + requst_come_from + " " + browser; } }

第1第二天记清洗后的记录以下图:
 


2步,根据访问记录生成相应的Session信息记录假定Session的过期时间是30分钟

package com.guludada.clickstream; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.Locale; import java.util.UUID; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.guludada.clickstream.logClean.cleanMap; import com.guludada.dataparser.SessionParser; import com.guludada.dataparser.WebLogParser; import com.guludada.javabean.WebLogSessionBean; public class logSession { public static class sessionMapper extends Mapper<Object,Text,Text,Text> { private Text IPAddr = new Text(); private Text content = new Text(); private NullWritable v = NullWritable.get(); WebLogParser webLogParser = new WebLogParser(); public void map(Object key,Text value,Context context) { //将1行内容转成string String line = value.toString(); String[] weblogArry = line.split(" "); IPAddr.set(weblogArry[0]); content.set(line); try { context.write(IPAddr,content); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } static class sessionReducer extends Reducer<Text, Text, Text, NullWritable>{ private Text IPAddr = new Text(); private Text content = new Text(); private NullWritable v = NullWritable.get(); WebLogParser webLogParser = new WebLogParser(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SessionParser sessionParser = new SessionParser(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Date sessionStartTime = null; String sessionID = UUID.randomUUID().toString(); //将IP地址所对应的用户的所有阅读记录按时间排序 ArrayList<WebLogSessionBean> sessionBeanGroup = new ArrayList<WebLogSessionBean>(); for(Text browseHistory : values) { WebLogSessionBean sessionBean = sessionParser.loadBean(browseHistory.toString()); sessionBeanGroup.add(sessionBean); } Collections.sort(sessionBeanGroup,new Comparator<WebLogSessionBean>() { public int compare(WebLogSessionBean sessionBean1, WebLogSessionBean sessionBean2) { Date date1 = sessionBean1.getTimeWithDateFormat(); Date date2 = sessionBean2.getTimeWithDateFormat(); if(date1 == null && date2 == null) return 0; return date1.compareTo(date2); } }); for(WebLogSessionBean sessionBean : sessionBeanGroup) { if(sessionStartTime == null) { //当天日志中某用户第1次访问网站的时间 sessionStartTime = timeTransform(sessionBean.getTime()); content.set(sessionParser.parser(sessionBean, sessionID)); try { context.write(content,v); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else { Date sessionEndTime = timeTransform(sessionBean.getTime()); long sessionStayTime = timeDiffer(sessionStartTime,sessionEndTime); if(sessionStayTime > 30 * 60 * 1000) { //将当前阅读记录的时间设为下1个session的开始时间 sessionStartTime = timeTransform(sessionBean.getTime()); sessionID = UUID.randomUUID().toString(); continue; } content.set(sessionParser.parser(sessionBean, sessionID)); try { context.write(content,v); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } private Date timeTransform(String time) { Date standard_time = null; try { standard_time = sdf.parse(time); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } return standard_time; } private long timeDiffer(Date start_time,Date end_time) { long diffTime = 0; diffTime = end_time.getTime() - start_time.getTime(); return diffTime; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000"); Job job = Job.getInstance(conf); job.setJarByClass(logClean.class); //指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(sessionMapper.class); job.setReducerClass(sessionReducer.class); //指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //指定终究输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); Date curDate = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd"); String dateStr = sdf.format(curDate); //指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path("/clickstream/cleandata/"+dateStr+"/*")); //指定job的输出结果所在目录 FileOutputFormat.setOutputPath(job, new Path("/clickstream/sessiondata/"+dateStr+"/")); //将job中配置的相干参数,和job所用的java类所在的jar包,提交给yarn去运行 boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
package com.guludada.dataparser; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Locale; import com.guludada.javabean.WebLogSessionBean; public class SessionParser { SimpleDateFormat sdf_origin = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.ENGLISH); SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public String parser(WebLogSessionBean sessionBean,String sessionID) { sessionBean.setSession(sessionID); return sessionBean.toString(); } public WebLogSessionBean loadBean(String sessionContent) { WebLogSessionBean weblogSession = new WebLogSessionBean(); String[] contents = sessionContent.split(" "); weblogSession.setTime(timeTransform(contents[1])); weblogSession.setIP_addr(contents[0]); weblogSession.setRequest_URL(contents[3]); weblogSession.setReferal(contents[7]); return weblogSession; } private String timeTransform(String time) { Date standard_time = null; try { standard_time = sdf_origin.parse(time); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } return sdf_final.format(standard_time); } }
package com.guludada.javabean; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; public class WebLogSessionBean { String time; String IP_addr; String session; String request_URL; String referal; public String getTime() { return time; } public void setTime(String time) { this.time = time; } public String getIP_addr() { return IP_addr; } public void setIP_addr(String iP_addr) { IP_addr = iP_addr; } public String getSession() { return session; } public void setSession(String session) { this.session = session; } public String getRequest_URL() { return request_URL; } public void setRequest_URL(String request_URL) { this.request_URL = request_URL; } public String getReferal() { return referal; } public void setReferal(String referal) { this.referal = referal; } public Date getTimeWithDateFormat() { SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); if(this.time != null && this.time != "") { try { return sdf_final.parse(this.time); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return null; } @Override public String toString() { return time + " " + IP_addr + " " + session + " " + request_URL + " " + referal; } }

第2次清算出来的Session信息结构以下:
时间 IP SessionID 要求页面URL Referal URL
2015-05⑶0 19:38:00 192.168.12.130 Session1 /blog/me www.baidu.com
2015-05⑶0 19:39:00 192.168.12.130 Session1 /blog/me/details www.mysite.com/blog/me
2015-05⑶0 19:38:00 192.168.12.40 Session2 /blog/me www.baidu.com



第3步,清洗第2步生成的Session信息,生成PageViews信息表
package com.guludada.clickstream; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.Locale; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.guludada.clickstream.logClean.cleanMap; import com.guludada.clickstream.logSession.sessionMapper; import com.guludada.clickstream.logSession.sessionReducer; import com.guludada.dataparser.PageViewsParser; import com.guludada.dataparser.SessionParser; import com.guludada.dataparser.WebLogParser; import com.guludada.javabean.PageViewsBean; import com.guludada.javabean.WebLogSessionBean; public class PageViews { public static class pageMapper extends Mapper<Object,Text,Text,Text> { private Text word = new Text(); public void map(Object key,Text value,Context context) { String line = value.toString(); String[] webLogContents = line.split(" "); //根据session来分组 word.set(webLogContents[2]); try { context.write(word,value); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public static class pageReducer extends Reducer<Text, Text, Text, NullWritable>{ private Text session = new Text(); private Text content = new Text(); private NullWritable v = NullWritable.get(); PageViewsParser pageViewsParser = new PageViewsParser(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //上1条记录的访问信息 PageViewsBean lastStayPageBean = null; Date lastVisitTime = null; @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //将session所对应的所有阅读记录按时间排序 ArrayList<PageViewsBean> pageViewsBeanGroup = new ArrayList<PageViewsBean>(); for(Text pageView : values) { PageViewsBean pageViewsBean = pageViewsParser.loadBean(pageView.toString()); pageViewsBeanGroup.add(pageViewsBean); } Collections.sort(pageViewsBeanGroup,new Comparator<PageViewsBean>() { public int compare(PageViewsBean pageViewsBean1, PageViewsBean pageViewsBean2) { Date date1 = pageViewsBean1.getTimeWithDateFormat(); Date date2 = pageViewsBean2.getTimeWithDateFormat(); if(date1 == null && date2 == null) return 0; return date1.compareTo(date2); } }); //计算每一个页面的停留时间 int step = 0; for(PageViewsBean pageViewsBean : pageViewsBeanGroup) { Date curVisitTime = pageViewsBean.getTimeWithDateFormat(); if(lastStayPageBean != null) { //计算前后两次访问记录像差的时间,单位是秒 Integer timeDiff = (int) ((curVisitTime.getTime() - lastVisitTime.getTime())/1000); //根据当前记录的访问信息更新上1条访问记录中访问的页面的停留时间 lastStayPageBean.setStayTime(timeDiff.toString()); } //更新访问记录的步数 step++; pageViewsBean.setStep(step+""); //更新上1条访问记录的停留时间后,将当前访问记录设定为上1条访问信息记录 lastStayPageBean = pageViewsBean; lastVisitTime = curVisitTime; //输出pageViews信息 content.set(pageViewsParser.parser(pageViewsBean)); try { context.write(content,v); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000"); Job job = Job.getInstance(conf); job.setJarByClass(PageViews.class); //指定本业务job要使用的mapper/Reducer业务类 job.setMapperClass(pageMapper.class); job.setReducerClass(pageReducer.class); //指定mapper输出数据的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //指定终究输出的数据的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); Date curDate = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd"); String dateStr = sdf.format(curDate); //指定job的输入原始文件所在目录 FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/"+dateStr+"/*")); //指定job的输出结果所在目录 FileOutputFormat.setOutputPath(job, new Path("/clickstream/pageviews/"+dateStr+"/")); //将job中配置的相干参数,和job所用的java类所在的jar包,提交给yarn去运行 boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
package com.guludada.dataparser; import com.guludada.javabean.PageViewsBean; import com.guludada.javabean.WebLogSessionBean; public class PageViewsParser { /** * 根据logSession的输出数据加载PageViewsBean * * */ public PageViewsBean loadBean(String sessionContent) { PageViewsBean pageViewsBean = new PageViewsBean(); String[] contents = sessionContent.split(" "); pageViewsBean.setTime(contents[0] + " " + contents[1]); pageViewsBean.setIP_addr(contents[2]); pageViewsBean.setSession(contents[3]); pageViewsBean.setVisit_URL(contents[4]); pageViewsBean.setStayTime("0"); pageViewsBean.setStep("0"); return pageViewsBean; } public String parser(PageViewsBean pageBean) { return pageBean.toString(); } }
package com.guludada.javabean; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; public class PageViewsBean { String session; String IP_addr; String time; String visit_URL; String stayTime; String step; public String getSession() { return session; } public void setSession(String s
生活不易,码农辛苦
如果您觉得本网站对您的学习有所帮助,可以手机扫描二维码进行捐赠
程序员人生
------分隔线----------------------------
分享到:
------分隔线----------------------------
关闭
程序员人生