在HDFS的使用进程中,有的时候集群保护者可能想要知道哪些用户使用他们集群的资源比较多,以此有1个全面的了解。在YARN中,衡量用户使用资源的1个指标是container数,而在HDFS中我们 可以用甚么指标呢?答案是要求数。固然你可能会说,为何不能用写入写出的总数据量作为指标呢?没错,这的确也是1个可选指标,但是明显它们不容易搜集和统计,相比于要求数而言。那末问题又来了,是不是我们有必要自己写程序做这样的1套分析功能呢?HDFS是不是已帮我们做了这样的事情呢?带着以上的几个问题,我们继续下文的浏览,答案将会1点点的揭晓。
先抛开前面所说的内容,如果单纯斟酌HDFS要求数的Top统计功能,我们会采取甚么样的办法呢?可能我们的实现思路会是这样的:
在以上3步中,第1步的实现是最难的,由于这个“入口”不好找,在这里我罗列了3种办法:
上面的第3种方案无疑是最好的方案,重新回到刚刚前言中提过的1个问题,现有HDFS中是不是已实现这样的概念呢?答案是肯定的,在目前的Hadoop 2.7和以上版本中已实现了这样的功能,相干JIRA HDFS⑹982(nntop: top-like tool for name node users)。此JIRA的实现者是来自twitter的1个hadoop工程师,此特性已在twitter内部用了几个月的时间,然后此工程师将其贡献到了社区。在HDFS⑹982的设计中,还添加了Top统计值的获得方式,以下图所示:
从上面的设计中,我们可以看到,这里新定义了1个叫TopAuditLogger来做这样的拦截解析,将统计值存入TopMetrics中,然后通过web和jmx的方式进行Top用户数据的获得。在后面代码的实现分析中,大家会更能体会此架构的设计思路。在HDFS⑹982中,将此HDFS Top要求数的用户统计简称为HDFS nnTop,所以在后续的文字描写中也将会以这样的简称来称呼。
在本节中,我们将会聊聊HDFS nnTop细节上的1些实现。其中最大的1个问题就是Metric统计值的细节设计,我们到底以怎样样的情势来存这些统计值呢?由于不同的用户在不同时间段内访问的要求数是各不相同的,所以1个比较适合的办法是每次统计阶段时间内的Top user统计。然后对这个区间时间,我们是可以根据配置进行配置的,5分钟,1分钟,或30s等等。基于这个核心设计思想,在HDFS⑹982中,那位twitter的工程师使用了类似滑动窗口的机制,入下图所示:
比如上面显示的RollingWindow表示的全部区间是假定是最近1分钟,就是60s,然后其内部份成了6个bucket,每一个bucket就是代表10s。然后根据动作的产生时间,计算出其中所在的bucket,在此bucket对象内进行计数累加。但是在这里得要提1点,bucket如果配的越多,代表统计的精度就会越准,一样内存的开消也将会加大,这里会有1个内存空间与准确度之间的博弈比较。
这里的RollingWindow是对应到具体用户的具体要求动作了。所在HDFS中,你会看到针对不同用户,操作的许许多多的RollingWindow,它的总的结构组织图如:
上面的结构图用1句话概括以下:
TopMetric根据不同period时间创建不同的RollingWindowManager,在每一个RollingWindowManager中,又包括了数个不同操作要求类型的RollingWindowMap,最后在每一个RollingWindowMap中,依照用户又划分出了许多个RolingWindow对象。
下面是我们非常关注的nnTop的代码实现部份,我尽可能会挑选其中核心的代码进行分析,这样看起来会更加的简洁。
我们首先来看要求数据的拦截获得,下面是TopMetric的定义,
public class TopMetrics {
...
// RollingWindowManagers对象图,每一个对象映照到1个周期时间
final Map<Integer, RollingWindowManager> rollingWindowManagers =
new HashMap<Integer, RollingWindowManager>();
public TopMetrics(Configuration conf, int[] reportingPeriods) {
logConf(conf);
for (int i = 0; i < reportingPeriods.length; i++) {
// 根据配置传入的period,创建不同的RollingWindowManager,并加入变量rollingWindowManagers中
rollingWindowManagers.put(reportingPeriods[i], new RollingWindowManager(
conf, reportingPeriods[i]));
}
}
然后TopMetric会在FSNamesystem的initAuditLoggers方法中被构造,
private List<AuditLogger> initAuditLoggers(Configuration conf) {
// Initialize the custom access loggers if configured.
Collection<String> alClasses =
conf.getTrimmedStringCollection(DFS_NAMENODE_AUDIT_LOGGERS_KEY);
List<AuditLogger> auditLoggers = Lists.newArrayList();
...
// Add audit logger to calculate top users
// 判断是不是开启nnTop统计功能
if (topConf.isEnabled) {
// 创建TopMetric对象进行nnTop的统计,传入配置的周期时间
topMetrics = new TopMetrics(conf, topConf.nntopReportingPeriodsMs);
// 新建TopAuditLogger对象进行audit log记录的拦截处理
auditLoggers.add(new TopAuditLogger(topMetrics));
}
return Collections.unmodifiableList(auditLoggers);
}
随后在logAuditEvent方法中,topAuditLogger会被调用到,
private void logAuditEvent(boolean succeeded,
UserGroupInformation ugi, InetAddress addr, String cmd, String src,
String dst, HdfsFileStatus stat) {
FileStatus status = null;
...
final String ugiStr = ugi.toString();
// 在每次的logAuditEvent方法中,会得到每次的要求事件,给到AuditLogger处理,包括之前的TopAuditLogger
for (AuditLogger logger : auditLoggers) {
if (logger instanceof HdfsAuditLogger) {
HdfsAuditLogger hdfsLogger = (HdfsAuditLogger) logger;
hdfsLogger.logAuditEvent(succeeded, ugiStr, addr, cmd, src, dst,
status, CallerContext.getCurrent(), ugi, dtSecretManager);
} else {
logger.logAuditEvent(succeeded, ugiStr, addr, cmd, src, dst, status);
}
}
}
在这里topAuditLogger的logAuditEvent一样会被调用到,我们进入此方法,
public void logAuditEvent(boolean succeeded, String userName,
InetAddress addr, String cmd, String src, String dst, FileStatus status) {
try {
// 这里会调用topMetrics的report统计方法,进行计数的累加
topMetrics.report(succeeded, userName, addr, cmd, src, dst, status);
} catch (Throwable t) {
LOG.error("An error occurred while reflecting the event in top service, "
+ "event: (cmd={},userName={})", cmd, userName);
}
...
}
每次的数据我们已拿到了,然后我们怎样统计到之前的RollingWindow对象中呢?我们继续刚才的topMetric的report统计方法,
public void report(long currTime, String userName, String cmd) {
LOG.debug("a metric is reported: cmd: {} user: {}", cmd, userName);
userName = UserGroupInformation.trimLoginMethod(userName);
for (RollingWindowManager rollingWindowManager : rollingWindowManagers
.values()) {
// 遍历不同period的rollingWindowManager对象,传入当前时间,操作类型,用户,增加的计数值
rollingWindowManager.recordMetric(currTime, cmd, userName, 1);
// 额外增加总数的统计类型
rollingWindowManager.recordMetric(currTime,
TopConf.ALL_CMDS, userName, 1);
}
}
Ok,上面的进程仿佛与之前我们所设计的进程10分的吻合了,我们再次进入RollWindowManafer的方法,
public void recordMetric(long time, String command,
String user, long delta) {
// 传入操作类型,用户名称,获得目标的RollingWindow对象
RollingWindow window = getRollingWindow(command, user);
// 在此rollingWindow中进行计数累加
window.incAt(time, delta);
}
继续进入RollingWindow的incr方法,
public void incAt(long time, long delta) {
// 计算当前时间对应的bucket下标
int bi = computeBucketIndex(time);
Bucket bucket = buckets[bi];
// 如果此时间已超过此bucket上次更新时间的周期范围,进行计数重置,
// 表明此bucket是上1周期内的bucket。
if (bucket.isStaleNow(time)) {
bucket.safeReset(time);
}
// 增加bucket上的计数统计,此bucket内部用的是AtomicLong数据类型,所以不会有线程安全的问题
bucket.inc(delta);
}
上面的统计方法,实际上是个滑动技术统计的进程,老的超时的bucket会被重新统计,然后又从0开始,相当因而往后挪了1个窗口,以此保证了最近周期内的统计,所之前面我们说bucket越多,统计的精度将会越准,bucket所控制的时间范围会越短,重置bucket的影响会变小。
前面数据的拦截,统计都完成了,那末我们有甚么样的方式拿到呢?这里以jmx的获得方式为例,获得的代码以下,
public String getTopUserOpCounts() {
if (!topConf.isEnabled) {
return null;
}
Date now = new Date();
// 从topMetric对象中获得top user返回结果
final List<RollingWindowManager.TopWindow> topWindows =
topMetrics.getTopWindows();
Map<String, Object> topMap = new TreeMap<String, Object>();
topMap.put("windows", topWindows);
topMap.put("timestamp", DFSUtil.dateToIso8601String(now));
try {
// 将Top统计结果转为json字符返回
return JsonUtil.toJsonString(topMap);
} catch (IOException e) {
LOG.warn("Failed to fetch TopUser metrics", e);
}
return null;
}
上面的代码意味着通过http://nn-address:50070/jmx的方式能够拿到top user的结果。本人在测试集群中进行测试,拿到的结果以下:
NNTop:
"NumStaleStorages" : 0,
"TopUserOpCounts" : "{\"timestamp\":\"2016-09⑴1T16:31:27+0800\",\"windows\":[{\"windowLenMs\":300000,\"ops\":
[{\"opType\":\"delete\",\"topUsers\":[{\"user\":\"root\",\"count\":3}],\"totalCount\":3},
{\"opType\":\"setTimes\",\"topUsers\":[{\"user\":\"data\",\"count\":2}],\"totalCount\":2},
{\"opType\":\"open\",\"topUsers\":[{\"user\":\"root\",\"count\":22230},{\"user\":\"data\",\"count\":4766}],\"totalCount\":26996},
{\"opType\":\"create\",\"topUsers\":[{\"user\":\"root\",\"count\":518},{\"user\":\"data\",\"count\":5}],\"totalCount\":523},
{\"opType\":\"setPermission\",\"topUsers\":[{\"user\":\"root\",\"count\":15}],\"totalCount\":15},
{\"opType\":\"*\",\"topUsers\":[{\"user\":\"root\",\"count\":50134},{\"user\":\"data\",\"count\":21943},{\"user\":\"bc\",\"count\":20}],\"totalCount\":72097},
{\"opType\":\"rename\",\"topUsers\":[{\"user\":\"root\",\"count\":285},{\"user\":\"data\",\"count\":2}],\"totalCount\":287},
{\"opType\":\"mkdirs\",\"topUsers\":[{\"user\":\"root\",\"count\":8},{\"user\":\"bc\",\"count\":4}],\"totalCount\":12},
{\"opType\":\"setReplication\",\"topUsers\":[{\"user\":\"root\",\"count\":212}],\"totalCount\":212},
{\"opType\":\"listStatus\",\"topUsers\":[{\"user\":\"data\",\"count\":12400},{\"user\":\"root\",\"count\":102}],\"totalCount\":12502},
{\"opType\":\"getfileinfo\",\"topUsers\":[{\"user\":\"root\",\"count\":25929},{\"user\":\"data\",\"count\":6362},{\"user\":\"bc\",\"count\":16}],\"totalCount\":32307}]},
{\"windowLenMs\":1500000,\"ops\":
[{\"opType\":\"delete\",\"topUsers\":...}"
}, {
相信上面的统计结果大家都能直接看得懂,大家可以通过拿到jmx的数据,自行解析成object对象便可拿来用了。
OK,HDFS nnTop用户Top要求数统计功能就是如上所描写的,此功能默许是开启的,由配置项dfs.namenode.top.enabled所控制,而且里面top用户的数量和统计的周期时间都是可配的,大家可以好好把此功能给用起来了。
[1].https://issues.apache.org/jira/browse/HDFS⑹982
[2].https://issues.apache.org/jira/secure/attachment/12665990/nntop-design-v1.pdf