Note that this system uses the Spooling Directory Source rather than the Exec Source as Flume's source. When the Flume service goes down, the Spooling Directory Source keeps track of how far it had read, while the Exec Source does not: the user has to handle that case, and if it is handled poorly, restarting the Flume server can produce duplicate data. The Spooling Directory Source has drawbacks of its own, of course. It renames every file it has finished reading, which is one more reason for putting an extra FTP server layer in front of it, so that Flume does not "pollute" the production environment. Another fairly serious drawback is that it cannot flexibly watch for newly appended content in all files under all subdirectories of a folder. There are plenty of ways around these problems, such as switching to another log-collection tool like Logstash.
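For contrast, an Exec Source would look roughly like the minimal sketch below (the source name and the tailed path are placeholders, not part of this system); it simply runs a command such as tail -F, so Flume itself keeps no record of how far it has read if the agent dies:
agent.sources.tailSource.type = exec
agent.sources.tailSource.command = tail -F /var/log/app/access.log
agent.sources.tailSource.channels = memorychannel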
The Flume configuration file on the FTP server is as follows:
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target
agent.sources.origin.type = spooldir
agent.sources.origin.spoolDir = /export/data/trivial/weblogs
agent.sources.origin.channels = memorychannel
agent.sources.origin.deserializer.maxLineLength = 2048
agent.sources.origin.interceptors = i2
agent.sources.origin.interceptors.i2.type = host
agent.sources.origin.interceptors.i2.hostHeader = hostname
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel
agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 10000
agent.sinks.target.type = avro
agent.sinks.target.channel = memorychannel
agent.sinks.target.hostname = 172.16.124.130
agent.sinks.target.port = 4545
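Assuming the file above is saved as ftp-agent.conf under Flume's conf directory (both names are assumptions), the agent can be started with the standard launcher; the --name argument must match the "agent" prefix used in the properties:
flume-ng agent --conf conf --conf-file conf/ftp-agent.conf --name agent -Dflume.root.logger=INFO,console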
A few of the parameters above deserve explanation. On the Flume agent's source, the deserializer.maxLineLength property sets the maximum size of each event; by default an event is 2048 bytes. The Flume agent's memory channel defaults to 80% of the memory the local JVM can obtain, and it can be tuned further with the byteCapacityBufferPercentage and byteCapacity parameters.
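For example, a rough sketch of such tuning on the memory channel (the numbers are placeholders to illustrate the two properties, not recommended values):
agent.channels.memorychannel.byteCapacity = 104857600
agent.channels.memorychannel.byteCapacityBufferPercentage = 20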
Note in particular that log files placed into the directory Flume watches on the FTP server must never reuse a file name that has already been ingested, otherwise Flume throws an error and stops working; the best solution is to append a timestamp to every log file's name.
The configuration file on the Hadoop server is as follows:
agent.sources = origin
agent.channels = memorychannel
agent.sinks = target
agent.sources.origin.type = avro
agent.sources.origin.channels = memorychannel
agent.sources.origin.bind = 0.0.0.0
agent.sources.origin.port = 4545
#agent.sources.origin.interceptors = i1 i2
#agent.sources.origin.interceptors.i1.type = timestamp
#agent.sources.origin.interceptors.i2.type = host
#agent.sources.origin.interceptors.i2.hostHeader = hostname
agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memorychannel
agent.channels.memorychannel.type = memory
agent.channels.memorychannel.capacity = 5000000
agent.channels.memorychannel.transactionCapacity = 1000000
agent.sinks.target.type = hdfs
agent.sinks.target.channel = memorychannel
agent.sinks.target.hdfs.path = /flume/events/%y-%m-%d/%H%M%S
agent.sinks.target.hdfs.filePrefix = data-%{hostname}
agent.sinks.target.hdfs.rollInterval = 60
agent.sinks.target.hdfs.rollSize = 1073741824
agent.sinks.target.hdfs.rollCount = 1000000
agent.sinks.target.hdfs.round = true
agent.sinks.target.hdfs.roundValue = 10
agent.sinks.target.hdfs.roundUnit = minute
agent.sinks.target.hdfs.useLocalTimeStamp = true
agent.sinks.target.hdfs.minBlockReplicas=1
agent.sinks.target.hdfs.writeFormat=Text
agent.sinks.target.hdfs.fileType=DataStream
The round, roundValue and roundUnit parameters make the sink create a new folder in HDFS every 10 minutes to hold the data pulled down from the FTP server.
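Under this configuration the collected data lands in paths of roughly the following shape, where the date and the 10-minute bucket come from the escape sequences in hdfs.path, the host name comes from the hostname header set by the host interceptor on the FTP-side agent, and the trailing number is the counter the HDFS sink appends to each file (the concrete values below are only illustrative):
/flume/events/15-05-30/193000/data-ftpserver01.1432986000000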
Troubleshooting
Flume splits the files pulled into HDFS into many small 1KB-5KB files
If Flume stores the files it pulls over as a large number of 1KB-5KB fragments on HDFS, the HDFS sink is most likely misconfigured and the system has fallen back to the default settings. A spooldir source wraps every line of the files in the watched directory into an event and puts it on the channel, reading at most deserializer.maxLineLength characters per line (2048 by default, as noted above). On the HDFS sink side, three properties mainly decide how large each file written to HDFS becomes: rollInterval (default 30 seconds), rollSize (default 1KB) and rollCount (default 10 events). rollInterval is the number of seconds after which the current .tmp file (which receives the events coming off the channel) is rolled into HDFS; rollSize rolls the .tmp file once it reaches the given size; rollCount rolls it once the given number of events has been written to it.
Another part of the problem is that hdfs.writeFormat in the HDFS sink configuration defaults to "Writable", which serializes the original file content into Hadoop's internal format; it should be set manually to hdfs.writeFormat = Text. Likewise hdfs.fileType defaults to "SequenceFile", which does not keep one event per line; it should be set manually to hdfs.fileType = DataStream, so that the output stays one event per line and keeps the same format as the original file.
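In other words, the collector-side sink should carry at least the following non-default settings, which are the same ones used in the Hadoop-side configuration above:
agent.sinks.target.hdfs.writeFormat = Text
agent.sinks.target.hdfs.fileType = DataStream
agent.sinks.target.hdfs.rollInterval = 60
agent.sinks.target.hdfs.rollSize = 1073741824
agent.sinks.target.hdfs.rollCount = 1000000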
Cleaning the log files with MapReduce
Once the log data has been pulled into HDFS, a MapReduce program is used to clean it.
Step 1: first use MapReduce to filter out the invalid records.
package com.guludada.clickstream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.guludada.dataparser.WebLogParser;
public class logClean {
public static class cleanMap extends Mapper<Object,Text,Text,NullWritable> {
private NullWritable v = NullWritable.get();
private Text word = new Text();
WebLogParser webLogParser = new WebLogParser();
public void map(Object key,Text value,Context context) {
//Convert the line of input to a String
String line = value.toString();
String cleanContent = webLogParser.parser(line);
if(!cleanContent.isEmpty()) {
word.set(cleanContent);
try {
context.write(word,v);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");
Job job = Job.getInstance(conf);
job.setJarByClass(logClean.class);
//Specify the mapper class used by this job
job.setMapperClass(cleanMap.class);
//Specify the key/value types of the mapper output
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//Specify the directory of the job's raw input files
Date curDate = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
String dateStr = sdf.format(curDate);
FileInputFormat.setInputPaths(job, new Path("/flume/events/" + dateStr + "/*/*"));
//Specify the directory for the job's output
FileOutputFormat.setOutputPath(job, new Path("/clickstream/cleandata/"+dateStr+"/"));
//Submit the job's configuration and the jar containing its classes to YARN for execution
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
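A sketch of how this first job might be launched, assuming the classes above are packaged into a jar named clickstream.jar (the jar name is an assumption, not something fixed by the code):
hadoop jar clickstream.jar com.guludada.clickstream.logClean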
package com.guludada.dataparser;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.guludada.javabean.WebLogBean;
/**
 * Matches valid log records with regular expressions
 *
 */
public class WebLogParser {
public String parser(String weblog_origin) {
WebLogBean weblogbean = new WebLogBean();
// Extract the IP address
Pattern IPPattern = Pattern.compile("\\d+\\.\\d+\\.\\d+\\.\\d+");
Matcher IPMatcher = IPPattern.matcher(weblog_origin);
if(IPMatcher.find()) {
String IPAddr = IPMatcher.group(0);
weblogbean.setIP_addr(IPAddr);
} else {
return ""
}
// Extract the timestamp
Pattern TimePattern = Pattern.compile("\\[(.+)\\]");
Matcher TimeMatcher = TimePattern.matcher(weblog_origin);
if(TimeMatcher.find()) {
String time = TimeMatcher.group(1);
String[] cleanTime = time.split(" ");
weblogbean.setTime(cleanTime[0]);
} else {
return "";
}
//Extract the remaining request information (request line, status code, bytes sent, referer, user agent)
Pattern InfoPattern = Pattern.compile(
"(\\\"(?:POST|GET).+?\\\") (\\d+) (\\d+).+?(\\\".+?\\\") (\\\".+?\\\")");
Matcher InfoMatcher = InfoPattern.matcher(weblog_origin);
if(InfoMatcher.find()) {
String requestInfo = InfoMatcher.group(1).replace('\"',' ').trim();
String[] requestInfoArry = requestInfo.split(" ");
weblogbean.setMethod(requestInfoArry[0]);
weblogbean.setRequest_URL(requestInfoArry[1]);
weblogbean.setRequest_protocol(requestInfoArry[2]);
String status_code = InfoMatcher.group(2);
weblogbean.setRespond_code(status_code);
String respond_data = InfoMatcher.group(3);
weblogbean.setRespond_data(respond_data);
String request_come_from = InfoMatcher.group(4).replace('\"',' ').trim();
weblogbean.setRequst_come_from(request_come_from);
String browserInfo = InfoMatcher.group(5).replace('\"',' ').trim();
weblogbean.setBrowser(browserInfo);
} else {
return "";
}
return weblogbean.toString();
}
}
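As a purely hypothetical illustration of what WebLogParser does (the log line below is invented, not real data), an access-log record such as
192.168.12.130 - - [30/May/2015:19:38:00 +0800] "GET /blog/me HTTP/1.1" 200 1234 "http://www.baidu.com" "Mozilla/5.0"
would come out of parser() as the space-separated cleaned record
192.168.12.130 30/May/2015:19:38:00 GET /blog/me HTTP/1.1 200 1234 http://www.baidu.com Mozilla/5.0
while any line that fails one of the three regular expressions is returned as an empty string and filtered out by cleanMap.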
package com.guludada.javabean;
public class WebLogBean {
String IP_addr;
String time;
String method;
String request_URL;
String request_protocol;
String respond_code;
String respond_data;
String requst_come_from;
String browser;
public String getIP_addr() {
return IP_addr;
}
public void setIP_addr(String iP_addr) {
IP_addr = iP_addr;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public String getRequest_URL() {
return request_URL;
}
public void setRequest_URL(String request_URL) {
this.request_URL = request_URL;
}
public String getRequest_protocol() {
return request_protocol;
}
public void setRequest_protocol(String request_protocol) {
this.request_protocol = request_protocol;
}
public String getRespond_code() {
return respond_code;
}
public void setRespond_code(String respond_code) {
this.respond_code = respond_code;
}
public String getRespond_data() {
return respond_data;
}
public void setRespond_data(String respond_data) {
this.respond_data = respond_data;
}
public String getRequst_come_from() {
return requst_come_from;
}
public void setRequst_come_from(String requst_come_from) {
this.requst_come_from = requst_come_from;
}
public String getBrowser() {
return browser;
}
public void setBrowser(String browser) {
this.browser = browser;
}
@Override
public String toString() {
return IP_addr + " " + time + " " + method + " "
+ request_URL + " " + request_protocol + " " + respond_code
+ " " + respond_data + " " + requst_come_from + " " + browser;
}
}
The log records after the first cleaning pass look like the figure below:
Step 2: generate the corresponding session records from the access records, assuming a session expires after 30 minutes.
package com.guludada.clickstream;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.guludada.clickstream.logClean.cleanMap;
import com.guludada.dataparser.SessionParser;
import com.guludada.dataparser.WebLogParser;
import com.guludada.javabean.WebLogSessionBean;
public class logSession {
public static class sessionMapper extends Mapper<Object,Text,Text,Text> {
private Text IPAddr = new Text();
private Text content = new Text();
private NullWritable v = NullWritable.get();
WebLogParser webLogParser = new WebLogParser();
public void map(Object key,Text value,Context context) {
//Convert the line of input to a String
String line = value.toString();
String[] weblogArry = line.split(" ");
IPAddr.set(weblogArry[0]);
content.set(line);
try {
context.write(IPAddr,content);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
static class sessionReducer extends Reducer<Text, Text, Text, NullWritable>{
private Text IPAddr = new Text();
private Text content = new Text();
private NullWritable v = NullWritable.get();
WebLogParser webLogParser = new WebLogParser();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
SessionParser sessionParser = new SessionParser();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Date sessionStartTime = null;
String sessionID = UUID.randomUUID().toString();
//Sort all the browsing records of the user behind this IP address by time
ArrayList<WebLogSessionBean> sessionBeanGroup = new ArrayList<WebLogSessionBean>();
for(Text browseHistory : values) {
WebLogSessionBean sessionBean = sessionParser.loadBean(browseHistory.toString());
sessionBeanGroup.add(sessionBean);
}
Collections.sort(sessionBeanGroup,new Comparator<WebLogSessionBean>() {
public int compare(WebLogSessionBean sessionBean1, WebLogSessionBean sessionBean2) {
Date date1 = sessionBean1.getTimeWithDateFormat();
Date date2 = sessionBean2.getTimeWithDateFormat();
if(date1 == null && date2 == null) return 0;
return date1.compareTo(date2);
}
});
for(WebLogSessionBean sessionBean : sessionBeanGroup) {
if(sessionStartTime == null) {
//Time of this user's first visit to the site in the current day's logs
sessionStartTime = timeTransform(sessionBean.getTime());
content.set(sessionParser.parser(sessionBean, sessionID));
try {
context.write(content,v);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} else {
Date sessionEndTime = timeTransform(sessionBean.getTime());
long sessionStayTime = timeDiffer(sessionStartTime,sessionEndTime);
if(sessionStayTime > 30 * 60 * 1000) {
//More than 30 minutes have passed since the session started: treat the current record as the start of a new session
sessionStartTime = timeTransform(sessionBean.getTime());
sessionID = UUID.randomUUID().toString();
}
content.set(sessionParser.parser(sessionBean, sessionID));
try {
context.write(content,v);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
private Date timeTransform(String time) {
Date standard_time = null;
try {
standard_time = sdf.parse(time);
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return standard_time;
}
private long timeDiffer(Date start_time,Date end_time) {
long diffTime = 0;
diffTime = end_time.getTime() - start_time.getTime();
return diffTime;
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");
Job job = Job.getInstance(conf);
job.setJarByClass(logSession.class);
//Specify the mapper/reducer classes used by this job
job.setMapperClass(sessionMapper.class);
job.setReducerClass(sessionReducer.class);
//Specify the key/value types of the mapper output
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//Specify the key/value types of the final output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
Date curDate = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
String dateStr = sdf.format(curDate);
//Specify the directory of the job's raw input files
FileInputFormat.setInputPaths(job, new Path("/clickstream/cleandata/"+dateStr+"/*"));
//Specify the directory for the job's output
FileOutputFormat.setOutputPath(job, new Path("/clickstream/sessiondata/"+dateStr+"/"));
//Submit the job's configuration and the jar containing its classes to YARN for execution
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
package com.guludada.dataparser;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import com.guludada.javabean.WebLogSessionBean;
public class SessionParser {
SimpleDateFormat sdf_origin = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",Locale.ENGLISH);
SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public String parser(WebLogSessionBean sessionBean,String sessionID) {
sessionBean.setSession(sessionID);
return sessionBean.toString();
}
public WebLogSessionBean loadBean(String sessionContent) {
WebLogSessionBean weblogSession = new WebLogSessionBean();
String[] contents = sessionContent.split(" ");
weblogSession.setTime(timeTransform(contents[1]));
weblogSession.setIP_addr(contents[0]);
weblogSession.setRequest_URL(contents[3]);
weblogSession.setReferal(contents[7]);
return weblogSession;
}
private String timeTransform(String time) {
Date standard_time = null;
try {
standard_time = sdf_origin.parse(time);
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return sdf_final.format(standard_time);
}
}
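Besides attaching the session ID, the main work SessionParser does is timestamp normalization: an access-log time such as 30/May/2015:19:38:00 (the dd/MMM/yyyy:HH:mm:ss form carried over from WebLogParser) is rewritten by timeTransform() into 2015-05-30 19:38:00, which is the format the later sorting and the PageViews step rely on.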
package com.guludada.javabean;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class WebLogSessionBean {
String time;
String IP_addr;
String session;
String request_URL;
String referal;
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
public String getIP_addr() {
return IP_addr;
}
public void setIP_addr(String iP_addr) {
IP_addr = iP_addr;
}
public String getSession() {
return session;
}
public void setSession(String session) {
this.session = session;
}
public String getRequest_URL() {
return request_URL;
}
public void setRequest_URL(String request_URL) {
this.request_URL = request_URL;
}
public String getReferal() {
return referal;
}
public void setReferal(String referal) {
this.referal = referal;
}
public Date getTimeWithDateFormat() {
SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if(this.time != null && !this.time.isEmpty()) {
try {
return sdf_final.parse(this.time);
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return null;
}
@Override
public String toString() {
return time + " " + IP_addr + " " + session + " "
+ request_URL + " " + referal;
}
}
The session records produced by this second pass have the following structure:
Time | IP | SessionID | Requested page URL | Referral URL
2015-05-30 19:38:00 | 192.168.12.130 | Session1 | /blog/me | www.baidu.com
2015-05-30 19:39:00 | 192.168.12.130 | Session1 | /blog/me/details | www.mysite.com/blog/me
2015-05-30 19:38:00 | 192.168.12.40 | Session2 | /blog/me | www.baidu.com
Step 3: process the session records produced in step 2 to generate the PageViews table.
package com.guludada.clickstream;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.guludada.clickstream.logClean.cleanMap;
import com.guludada.clickstream.logSession.sessionMapper;
import com.guludada.clickstream.logSession.sessionReducer;
import com.guludada.dataparser.PageViewsParser;
import com.guludada.dataparser.SessionParser;
import com.guludada.dataparser.WebLogParser;
import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.WebLogSessionBean;
public class PageViews {
public static class pageMapper extends Mapper<Object,Text,Text,Text> {
private Text word = new Text();
public void map(Object key,Text value,Context context) {
String line = value.toString();
String[] webLogContents = line.split(" ");
//Group the records by session ID
word.set(webLogContents[2]);
try {
context.write(word,value);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static class pageReducer extends Reducer<Text, Text, Text, NullWritable>{
private Text session = new Text();
private Text content = new Text();
private NullWritable v = NullWritable.get();
PageViewsParser pageViewsParser = new PageViewsParser();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//Sort all the browsing records belonging to this session by time
ArrayList<PageViewsBean> pageViewsBeanGroup = new ArrayList<PageViewsBean>();
for(Text pageView : values) {
PageViewsBean pageViewsBean = pageViewsParser.loadBean(pageView.toString());
pageViewsBeanGroup.add(pageViewsBean);
}
Collections.sort(pageViewsBeanGroup,new Comparator<PageViewsBean>() {
public int compare(PageViewsBean pageViewsBean1, PageViewsBean pageViewsBean2) {
Date date1 = pageViewsBean1.getTimeWithDateFormat();
Date date2 = pageViewsBean2.getTimeWithDateFormat();
if(date1 == null && date2 == null) return 0;
return date1.compareTo(date2);
}
});
//Compute the step number and dwell time of every page in this session.
//A page's dwell time is the gap to the next visit, so each record is written
//only after the following record has been seen; the last page of the session
//keeps the default dwell time of 0 because there is no following visit.
int step = 0;
PageViewsBean lastStayPageBean = null;
Date lastVisitTime = null;
for(PageViewsBean pageViewsBean : pageViewsBeanGroup) {
Date curVisitTime = pageViewsBean.getTimeWithDateFormat();
if(lastStayPageBean != null) {
//Seconds between the previous visit and the current one
Integer timeDiff = (int) ((curVisitTime.getTime() - lastVisitTime.getTime())/1000);
//Use it as the dwell time of the previously visited page, then emit that record
lastStayPageBean.setStayTime(timeDiff.toString());
content.set(pageViewsParser.parser(lastStayPageBean));
context.write(content,v);
}
//Update the step number of the current record
step++;
pageViewsBean.setStep(step+"");
//Remember the current record so its dwell time can be filled in on the next iteration
lastStayPageBean = pageViewsBean;
lastVisitTime = curVisitTime;
}
//Emit the last record of the session with its default dwell time
if(lastStayPageBean != null) {
content.set(pageViewsParser.parser(lastStayPageBean));
context.write(content,v);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://ymhHadoop:9000");
Job job = Job.getInstance(conf);
job.setJarByClass(PageViews.class);
//Specify the mapper/reducer classes used by this job
job.setMapperClass(pageMapper.class);
job.setReducerClass(pageReducer.class);
//Specify the key/value types of the mapper output
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//Specify the key/value types of the final output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
Date curDate = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd");
String dateStr = sdf.format(curDate);
//Specify the directory of the job's raw input files
FileInputFormat.setInputPaths(job, new Path("/clickstream/sessiondata/"+dateStr+"/*"));
//Specify the directory for the job's output
FileOutputFormat.setOutputPath(job, new Path("/clickstream/pageviews/"+dateStr+"/"));
//Submit the job's configuration and the jar containing its classes to YARN for execution
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}
package com.guludada.dataparser;
import com.guludada.javabean.PageViewsBean;
import com.guludada.javabean.WebLogSessionBean;
public class PageViewsParser {
/**
 * Loads a PageViewsBean from one line of logSession's output
 *
 * */
public PageViewsBean loadBean(String sessionContent) {
PageViewsBean pageViewsBean = new PageViewsBean();
String[] contents = sessionContent.split(" ");
pageViewsBean.setTime(contents[0] + " " + contents[1]);
pageViewsBean.setIP_addr(contents[2]);
pageViewsBean.setSession(contents[3]);
pageViewsBean.setVisit_URL(contents[4]);
pageViewsBean.setStayTime("0");
pageViewsBean.setStep("0");
return pageViewsBean;
}
public String parser(PageViewsBean pageBean) {
return pageBean.toString();
}
}
package com.guludada.javabean;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class PageViewsBean {
String session;
String IP_addr;
String time;
String visit_URL;
String stayTime;
String step;
public String getSession() {
return session;
}
public void setSession(String session) {
this.session = session;
}
//Assumption: the remaining members are not shown in the original listing and are reconstructed
//here to mirror WebLogSessionBean so that pageReducer and PageViewsParser compile.
public void setIP_addr(String iP_addr) { IP_addr = iP_addr; }
public void setTime(String time) { this.time = time; }
public void setVisit_URL(String visit_URL) { this.visit_URL = visit_URL; }
public void setStayTime(String stayTime) { this.stayTime = stayTime; }
public void setStep(String step) { this.step = step; }
public Date getTimeWithDateFormat() {
SimpleDateFormat sdf_final = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
return (time == null || time.isEmpty()) ? null : sdf_final.parse(time);
} catch (ParseException e) {
e.printStackTrace();
return null;
}
}
@Override
public String toString() {
return session + " " + IP_addr + " " + time + " " + visit_URL + " " + stayTime + " " + step;
}
}
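Putting the three jobs together, one possible way to run the whole cleaning chain and peek at the result, assuming the classes are packaged into clickstream.jar and the jobs run on the same day the data was collected (the jar name and the example date 2015-05-30 are assumptions):
hadoop jar clickstream.jar com.guludada.clickstream.logClean
hadoop jar clickstream.jar com.guludada.clickstream.logSession
hadoop jar clickstream.jar com.guludada.clickstream.PageViews
hdfs dfs -cat /clickstream/pageviews/15-05-30/part-r-00000 | head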