第一部分 核心设计篇
第1章 HDFS的数据存储
本章将从HDFS的数据存储开始说起,因为正是先有了数据的存储,才有后续的写入和管理等操作。HDFS的数据存储包括两块:一块是HDFS内存存储,另一块是HDFS异构存储。HDFS内存存储是一种十分特殊的存储方式,将会对集群数据的读写带来不小的性能提升,而HDFS异构存储则能帮助我们更加合理地把数据存到应该存的地方。
1.1 HDFS内存存储
HDFS的内存存储是HDFS所有数据存储方式中比较特殊的一种,与之后将会提到的HDFS缓存有一些相同之处:都用机器的内存作为存储数据的载体。不同之处在于:HDFS缓存需要用户主动设置目标待缓存的文件、目录,其间需要使用HDFS缓存管理命令。而HDFS内存存储策略:LAZY_PERSIST则直接将内存作为数据存放的载体,可以这么理解,此时节点的内存也充当了一块“磁盘”。只要将文件设置为内存存储方式,最终会将其存储在节点的内存中。综合地看,HDFS缓存更像是改进用户使用的一种功能,而HDFS内存存储则是从底层扩展了HDFS的数据存储方式。本节将对HDFS内存存储策略进行更细致的分析。
1.1.1 HDFS内存存储原理
对于内存存储的存储策略,可能很多人会存有这么几种看法:
❑数据临时维持在内存中,服务一停止,数据全部丢失。
❑数据存在于内存中,在服务停止时做持久化处理,最终将数据全部写入到磁盘。
仔细来看以上这2种观点,其实都有不小的瑕疵:
❑第一个观点,服务一旦停止,内存数据全部丢失,这是无法接受的,我们只能容忍内存中少量的数据丢失。这个观点的另一个问题是,内存的存储空间是有限的,在服务运行过程中如果不及时处理一部分数据,内存空间迟早会被耗尽。
❑第二个观点,在服务停止退出的时候做持久化操作,同样会面临上面提到的内存空间的限制问题。如果机器的内存足够大,数据可能会很多,那么最后写入磁盘的阶段速度会很慢。
所以一般情况下,通用的、比较好的做法是异步持久化,什么意思呢?在内存存储新数据的同时,持久化距离当前时刻最远(存储时间最早)的数据。换一个通俗的解释,好比有个内存数据块队列,在队列头部不断有新增的数据块插入,就是待存储的块,因为资源有限,需要把队列尾部的块,也就是更早些时间点的块持久化到磁盘中,这样才有空间存储新的块。然后形成这样的一个循环,新的块加入,老的块移除,保证了整体数据的更新。
HDFS的LAZY_PERSIST内存存储策略用的就是这套方法,原理如图1-1。
图1-1 LAZY_PERSIST策略原理图
上面描述的原理在图中的表示是第4个步骤和第6个步骤。第4步写数据到内存中,第6步异步地将数据写到磁盘。前面几个步骤是如何设置StorageType的操作,在下文中会具体提到。所以异步存储的大体步骤可以归纳如下:
1)对目标文件目录设置StoragePolicy为LAZY_PERSIST的内存存储策略。
2)客户端进程向NameNode发起创建/写文件的请求。
3)客户端请求到具体的DataNode后DataNode会把这些数据块写入RAM内存中,同时启动异步线程服务将内存数据持久化写到磁盘上。
内存的异步持久化存储是内存存储与其他介质存储不同的地方。这也是LAZY_PERSIST名称的源由,数据不是马上落盘,而是懒惰的、延时地进行处理。
1.1.2 Linux 虚拟内存盘
这里需要了解一个额外的知识点:Linux虚拟内存盘。之前笔者也一直有个疑惑,内存也可以当作一个块盘使用?内存不就是临时存数据用的吗?于是在学习此模块知识之前,特意查了相关的资料。其实在Linux中,的确有将内存模拟为一个块盘的技术,叫虚拟内存盘(RAM disk)。这是一种模拟的盘,实际数据都是存放在内存中的。虚拟内存盘可以在某些特定的内存式存储文件系统下结合使用,比如tmpfs、ramfs。关于tmpfs的具体内容,大家可以查阅维基百科等资料。通过此项技术,我们就可以将机器内存利用起来,作为一块独立的虚拟盘供DataNode使用了。
1.1.3 HDFS的内存存储流程分析
下面讲述本章的核心内容:HDFS内存存储的主要流程。不要小看这个存储策略,里面的过程可并不简单,在下面的内容中,笔者会给出比较多的过程图,帮助大家理解。
1. HDFS文件内存存储策略设置
要想让文件数据存储到内存中,一开始要做的操作是设置此文件的存储策略,即上面提到的LAZY_PERSIST,而不是使用默认的存储策略:StoragePolicy.DEFAULT,默认策略的存储介质是DISK类型的。设置存储策略的方法目前有以下3种:
第一种方法,通过命令行的方式,调用如下命令:
hdfs storagepolicies -setStoragePolicy -path <path> -policy LAZY_PERSIST
这种方式比较方便、快速。
第二种方法,调用对应的程序方法,比如调用暴露在外部的create文件方法,但是得带上参数CreateFlag.LAZY_PERSIST。如下所示:
FSDataOutputStream fos = fs.create( path, FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE, CreateFlag.LAZY_PERSIST), bufferLength, replicationFactor, blockSize, null);
上述方式最终调用的是DFSClient的create同名方法,如下所示:
// DFSClient创建文件方法 public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt) throws IOException { return create(src, permission, flag, true, replication, blockSize, progress, buffersize, checksumOpt, null); }
方法经过RPC层层调用,经过FSNamesystem,最终会到FSDirWriteFileOp的startFile方法,在此方法内部,会有设置存储策略的动作:
static HdfsFileStatus startFile( FSNamesystem fsn, FSPermissionChecker pc, String src, PermissionStatus permissions, String holder, String clientMachine, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, EncryptionKeyInfo ezInfo, INode.BlocksMapUpdateInfo toRemoveBlocks, boolean logRetryEntry) throws IOException { assert fsn.hasWriteLock(); boolean create = flag.contains(CreateFlag.CREATE); boolean overwrite = flag.contains(CreateFlag.OVERWRITE); // 判断CreateFlag是否带有LAZY_PERSIST标识,来判断是否是内存存储策略 boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST); ... // 在此设置策略 setNewINodeStoragePolicy(fsd.getBlockManager(), newNode, iip, isLazyPersist); fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry); if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " + src + " inode " + newNode.getId() + " " + holder); } return FSDirStatAndListingOp.getFileInfo(fsd, src, false, isRawPath); }
这部分的过程调用见图1-2。
图1-2 LAZY_PERSIST策略设置流程图
还有一种方法是通过FileSystem的setStoragePolicy方法,不过此方法在还未发布的2.8版本中提供,如下所示:
fs.setStoragePolicy(path, "LAZY_PERSIST");
这种方式的优点在于可以用程序动态地设置目标路径的存储方式。
以上就是存储策略的设置过程,这一部分还是非常直接明了的。
2. LAZY_PERSIST内存存储
当我们为文件设置了LAZY_PERSIST的存储方式之后,DataNode如何进行内存式的存储呢?笔者在下面会分模块、分角色进行介绍。
首先要介绍的是LAZY_PERSIST相关结构。在之前的内容中已经提到过,在数据存储的同时会有另外一批数据被异步地持久化,所以这里一定会涉及多个服务对象的合作。这些服务对象的指挥者是FsDatasetImpl,它是一个管理DataNode所有磁盘读写的管家。
在FsDatasetImpl中,与内存存储相关的服务对象有3个,如图1-3所示。
图1-3 LAZY_PERSIST相关服务对象
说明如下:
❑RamDiskAsyncLazyPersistService:此对象是异步持久化线程服务,针对每一个磁盘块设置一个对应的线程池,需要持久化到给定磁盘的数据块会被提交到对应的线程池中去。每个线程池的最大线程数为1。
❑LazyWriter:这是一个线程服务,此线程会不断地从数据块列表中取出数据块,将数据块加入到异步持久化线程池RamDiskAsyncLazyPersistService中去执行。
❑RamDiskReplicaLruTracker:是副本块跟踪类,此类中维护了所有已持久化、未持久化的副本以及总副本数据信息。所以当一个副本被最终存储到内存中后,相应地会有副本所属队列信息的变更。当节点内存不足时,会将最近最少被访问的副本块移除。
以上3者的紧密合作,最终实现HDFS的内存存储。下面是具体的角色介绍。
(1)RamDiskReplicaLruTracker
RamDiskReplicaLruTracker起到了一个中间人的角色,它内部维护了多个关系的数据块信息,主要是以下3类:
public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker { ... // blockpool Id对副本信息的映射图 Map<String, Map<Long, RamDiskReplicaLru>> replicaMaps; // 待写入磁盘的副本队列 Queue<RamDiskReplicaLru> replicasNotPersisted; // 已持久化写入磁盘的映射图 TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted; ...
这里的Queue<RamDiskReplicaLru>就是待存入内存存储队列。以上3个变量之间的关系见图1-4。
图1-4 RamDisk副本块结构关系图
RamDiskReplicaLruTracker中的方法操作绝大多数与这3个变量的增删改动相关,所以逻辑并不复杂,我们只需要了解这些方法有什么作用即可。笔者将方法分成了以下两类:第一类,异步持久化操作相关方法。如图1-5所示。
图1-5 异步持久化操作相关流程图
当节点重启或者有新的文件设置了LAZY_PERSIST策略后,就会有新的副本块存储到内存中,同时会加入到replicaNotPersisted队列中。经过中间的dequeueNextReplicaToPersist方法,取出下一个将被持久化的副本块,进行写磁盘的操作。在持久化的过程中将调用recordStartLazyPersist、recordEndLazyPersist这两个方法,标志着持久化状态的变更。
第二类,异步持久化操作无直接关联方法。方法如下:
1)discardReplica:当检测到不再需要某副本的时候(包括副本已被删除,或已损坏的情况),可以从内存中移除、撤销副本。
2)touch:恰好与Linux中的touch命令同名,此方法意味着访问了一次某特定的副本块,并会更新此副本块的lastUesdTime(最近一次使用时间)。lastUesdTime会在后面提到的LRU算法中起到关键的作用。
3)getNextCandidateForEviction:此方法在DataNode内存空间不足,需要内存额外预留出空间给新的副本块时被调用。此方法会根据所设置的eviction scheme模式,选择需要被移除的块,默认的策略模式是LRU策略。
这里反复提到一个名词LRU, LRU的全称是Least Recently Used,意为最近最少使用算法。getNextCandidateForEviction方法采用此算法的好处是保证了现有副本块的一个活跃度,把最近很久没有访问过的块给移除掉。对于这个操作,我们有必要了解其中的细节。
首先touch方法会更新副本块最近访问的时间:
synchronized void touch(final String bpid, final long blockId) { Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid); RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId); ... // 更新最近访问时间戳,并重新插入数据 if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) { ramDiskReplicaLru.lastUsedTime = Time.monotonicNow(); replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru); } } // 第二步获取候选移除块 synchronized RamDiskReplicaLru getNextCandidateForEviction() { // 获取replicasPersisted迭代器进行遍历 final Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator(); while (it.hasNext()) { // 因为replicasPersisted已经根据时间排好序了,所以取出当前的块进行移除即可 final RamDiskReplicaLru ramDiskReplicaLru = it.next(); it.remove(); Map<Long, RamDiskReplicaLru> replicaMap = replicaMaps.get(ramDiskReplicaLru.getBlockPoolId()); if (replicaMap ! = null && replicaMap.get(ramDiskReplicaLru.getBlockId()) ! = null) { return ramDiskReplicaLru; } // 如果副本不存在,则继续下一个副本 } return null; }
这里比较有意思的是,根据已持久化块的访问时间来进行筛选移除,而不是直接在内存块对象中记录访问时间,然后进行排序和移除。最后在内存中移除与候选块属于同一副本信息的块并释放内存空间:
// 从内存中移除副本块信息直到满足需要字节数的大小 public void evictBlocks(long bytesNeeded) throws IOException { int iterations = 0; final long cacheCapacity = cacheManager.getCacheCapacity(); // 当检测到内存空间不满足外界需要的大小时 while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION && (cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) { // 获取待移除副本信息 RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidate ForEviction(); if (replicaState == null) { break; } if (LOG.isDebugEnabled()) { LOG.debug("Evicting block " + replicaState); } ... // 移除内存中的相关块并释放空间 removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile, blockFileUsed, metaFileUsed, bpid); } } }
(2)LazyWriter
LazyWriter是一个线程服务,它是一个发动机,循环不断地从队列中取出待持久化的数据块,提交到异步持久化服务中去。其中主要的run方法如下所示:
public void run() { int numSuccessiveFailures = 0; while (fsRunning && shouldRun) { try { // 取出新的副本块并提交到异步服务中,返回是否提交成功的布尔值 numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1); // 如果所有的持久化操作失败,则进行睡眠等待,避免短时间内连续的重试 if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicas NotPersisted()) { Thread.sleep(checkpointerInterval * 1000); numSuccessiveFailures = 0; } } catch (InterruptedException e) { LOG.info("LazyWriter was interrupted, exiting"); break; } catch (Exception e) { LOG.warn("Ignoring exception in LazyWriter:", e); } } } 之后,进入saveNextReplica方法的处理: private boolean saveNextReplica() { RamDiskReplica block = null; FsVolumeReference targetReference; FsVolumeImpl targetVolume; ReplicaInfo replicaInfo; boolean succeeded = false; try { // 从队列中取出新的待持久化的块 block = ramDiskReplicaTracker.dequeueNextReplicaToPersist(); if (block ! = null) { synchronized (FsDatasetImpl.this) { ... // 提交到异步服务中去 asyncLazyPersistService.submitLazyPersistTask( block.getBlockPoolId(), block.getBlockId(), replicaInfo.getGenerationStamp(), block.getCreationTime(), replicaInfo.getMetaFile(), replicaInfo.getBlockFile(), targetReference); } } } succeeded = true; } catch(IOException ioe) { LOG.warn("Exception saving replica " + block, ioe); } finally { if (! succeeded && block ! = null) { LOG.warn("Failed to save replica " + block + ". re-enqueueing it."); // 进行副本块提交失败处理,此副本块将会再次提交到待持久化队列中 onFailLazyPersist(block.getBlockPoolId(), block.getBlockId()); } } return succeeded; }
LazyWriter线程服务的流程图可以归纳为图1-6。
图1-6 LazyWriter服务流程图
我们结合LazyWriter和RamDiskReplicaTracker跟踪服务,就可以得到下面一个完整的流程(暂且不考虑RamDiskAsyncLazyPersistService的内部执行),如图1-7所示。
(3)RamDiskAsyncLazyPersistService
最后一部分异步服务的内容相对就比较简单了,主要围绕着Volume磁盘和Executor线程池这两部分的内容,秉持着下面一个原则:
一个磁盘服务对应一个线程池,并且一个线程池的最大线程数也只有1个。
线程池列表定义如下:
图1-7 异步持久化流程图
class RamDiskAsyncLazyPersistService { ... private Map<File, ThreadPoolExecutor> executors = new HashMap<File, ThreadPoolExecutor>(); ...
这里的File代表一个磁盘上的目录,个人认为这里完全可以用String字符串替代。既可以减少存储空间,又直观明了。从这里可以看出磁盘服务与线程池一对一的关系了。
当服务启动的时候,就会有新的磁盘目录加入,如下代码所示:
synchronized void addVolume(File volume) { if (executors == null) { throw new RuntimeException("AsyncLazyPersistService is already shutdown"); } ThreadPoolExecutor executor = executors.get(volume); // 如果当前已存在此磁盘目录对应的线程池,则抛异常 if (executor ! = null) { throw new RuntimeException("Volume " + volume + " is already existed."); } // 否则进行添加 addExecutorForVolume(volume); } 之后,进入addExecutorForVolume方法: private void addExecutorForVolume(final File volume) { ... // 新建线程池,最大线程执行数为1 ThreadPoolExecutor executor = new ThreadPoolExecutor( CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME, THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); executor.allowCoreThreadTimeOut(true); // 加入到executors中,以volume作为key executors.put(volume, executor); }
还有一个需要注意的地方是提交执行方法submitLazyPersistTask,如下所示:
void submitLazyPersistTask(String bpId, long blockId, long genStamp, long creationTime, File metaFile, File blockFile, FsVolumeReference target) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: " + bpId + " block id: " + blockId); } // 获取需要持久化的目标磁盘实例 FsVolumeImpl volume = (FsVolumeImpl)target.getVolume(); File lazyPersistDir = volume.getLazyPersistDir(bpId); if (! lazyPersistDir.exists() && ! lazyPersistDir.mkdirs()) { FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir); throw new IOException("LazyWriter fail to find or create lazy persist dir: " + lazyPersistDir.toString()); } // 新建此服务Task ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask( bpId, blockId, genStamp, creationTime, blockFile, metaFile, target, lazyPersistDir); // 提交到对应volume的线程池中执行 execute(volume.getCurrentDir(), lazyPersistTask); }
如果在上述执行的过程中发生失败,会调用失败处理的方法,并会重新将此副本块插入到replicateNotPersisted队列中,等待下一次的持久化:
public void onFailLazyPersist(String bpId, long blockId) {
RamDiskReplica block = null;
block = ramDiskReplicaTracker.getReplica(bpId, blockId);
if (block ! = null) {
LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
// 重新插入队列操作
ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
}
}
其他如removeVolume等方法实现比较简单,这里不做过多介绍。图1-8是RamDisk-AsyncLazyPersistService总的结构图。
以上3部分描述了LAZT_PERSIST下的队列式内存数据块持久化服务、异步持久化服务的内部运行逻辑和LRU预留内存空间算法策略。
图1-8 RamDiskAsyncLazyPersistService相关结构图
1.1.4 LAZY_PERSIST内存存储的使用
介绍完原理部分之后,下面介绍具体的配置使用。
第一步,要使用LAZY_PERSIST内存存储策略,需要有对应的存储介质,内存存储介质对应的类型是RAM_DISK。
在使用RAM_DISK之前,需要完成虚拟内存盘的配置工作,这里以tmpfs文件系统为例进行介绍。在默认情况下,tmpfs是被挂载到/dev/shm,并且大小是32GB。也就是说,在此目录下的数据实质上是存在于内存中的。但是有的时候,我们可能会想挂载到自己想要挂载的目录下,而且我们也想对内存的使用大小进行有效的控制,可以使用下面的命令进行这2方面的设置:
sudo mount -t tmpfs -o size=16g tmpfs /mnt/dn-tmpfs/
以上操作的意思是将tmpfs挂载到目录/mnt/dn-tmpfs,并且限制内存使用大小为16GB。最后,建议在/etc/fstab文件中将这层挂载关系写入,这可以让机器在重启之后自动创建好挂载关系。
首先需要将机器中已经完成好的虚拟内存盘配置到dfs.datanode.data.dir中,其次还要带上RAM_DISK标签,以此表明此目录对应的存储介质为RAM_DISK,配置样例如下:
<property> <name>dfs.datanode.data.dir</name> <value>/grid/0, /grid/1, /grid/2, [RAM_DISK]/mnt/dn-tmpfs</value> </property>
注意,这个标签是必须要打上的,否则HDFS默认的都是DISK。
第二步就是设置具体的文件策略类型。
注意
❑ 确保HDFS异构存储策略没有被关闭,默认是开启的,配置项是dfs.storage. policy.enabled。
❑ 确认dfs.datanode.max.locked.memory是否设置了足够大的内存值,是否已是DataNode能承受的最大内存大小。内存值过小会导致内存中的总的可存储的数据块变少,但如果超过DataNode能承受的最大内存大小的话,部分内存块会被直接移出。
FsDatasetAsyncDiskService
在FsDatasetImpl类中,还有一个与内存存储异步持久化相类似的服务FsDataset-AsyncDiskService。在类的实现逻辑上,该服务与RamDiskAsyncLazyPersistService有许多相似之处,同样包含许多执行线程池,并且每个线程池对应一个存储目录。不过在具体的执行内容上,它主要做两类异步任务:
❑文件目录的异步删除。
❑异步执行SyncFileRange请求操作。SyncFileRange的全称是sync a file segment with disk,它的作用在于数据做多次更新后,对其进行一次性的写出,提高IO的效率。
FsDatasetAsyncDiskService类位于包org.apache.hadoop.hdfs.server.datanode. fsdataset.impl下,感兴趣的读者可以自行研究。
在HDFS异构存储方式中,除了内存存储之外,其实还有另外一类存储方式也尤为重要,就是HDFS的Archival Storage。Archival Storage指的是一种高密度的存储方式,以此解决集群数据规模增长带来的存储空间不足的问题。通常用于Archival Storage的节点不需要很好的计算性能,一般用于冷数据的存储。HDFS的Archival Storage的具体设计与实现,可以参阅相关JIRA, HDFS-6584(Support Archival Storage)。
1.2 HDFS异构存储
Hadoop在2.6.0版本中引入了一个新特性:异构存储。异构存储关键在于“异构”两个字。异构存储可以根据各个存储介质读写特性的不同发挥各自的优势。一个很适用的场景就是上节提到的冷热数据的存储。针对冷数据,采用容量大的、读写性能不高的存储介质存储,比如最普通的磁盘。而对于热数据而言,可以采用SSD的方式进行存储,这样就能保证高效的读性能,在速率上甚至能做到十倍或百倍于普通磁盘的读写速度。换句话说,HDFS异构存储特性的出现使得我们不需要搭建2套独立的集群来存放冷热2类数据,在一套集群内就能完成。所以这个功能还是有非常大的实用价值的。本节就带领大家全面了解HDFS的异构存储,包括异构存储的类型、存储策略、HDFS如何做到智能化的异构存储等。
1.2.1 异构存储类型
以下是在HDFS中声明的Storage Type:
❑RAM_DISK
❑SSD
❑DISK
❑ARCHIVE
HDFS中定义了这4种异构存储类型,SSD、DISK一看就知道是什么意思,这里看一下其余的两个。RAM_DISK其实就是内存,而ARCHIVE并没有特指哪种存储介质,主要指的是高密度存储介质,用于解决数据扩容的问题。这4种类型定义在StorageType类中,如下所示:
public enum StorageType {
// 根据存储的速度,从快到慢
RAM_DISK(true),
SSD(false),
DISK(false),
ARCHIVE(false);
...
其中true或者false代表此类存储类型是否为transient特性。transient的意思是转瞬即逝的,并非持久化的。在上述4类介质中,只有内存存储才是transient特性的。在HDFS中,如果没有主动声明数据目录存储类型,默认都是DISK类型。这4类存储介质之间一个很大的区别在于读写速度,从上到下依次减慢。所以将热数据存在内存中或是SSD中会是不错的选择,而将冷数据存放于DISK和ARCHIVE类型的介质中会更好。在HDFS中,StorageType的设定非常重要。那么如何让HDFS知道集群中的数据存储目录分别是哪种类型的存储介质呢?这就需要在配置属性时主动声明,HDFS并没有自动检测识别的功能。配置属性dfs.datanode.data.dir可以对本地对应存储目录进行设置,同时带上一个存储类型标签,声明此目录用的是哪种类型的存储介质,例子如下:
[SSD]file:///grid/dn/ssd0
如果目录前没有带上[SSD]/[DISK]/[ARCHIVE]/[RAM_DISK]这4种类型中的任何一种,则默认是DISK类型。图1-9是存储介质结构图。
图1-9 HDFS存储介质类型
1.2.2 异构存储原理
了解完异构存储类型后,我们有必要了解一下HDFS异构存储的实现原理。本节会结合部分HDFS源码进行阐述。HDFS异构存储可总结为以下三点:
❑DataNode通过心跳汇报自身数据存储目录的StorageType给NameNode。
❑随后NameNode进行汇总并更新集群内各个节点的存储类型情况。
❑待复制文件根据自身设定的存储策略信息向NameNode请求拥有此类型存储介质的DataNode作为候选节点。
从以上3点来看,HDFS异构存储原理并不复杂。下面结合部分源码,来一步步跟踪内部的过程细节。
1. DataNode存储目录汇报
首先是数据存储目录的解析与心跳汇报过程。在FsDatasetImpl的构造函数中对dataDir进行存储目录的解析,生成了StorageType的List列表:
// FsDatasetImple初始构造函数
FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
) throws IOException {
...
String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
dataLocations, storage);
...
真正调用的是DataNode的getStorageLocations方法:
public static List<StorageLocation> getStorageLocations(Configuration conf) { // 获取dfs.datanode.data.dir配置中的多个目录地址字符串 Collection<String> rawLocations = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY); List<StorageLocation> locations = new ArrayList<StorageLocation>(rawLocations.size()); for(String locationString : rawLocations) { final StorageLocation location; try { // 解析为对应的StorageLocation location = StorageLocation.parse(locationString); } catch (IOException ioe) { LOG.error("Failed to initialize storage directory " + locationString + ". Exception details: " + ioe); // 此处忽略异常 continue; } catch (SecurityException se) { LOG.error("Failed to initialize storage directory " + locationString + ". Exception details: " + se); // 此处忽略异常 continue; } // 将解析好的StorageLocation加入到列表中 locations.add(location); } return locations; }
当然我们最关心如何解析配置并最终得到对应存储类型的过程,即下面这行操作所执行的内容:
location = StorageLocation.parse(locationString);
StorageLocation的解析方法如下:
public static StorageLocation parse(String rawLocation)
throws IOException, SecurityException {
// 采用正则匹配的方式进行解析
Matcher matcher = regex.matcher(rawLocation);
StorageType storageType = StorageType.DEFAULT;
String location = rawLocation;
if (matcher.matches()) {
String classString = matcher.group(1);
location = matcher.group(2);
if (! classString.isEmpty()) {
storageType =
StorageType.valueOf(StringUtils.toUpperCase(classString));
}
}
return new StorageLocation(storageType, new Path(location).toUri());
}
这里的StorageType.DEFAULT就是DISK,在StorageType中定义如下:
public static final StorageType DEFAULT = DISK;
后续这些解析好的存储目录以及对应的存储介质类型会加入到storageMap中,如下所示:
private void addVolume(Collection<StorageLocation> dataLocations, Storage.StorageDirectory sd) throws IOException { final File dir = sd.getCurrentDir(); final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot()); ... synchronized (this) { volumeMap.addAll(tempVolumeMap); storageMap.put(sd.getStorageUuid(), new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType)); ... }
storageMap存储了目录到类型的映射关系,可以说是非常细粒度的。更重要的是,这些信息会被DataNode组织成StorageReport通过心跳的形式上报给NameNode。于是就来到了第一阶段的下半过程:
public StorageReport[] getStorageReports(String bpid) throws IOException { List<StorageReport> reports; synchronized (statsLock) { List<FsVolumeImpl> curVolumes = getVolumes(); reports = new ArrayList<>(curVolumes.size()); for (FsVolumeImpl volume : curVolumes) { try (FsVolumeReference ref = volume.obtainReference()) { // 获取磁盘存储信息,并生成磁盘报告实例 StorageReport sr = new StorageReport(volume.toDatanodeStorage(), false, volume.getCapacity(), volume.getDfsUsed(), volume.getAvailable(), volume.getBlockPoolUsed(bpid)); // 将报告实例加入到报告列表中 reports.add(sr); } catch (ClosedChannelException e) { continue; } } } // 返回报告列表 return reports.toArray(new StorageReport[reports.size()]); }
以上是StorageReport的组织过程,它最终被BPServiceActor的sendHeartBeat调用,发送给NameNode,如下所示:
HeartbeatResponse sendHeartBeat() throws IOException { // 获取存储类型情况报告信息 StorageReport[] reports = dn.getFSDataset().getStorageReports(bpos.getBlockPoolId()); if (LOG.isDebugEnabled()) { LOG.debug("Sending heartbeat with " + reports.length + " storage reports from service actor: " + this); } // 获取坏磁盘数据信息 VolumeFailureSummary volumeFailureSummary = dn.getFSDataset() .getVolumeFailureSummary(); int numFailedVolumes = volumeFailureSummary ! = null ? volumeFailureSummary.getFailedStorageLocations().length : 0; // 还有DataNode自身的存储容量信息,最后发送给NameNode return bpNamenode.sendHeartbeat(bpRegistration, reports, dn.getFSDataset().getCacheCapacity(), dn.getFSDataset().getCacheUsed(), dn.getXmitsInProgress(), dn.getXceiverCount(), numFailedVolumes, volumeFailureSummary); }
2.存储心跳信息的更新处理
现在来到了第二阶段的心跳处理过程。心跳处理在DatanodeManager的handleHeartbeat中进行:
// 心跳处理方法
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
...
heartbeatManager.updateHeartbeat(nodeinfo, reports,
cacheCapacity, cacheUsed,
xceiverCount, failedVolumes,
volumeFailureSummary);
...
最终在heartbeatManager中会调用到DatanodeDescription对象的updateHeartbeatState方法,该方法会更新Storage的信息,如下所示:
// 处理心跳中统计值相关的操作 public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int volFailures, VolumeFailureSummary volumeFailureSummary) { ... for (StorageReport report : reports) { DatanodeStorageInfo storage = updateStorage(report.getStorage()); if (checkFailedStorages) { failedStorageInfos.remove(storage); } storage.receivedHeartbeat(report); // 进行统计计数的更新统计 totalCapacity += report.getCapacity(); totalRemaining += report.getRemaining(); totalBlockPoolUsed += report.getBlockPoolUsed(); totalDfsUsed += report.getDfsUsed(); } rollBlocksScheduled(getLastUpdateMonotonic()); ...
3.目标存储介质类型节点的请求
各个DataNode心跳信息都更新完毕之后,有目标存储介质需求的待复制文件块就会向NameNode请求DataNode,这部分处理在FSNamesystem的getAdditionDatanode中进行:
// 获取剩余块方法 LocatedBlock getAdditionalDatanode(String src, long fileId, final ExtendedBlock blk, final DatanodeInfo[] existings, final String[] storageIDs, final Set<Node> excludes, final int numAdditionalNodes, final String clientName ) throws IOException { ... final INodeFile file = checkLease(src, clientName, inode, fileId); clientMachine = file.getFileUnderConstructionFeature().getClientMachine(); clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine); preferredblocksize = file.getPreferredBlockSize(); // 获取待复制文件的存储策略Id,对应的就是存储策略信息类型 storagePolicyID = file.getStoragePolicyID(); // 寻找存储目录信息 final DatanodeManager dm = blockManager.getDatanodeManager(); // 获取已存在节点的存储目录列表信息 chosen = Arrays.asList(dm.getDatanodeStorageInfos(existings, storageIDs)); } finally { readUnlock(); } ... // 选择满足需求的节点 final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode( src, numAdditionalNodes, clientnode, chosen, excludes, preferredblocksize, storagePolicyID); final LocatedBlock lb = new LocatedBlock(blk, targets); blockManager.setBlockToken(lb, AccessMode.COPY); return lb; }
目标存储节点信息就被设置到了具体块的信息中。这里的target类型为Datanode-StorageInfo,代表的是DataNode中的一个dataDir存储目录。上述代码中具体blockManager如何根据给定的候选DatanodeStorageInfo存储目录和存储策略来选择出目标节点,就是下一节将要重点阐述的存储介质选择策略。本节最后给出HDFS的异构存储过程调用的简单流程图,如图1-10所示。
图1-10 异构存储过程图
1.2.3 块存储类型选择策略
在现有的HDFS中,我们可以对块的网络拓朴位置进行策略的选择,同样,对于数据的存储介质,HDFS也有对应的若干种策略。对于一个完整的存储类型选择策略,有如下的
基本信息定义:
// 块存储类型选择策略对象 @InterfaceAudience.Private public class BlockStoragePolicy { public static final Logger LOG = LoggerFactory.getLogger(BlockStoragePolicy .class); // 策略唯一标识Id private final byte id; // 策略名称 private final String name; // 对于一个新块,存储副本块的可选存储类型信息组 private final StorageType[] storageTypes; // 对于第一个创建块,fallback情况时的可选存储类型 private final StorageType[] creationFallbacks; // 对于块的其余副本,fallback情况时的可选存储类型 private final StorageType[] replicationFallbacks; // 当创建文件的时候,是否继承祖先目录信息的策略,主要用于主动设置策略的时候 private boolean copyOnCreateFile; ...
这里出现了fallback的情况,什么叫做fallback的情况呢?即当前存储类型不可用的时候,退一级选择使用的存储类型。
相应的逻辑代码如下:
public List<StorageType> chooseStorageTypes(final short replication, final Iterable<StorageType> chosen, final EnumSet<StorageType> unavailables, final boolean isNewBlock) { ... for(int i = storageTypes.size() - 1; i >= 0; i--) { // 获取当前需要的存储类型 final StorageType t = storageTypes.get(i); // 如果当前的存储类型是在不可用的存储类型列表中,选择fallback的情况 if (unavailables.contains(t)) { // 根据是否为新块还是普通的副本块,选择相应的fallback的StorageType final StorageType fallback = isNewBlock? getCreationFallback(unavailables) : getReplicationFallback(unavailables); if (fallback == null) { removed.add(storageTypes.remove(i)); } else { storageTypes.set(i, fallback); } } } ...
在getFallback方法中会选取第一个满足条件的fallback的StorageType:
private static StorageType getFallback(EnumSet<StorageType> unavailables, StorageType[] fallbacks) { for(StorageType fb : fallbacks) { // 如果找到满足条件的StorageType,立即返回 if (! unavailables.contains(fb)) { return fb; } } return null; }
当然这些都只是单一的存储类型选择策略。HDFS在使用的时候也不是新建一个StoragePolicy对象直接调用,而是从BlockStoragePolicySuite策略集合中获取策略。
1.2.4 块存储策略集合
块存储策略集合是BlockStoragePolicySuite。在此类内部定义了6种策略,不仅仅分为冷热数据两种类型,其详细策略描述可见源代码中的解释。类策略名称如下:
❑HOT
❑COLD
❑WARM
❑ALL_SSD
❑ONE_SSD
❑LAZY_PERSIST
在这6种策略中,前三种策略和后三种策略可以看作是两大类。前三种策略是根据冷热数据的角度来区分的,后三种策略是根据存放盘的性质来区分的。策略倒是划分出来了,但是这些不同的策略之间的主要区别在哪里呢,答案就是候选存储类型组。
在创建BlockStoragePolicySuite的时候,对这些策略都进行了构造,如下所示:
// 块存储策略集的构造初始化 public static BlockStoragePolicySuite createDefaultSuite() { final BlockStoragePolicy[] policies = new BlockStoragePolicy[1 << ID_BIT_LENGTH]; final byte lazyPersistId = HdfsConstants.MEMORY_STORAGE_POLICY_ID; // LAZY_PERSIST策略构造 policies[lazyPersistId] = new BlockStoragePolicy(lazyPersistId, HdfsConstants.MEMORY_STORAGE_POLICY_NAME, new StorageType[]{StorageType.RAM_DISK, StorageType.DISK}, new StorageType[]{StorageType.DISK}, new StorageType[]{StorageType.DISK}, true); // Cannot be changed on regular files, but inherited. final byte allssdId = HdfsConstants.ALLSSD_STORAGE_POLICY_ID; // ALL_SSD策略构造 policies[allssdId] = new BlockStoragePolicy(allssdId, HdfsConstants.ALLSSD_STORAGE_POLICY_NAME, new StorageType[]{StorageType.SSD}, new StorageType[]{StorageType.DISK}, new StorageType[]{StorageType.DISK}); final byte onessdId = HdfsConstants.ONESSD_STORAGE_POLICY_ID; // ONE_SSD策略构造 policies[onessdId] = new BlockStoragePolicy(onessdId, HdfsConstants.ONESSD_STORAGE_POLICY_NAME, new StorageType[]{StorageType.SSD, StorageType.DISK}, new StorageType[]{StorageType.SSD, StorageType.DISK}, new StorageType[]{StorageType.SSD, StorageType.DISK}); final byte hotId = HdfsConstants.HOT_STORAGE_POLICY_ID; // HOT策略构造 policies[hotId] = new BlockStoragePolicy(hotId, HdfsConstants.HOT_STORAGE_POLICY_NAME, new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY, new StorageType[]{StorageType.ARCHIVE}); final byte warmId = HdfsConstants.WARM_STORAGE_POLICY_ID; // WARM策略构造 policies[warmId] = new BlockStoragePolicy(warmId, HdfsConstants.WARM_STORAGE_POLICY_NAME, new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}); final byte coldId = HdfsConstants.COLD_STORAGE_POLICY_ID; // CLOD策略构造 policies[coldId] = new BlockStoragePolicy(coldId, HdfsConstants.COLD_STORAGE_POLICY_NAME, new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY, StorageType.EMPTY_ARRAY); return new BlockStoragePolicySuite(hotId, policies); }
在这些策略对象的参数中,第三个参数起决定性作用,因为第三个参数会被返回给副本块作为候选存储类型。在storageTypes参数中,有时可能只有1个类型声明,例如ALL_SSD策略只有如下StorageType:
new StorageType[]{StorageType.SSD}
而ONE_SSD却有两个:
new StorageType[]{StorageType.SSD, StorageType.DISK}
这里面其实是有原因的。因为块有多副本机制,每个策略要为所有的副本都返回相应的StorageType,如果副本数超过候选的StorageType数组时应怎么处理,答案在下面这个方法中:
public List<StorageType> chooseStorageTypes(final short replication) { final List<StorageType> types = new LinkedList<StorageType>(); int i = 0, j = 0; // 从前往后依次匹配存储类型与对应的副本下标相匹配,同时要过滤掉 // transient属性的存储类型 for (; i < replication && j < storageTypes.length; ++j) { if (! storageTypes[j].isTransient()) { types.add(storageTypes[j]); ++i; } } // 获取最后一个存储类型,统一作为多余副本的存储类型 final StorageType last = storageTypes[storageTypes.length - 1]; if (! last.isTransient()) { for (; i < replication; i++) { types.add(last); } } return types; }
这样的话,ONE_SSD就必然只有第一个块的副本块是此类型的,其余副本则是DISK类型存储,而ALL_SSD则将会全部是SSD的存储。图1-11给出存储策略集合的结构图。
图1-11 存储策略集合
上述策略中有一个策略在前面提到过,就是LAZY_PERSIST。此策略在执行的时候会先将数据写到内存中,然后再持久化。大家可以试试此策略,看看性能到底如何。
1.2.5 块存储策略的调用
分析完块存储策略的种类之后,我们看看HDFS在哪些地方设置了这些策略。
首先,我们要知道HDFS的默认策略是哪种,默认策略如下:
@VisibleForTesting public static BlockStoragePolicySuite createDefaultSuite() { ... return new BlockStoragePolicySuite(hotId, policies); } ... public BlockStoragePolicySuite(byte defaultPolicyID, BlockStoragePolicy[] policies) { this.defaultPolicyID = defaultPolicyID; this.policies = policies; }
可以看出,这就是HOT的策略。也就是说,在默认情况下,HDFS把集群中的数据都看成是经常访问的数据。然后进一步查看getPolicy的方法调用,如图1-12所示。
图1-12 getPolicy方法调用
我们以方法chooseTarget4NewBlock为例,追踪一下上游的调用过程。
public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src, final int numOfReplicas, final Node client, final Set<Node> excludedNodes, final long blocksize, final List<String> favoredNodes, final byte storagePolicyID) throws IOException { List<DatanodeDescriptor> favoredDatanodeDescriptors = getDatanodeDescriptors(favoredNodes); final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy (storagePolicyID); ...
在父方法中获取了StoragePolicyID策略ID,往上追踪,来到了FSNamesystem的get-NewBlockTargets方法:
DatanodeStorageInfo[] getNewBlockTargets(String src, long fileId,
String clientName, ExtendedBlock previous, Set<Node> excludedNodes,
List<String> favoredNodes, LocatedBlock[] onRetryBlock) throws IOException {
...
replication = pendingFile.getFileReplication();
storagePolicyID = pendingFile.getStoragePolicyID();
} finally {
readUnlock();
}
if (clientNode == null) {
clientNode = getClientNode(clientMachine);
}
// 为新分配的块选择目标节点
return getBlockManager().chooseTarget4NewBlock(
src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
storagePolicyID);
}
于是我们看到StoragePolicyID是从INodeFile中获取而来的。这与上文中目标节点请求的过程类似,都有从File中获取策略Id的动作。那么新的问题又来了,INodeFile中的StoragePolicyID从何而来呢,有以下两个途径:
❑通过RPC接口主动设置。
❑没有主动设置的ID会继承父目录的策略,如果父目录还是没有设置策略,则会设置ID_UNSPECIFIED,继而会用DEFAULT(默认)存储策略进行替代,源码如下:
... public byte getStoragePolicyID() { byte id = getLocalStoragePolicyID(); if (id == ID_UNSPECIFIED) { return this.getParent() ! = null ? this.getParent().getStoragePolicyID() : id; } return id; } ...
综上,HDFS异构存储总的过程调用见图1-13。
1.2.6 HDFS异构存储策略的不足之处
前面花了很多的篇幅介绍了HDFS各种异构存储策略的特点、优势以及过程调用,那么是否这套机制是完美无缺的呢?答案当然不是,下面场景就不适合使用此机制。
用户A在HDFS上创建自己的存储目录/user/A,不设置任何的存储策略,也就是默认都存放在DISK类型的介质上。忽然有一天,他发现自己的数据已经不怎么使用了,想要设置其存储策略为COLD类型,于是他执行了相应策略的setStoragePolicy命令。那么这步命令操作完了是否意味着用户A的目的达到了呢?
图1-13 异构存储策略总的过程调用
问题就出在变更,目前HDFS上还不能对文件目录存储策略变更做出自动的数据迁移。这里需要用户额外执行hdfs -mover命令做文件目录的扫描。在mover命令扫描的过程中,如果发现文件目录的实际存储类型与其所设置的storagePolicy策略不同,将会进行数据块的迁移,将数据迁移到相对应的存储介质中。这里所指的文件目录存储策略变更有以下两类情况:
❑原先未设置StoragePolicy,后来进行了设置。
❑原先设置了A策略,后来又设置了B策略。
针对这个问题,社区目前已经有相关的JIRA在解决这个问题,HDFS-10285(Storage Policy Satisfier in Namenode),在这个JIRA里已经包含了详细的设计文档,如果这个问题被解决了,将会是一个很实用的功能。
1.2.7 HDFS存储策略的使用
本节最后介绍几个关于存储策略的使用命令,帮助大家真正学会运用这个强大的特性。输入hdfs storagepolicies -help,你会得到以下三大操作命令:
$ hdfs storagepolicies -help [-listPolicies] // 列出目前现有的存储策略 [-setStoragePolicy -path <path> -policy <policy>] // 对目标文件/目录设置存储策略
以下为此命令的必填参数:
<path> 需要设置存储策略的文件/目录路径 <policy> 对目标设置的存储策略 [-getStoragePolicy -path <path>] // 获取给定路径的存储策略
以下为此命令的必填参数:
<path> 需要获取存储策略的输入路径
在以上三大操作命令中,setStoragePolicy为设置命令,listPolicies和getStoragePolicy都是获取命令。最简单的使用方法是事先划分好冷热数据的存储目录,设置好对应的存储策略,后续使用相应的程序在对应分类目录下写数据,自动继承父目录的存储策略。在较新版的Hadoop发布版本中增加了数据迁移工具。此工具的重要用途在于它会扫描HDFS上的文件,判断文件是否满足其内部设置的存储策略;如果不满足,就会重新迁移数据到目标存储类型节点上。使用方式如下:
$ hdfs mover -help Usage:hdfsmover[-p<files/dirs>|-f<local file>]//hdfsmover数据迁移命令 -pl <files/dirs> // 需要被迁移的HDFS文件/目录的路径 -f <local file> // 需要被迁移的HDFS文件/目录对应的本地文件系统路径
其中一个参数针对HDFS的文件目录,另一个参数针对本地的文件。
HDFS异构存储功能的出现绝对是解决冷热数据存储问题的一把利器,希望通过本节内容的阐述能给大家带来全新的认识。
1.3 小结
本章介绍了HDFS的内存存储和异构存储,前者是后者的一种存储方式。HDFS的内存存储方式的难点在于它如何对数据进行持久化,其中还涉及LRU算法。而HDFS异构存储的重点在于理解此套机制的整体实现原理,通过对DataNode上的数据目录以及HDFS上的目标路径打标签的方式来进行数据的存储选择。