【编者按】毋庸置疑,Hadoop已成为当下最流行的大数据处理平台,让机构可以用更低廉的价格对海量数据进行深度挖掘,同时,在YARN出现后,其生态圈也愈加繁荣;然而,Hadoop原生数据库HBase却因众多问题饱受诟病,比如部署难、以Java为中心等工程问题,以及故障转移、面向主从设计的架构问题,这直接导致了HBase人气甚至不如同为列存储类型的Cassandra。幸运的是,在我们之前有很多先行者对Hadoop进入了深入的研究,本次即为大家带来@无尘道长 的心得分享――HBase写数据过程。
CSDN推荐:欢迎免费订阅《Hadoop与大数据周刊》获取更多Hadoop技术文献、大数据技术分析、企业实战经验,生态圈发展趋势。
以下为原文
博文说明:1、研究版本HBase 0.94.12;2、贴出的源代码可能会有删减,只保留关键的代码。
从client和server两个方面探讨HBase的写数据过程。
一、client端
1、写数据API
写数据主要是HTable的单条写和批量写两个API,源码如下:
//单条写API public void put(final Put put) throws IOException { doPut(put); if (autoFlush) { flushCommits(); } } //批量写API public void put(final List<Put> puts) throws IOException { for (Put put : puts) { doPut(put); } if (autoFlush) { flushCommits(); } } //具体的put实现 private void doPut(Put put) throws IOException{ validatePut(put); writeBuffer.add(put); currentWriteBufferSize += put.heapSize(); if (currentWriteBufferSize > writeBufferSize) { flushCommits(); } } public void close() throws IOException { if (this.closed) { return; } flushCommits(); …. } |
通过两个put API可以看出如果autoFlush为false,则无论是否是批量写效果均是相同,均是等待写入的数据超过配置的writeBufferSize(通过hbase.client.write.buffer配置,默认为2M)时才提交写数据请求,如果最后的写入数据没有超过2M,则在调用close方法时会进行最后的提交,当然,如果使用批量的put方法时,自己控制flushCommits则效果不同,比如每隔1000条进行一次提交,如果1000条数据的总大小超过了2M,则实际上会发生多次提交,导致最终的提交次数多过只由writeBufferSize控制的提交次数,因此在实际的项目中,如果对写性能的要求比对数据的实时可查询和不可丢失的要求更高则可以设置autoFlush为false并采用单条写的put(final Put put)API,这样即可以简化写操作数据的程序代码,写入效率也更优,需要注意的是如果对数据的实时可查询和不可丢失有较高的要求则应该设置autoFlush为true并采用单条写的API,这样可以确保写一条即提交一条。
2、关于多线程写
在0.94.12这个版本中,对于写操作,HBase内部就是多线程,线程数量与批量提交的数据涉及的region个数相同,通常情况下不需要再自己写多线程代码,自己写的多线程代码主要是解决数据到HTable的put这个过程中的性能问题,数据进入put的缓存,当达到writeBufferSize设定的大小后才会真正发起写操作(如果不是自己控制flush),这个过程的线程数与这批数据涉及的region个数相同,会并行写入所有相关region,一般不会出现性能问题,当涉及的region个数过多时会导致创建过多的线程,消耗大量的内存,甚至会出现线程把内存耗尽而导致OutOfMemory的情况,比较理想的写入场景是调大writeBufferSize,并且一次写入适量的不同regionserver的region,这样可以充分把写压力分摊到多个服务器。
HBase写数据的客户端核心方法是HConnectionManager的processBatchCallback方法,相关源码如下:
public void flushCommits() throws IOException { try { Object[] results = new Object[writeBuffer.size()]; try { this.connection.processBatch(writeBuffer, tableName, pool, results); } catch (InterruptedException e) { throw new IOException(e); } finally { … } finally { … } } public void processBatch(List<? extends Row> list, final byte[] tableName, ExecutorService pool, Object[] results) throws IOException, InterruptedException { … processBatchCallback(list, tableName, pool, results, null); } public <R> void processBatchCallback(List<? extends Row> list, byte[] tableName, ExecutorService pool, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException { …. HRegionLocation [] lastServers = new HRegionLocation[results.length]; for (int tries = 0; tries < numRetries && retry; ++tries) { … // step 1: break up into regionserver-sized chunks and build the data structs Map<HRegionLocation, MultiAction<R>> actionsByServer = new HashMap<HRegionLocation, MultiAction<R>>(); for (int i = 0; i < workingList.size(); i++) { Row row = workingList.get(i); if (row != null) { HRegionLocation loc = locateRegion(tableName, row.getRow()); byte[] regionName = loc.getRegionInfo().getRegionName(); MultiAction<R> actions = actionsByServer.get(loc); if (actions == null) { actions = new MultiAction<R>(); actionsByServer.put(loc, actions); //每一个region对应一个MultiAction对象,每个MultiAction对象持有该region所有的put Action } Action<R> action = new Action<R>(row, i); lastServers[i] = loc; actions.add(regionName, action); } } // step 2: make the requests,每个region 开启一个线程 Map<HRegionLocation, Future<MultiResponse>> futures = new HashMap<HRegionLocation, Future<MultiResponse>>(actionsByServer.size()); for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) { futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName))); } // step 3: collect the failures and successes and prepare for retry … // step 4: identify failures and prep for a retry (if applicable). … } … } |
3、在写入数据前,需要先定位具体的数据应该写入的region,核心方法:
//从缓存中定位region,通过NavigableMap实现,如果没有缓存则需查询.META.表 HRegionLocation getCachedLocation(final byte [] tableName, final byte [] row) { SoftValueSortedMap<byte [], HRegionLocation> tableLocations = getTableLocations(tableName); … //找到小于rowKey并且最接近rowKey的startKey对应的region,通过NavigableMap实现 possibleRegion = tableLocations.lowerValueByKey(row); if (possibleRegion == null) { return null; } //表的最末一个region的endKey是空字符串,如果不是最末一个region,则只有当rowKey小于endKey才返回region。 byte[] endKey = possibleRegion.getRegionInfo().getEndKey(); if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) || KeyValue.getRowComparator(tableName).compareRows( endKey, 0, endKey.length, row, 0, row.length) > 0) { return possibleRegion; } return null; } |
二、服务端
服务端写数据的主要过程是:写WAL日志(如果没有关闭写WAL日志)-》写memstore-》触发flush memstore(如果memstore大小超过hbase.hregion.memstore.flush.size的设置值),在flush memstore过程中可能会触发compact和split操作,在以下内容会对写put方法、flush memstore、compact和split进行讲解。
1、HTableInterface接口操作HBase数据的API对应的服务端是由HRegionServer类实现,源代码如下:
//单条put public void put(final byte[] regionName, final Put put) throws IOException { HRegion region = getRegion(regionName); if (!region.getRegionInfo().isMetaTable()) { //检查HRegionServer的memstore总内存占用量是否已经超过了hbase.regionserver.global.memstore.upperLimit(默认值是0.4)或者hbase.regionserver.global.memstore.lowerLimit(默认值是0.35)的限制,如果超过了则会在flush队列中添加一个任务,其中如果是超过了upper的限制则会阻塞所有的写memstore的操作,直到内存降至lower限制以下。 this.cacheFlusher.reclaimMemStoreMemory(); } boolean writeToWAL = put.getWriteToWAL(); //region会调用Store的add()方法把数据保存到相关Store的memstore中 //region在保存完数据后,会检查是否需要flush memstore,如果需要则发出flush请求,由HRegionServer的flush守护线程异步执行。 region.put(put, getLockFromId(put.getLockId()), writeToWAL); } //批量put public int put(final byte[] regionName, final List<Put> puts) throws IOException { region = getRegion(regionName); if (!region.getRegionInfo().isMetaTable()) { this.cacheFlusher.reclaimMemStoreMemory(); } OperationStatus codes[] = region.batchMutate(putsWithLocks); for (i = 0; i < codes.length; i++) { if (codes[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) { return i; } } return -1; } |
2、Flush Memstore
memstore的flush过程由类MemStoreFlusher控制,该类是Runnable的实现类,在HRegionServer启动时会启动一个MemStoreFlusher的守护线程,每隔10s从flushQueue中获取flush任务进行刷新,如果需要flush memstore时,只需调用MemStoreFlusher的requestFlush或者requestDelayedFlush方法把flush请求加入到flush队列中即可,具体的flush是异步执行的。
memstore的大小有两个控制级别:
1)Region级
a、hbase.hregion.memstore.flush.size:默认值128M,超过将被flush到磁盘
b、hbase.hregion.memstore.block.multiplier:默认值2,如果memstore的内存大小已经超过了hbase.hregion.memstore.flush.size的2倍,则会阻塞该region的写操作,直到内存大小降至该值以下
2)RegionServer级
a、hbase.regionserver.global.memstore.lowerLimit:默认值0.35,HRegionServer的所有memstore占用内存在HRegionServer总内存中占的lower比例,当达到该值,则会触发整个RegionServer的flush(并不会真正flush所有的region,关于该点请参看后续内容),直到总内存比例降至该数限制以下
b、hbase.regionserver.global.memstore.upperLimit:默认值0.4,HRegionServer的所有memstore占用内存在总内存中的upper比例,当达到该值,则会触发整个RegionServer的flush,直到总内存比例降至该数限制以下,并且在降至限制比例以下前将阻塞所有的写memstore的操作
在对整个HRegionServer进行flush操作时,并不会刷新所有的region,而是每次均会根据region的memstore大小、storeFile数量等因素找出最需要flush的region进行flush,flush完成后再进行内存总比例的判断,如果还未降至lower限制以下则会再寻找新的region进行flush。
在flush region时会flush该region下所有的store,虽然可能某些store的memstore内容很少。
在flush memstore时会产生updatesLock(HRegion类的一个属性,采用jdk的ReentrantReadWriteLock实现)的排它锁write lock,当获取完memstore的快照后释放updatesLock的write lock,在释放之前,所有的需要获取updatesLock的write、read lock的操作均会被阻塞,该影响是整个HRegion范围,因此如果表的HRegion数量过少,或者数据写入时热点在一个region时会导致该region不断flush memstore,由于该过程会产生write排他锁(虽然进行memstore快照的时间会很快),因此会影响region 的整体写能力。
3、Compact操作
HBase有两种compact:minor和major,minor通常会把若干个小的storeFile合并成一个大的storeFile,minor不会删除标示为删除的数据和过期的数据,major则会删除这些数据,major合并之后,一个store只有一个storeFile文件,这个过程对store的所有数据进行重写,有较大的资源开销,major 合并默认1天执行一次,可以通过hbase.hregion.majorcompaction配置执行周期,通常是把该值设置为0进行关闭,采用手工执行,这样可以避免当集群繁忙时执行整个集群的major合并,major合并是必须执行的操作,因为删除标示为删除和过期的数据操作是在该合并过程中进行的。通过merge可以对表的两个region进行合并,以减少region的数量,执行命令:
$ bin/hbase org.apache.hadoop.hbase.util.Merge <tablename> <region1> <region2>
参数<region1>需要写region的名称,比如:
gd500M,4-605-52-78641,1384227418983.ccf74696ef8a241088356039a65e1aca
执行该操作时需要先停止运行HBase集群,并且如果hdfs不是与HBase拥有相同的用户组和用户且hdfs配置为需要进行权限控制(由配置项dfs.permissions控制,默认为true)时需要切换linux用户到hdfs用户下执行该操作,执行完成后,需要通过hadoop dfs