深度剖析Hadoop HDFS
上QQ阅读APP看书,第一时间看更新

第一部分 核心设计篇

第1章 HDFS的数据存储

本章将从HDFS的数据存储开始说起,因为正是先有了数据的存储,才有后续的写入和管理等操作。HDFS的数据存储包括两块:一块是HDFS内存存储,另一块是HDFS异构存储。HDFS内存存储是一种十分特殊的存储方式,将会对集群数据的读写带来不小的性能提升,而HDFS异构存储则能帮助我们更加合理地把数据存到应该存的地方。

1.1 HDFS内存存储

HDFS的内存存储是HDFS所有数据存储方式中比较特殊的一种,与之后将会提到的HDFS缓存有一些相同之处:都用机器的内存作为存储数据的载体。不同之处在于:HDFS缓存需要用户主动设置目标待缓存的文件、目录,其间需要使用HDFS缓存管理命令。而HDFS内存存储策略:LAZY_PERSIST则直接将内存作为数据存放的载体,可以这么理解,此时节点的内存也充当了一块“磁盘”。只要将文件设置为内存存储方式,最终会将其存储在节点的内存中。综合地看,HDFS缓存更像是改进用户使用的一种功能,而HDFS内存存储则是从底层扩展了HDFS的数据存储方式。本节将对HDFS内存存储策略进行更细致的分析。

1.1.1 HDFS内存存储原理

对于内存存储的存储策略,可能很多人会存有这么几种看法:

❑数据临时维持在内存中,服务一停止,数据全部丢失。

❑数据存在于内存中,在服务停止时做持久化处理,最终将数据全部写入到磁盘。

仔细来看以上这2种观点,其实都有不小的瑕疵:

❑第一个观点,服务一旦停止,内存数据全部丢失,这是无法接受的,我们只能容忍内存中少量的数据丢失。这个观点的另一个问题是,内存的存储空间是有限的,在服务运行过程中如果不及时处理一部分数据,内存空间迟早会被耗尽。

❑第二个观点,在服务停止退出的时候做持久化操作,同样会面临上面提到的内存空间的限制问题。如果机器的内存足够大,数据可能会很多,那么最后写入磁盘的阶段速度会很慢。

所以一般情况下,通用的、比较好的做法是异步持久化,什么意思呢?在内存存储新数据的同时,持久化距离当前时刻最远(存储时间最早)的数据。换一个通俗的解释,好比有个内存数据块队列,在队列头部不断有新增的数据块插入,就是待存储的块,因为资源有限,需要把队列尾部的块,也就是更早些时间点的块持久化到磁盘中,这样才有空间存储新的块。然后形成这样的一个循环,新的块加入,老的块移除,保证了整体数据的更新。

HDFS的LAZY_PERSIST内存存储策略用的就是这套方法,原理如图1-1。

图1-1 LAZY_PERSIST策略原理图

上面描述的原理在图中的表示是第4个步骤和第6个步骤。第4步写数据到内存中,第6步异步地将数据写到磁盘。前面几个步骤是如何设置StorageType的操作,在下文中会具体提到。所以异步存储的大体步骤可以归纳如下:

1)对目标文件目录设置StoragePolicy为LAZY_PERSIST的内存存储策略。

2)客户端进程向NameNode发起创建/写文件的请求。

3)客户端请求到具体的DataNode后DataNode会把这些数据块写入RAM内存中,同时启动异步线程服务将内存数据持久化写到磁盘上。

内存的异步持久化存储是内存存储与其他介质存储不同的地方。这也是LAZY_PERSIST名称的源由,数据不是马上落盘,而是懒惰的、延时地进行处理。

1.1.2 Linux 虚拟内存盘

这里需要了解一个额外的知识点:Linux虚拟内存盘。之前笔者也一直有个疑惑,内存也可以当作一个块盘使用?内存不就是临时存数据用的吗?于是在学习此模块知识之前,特意查了相关的资料。其实在Linux中,的确有将内存模拟为一个块盘的技术,叫虚拟内存盘(RAM disk)。这是一种模拟的盘,实际数据都是存放在内存中的。虚拟内存盘可以在某些特定的内存式存储文件系统下结合使用,比如tmpfs、ramfs。关于tmpfs的具体内容,大家可以查阅维基百科等资料。通过此项技术,我们就可以将机器内存利用起来,作为一块独立的虚拟盘供DataNode使用了。

1.1.3 HDFS的内存存储流程分析

下面讲述本章的核心内容:HDFS内存存储的主要流程。不要小看这个存储策略,里面的过程可并不简单,在下面的内容中,笔者会给出比较多的过程图,帮助大家理解。

1. HDFS文件内存存储策略设置

要想让文件数据存储到内存中,一开始要做的操作是设置此文件的存储策略,即上面提到的LAZY_PERSIST,而不是使用默认的存储策略:StoragePolicy.DEFAULT,默认策略的存储介质是DISK类型的。设置存储策略的方法目前有以下3种:

第一种方法,通过命令行的方式,调用如下命令:

        hdfs storagepolicies -setStoragePolicy -path <path> -policy LAZY_PERSIST

这种方式比较方便、快速。

第二种方法,调用对应的程序方法,比如调用暴露在外部的create文件方法,但是得带上参数CreateFlag.LAZY_PERSIST。如下所示:

        FSDataOutputStream fos =
                fs.create(
                    path,
                    FsPermission.getFileDefault(),

                        EnumSet.of(CreateFlag.CREATE, CreateFlag.LAZY_PERSIST),
                        bufferLength,
                        replicationFactor,
                        blockSize,
                        null);

上述方式最终调用的是DFSClient的create同名方法,如下所示:

        // DFSClient创建文件方法
        public DFSOutputStream create(String src, FsPermission permission,
            EnumSet<CreateFlag> flag, short replication, long blockSize,
            Progressable progress, int buffersize, ChecksumOpt checksumOpt)
            throws IOException {
          return create(src, permission, flag, true,
              replication, blockSize, progress, buffersize, checksumOpt, null);
        }

方法经过RPC层层调用,经过FSNamesystem,最终会到FSDirWriteFileOp的startFile方法,在此方法内部,会有设置存储策略的动作:

        static HdfsFileStatus startFile(
            FSNamesystem fsn, FSPermissionChecker pc, String src,
            PermissionStatus permissions, String holder, String clientMachine,
            EnumSet<CreateFlag> flag, boolean createParent,
            short replication, long blockSize,
            EncryptionKeyInfo ezInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
            boolean logRetryEntry)
            throws IOException {
          assert fsn.hasWriteLock();
          boolean create = flag.contains(CreateFlag.CREATE);
          boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
          // 判断CreateFlag是否带有LAZY_PERSIST标识,来判断是否是内存存储策略
          boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
          ...
          // 在此设置策略
          setNewINodeStoragePolicy(fsd.getBlockManager(), newNode, iip,
                                    isLazyPersist);
          fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
          if (NameNode.stateChangeLog.isDebugEnabled()) {
            NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
                src + " inode " + newNode.getId() + " " + holder);
          }
          return FSDirStatAndListingOp.getFileInfo(fsd, src, false, isRawPath);
        }

这部分的过程调用见图1-2。

图1-2 LAZY_PERSIST策略设置流程图

还有一种方法是通过FileSystem的setStoragePolicy方法,不过此方法在还未发布的2.8版本中提供,如下所示:

        fs.setStoragePolicy(path, "LAZY_PERSIST");

这种方式的优点在于可以用程序动态地设置目标路径的存储方式。

以上就是存储策略的设置过程,这一部分还是非常直接明了的。

2. LAZY_PERSIST内存存储

当我们为文件设置了LAZY_PERSIST的存储方式之后,DataNode如何进行内存式的存储呢?笔者在下面会分模块、分角色进行介绍。

首先要介绍的是LAZY_PERSIST相关结构。在之前的内容中已经提到过,在数据存储的同时会有另外一批数据被异步地持久化,所以这里一定会涉及多个服务对象的合作。这些服务对象的指挥者是FsDatasetImpl,它是一个管理DataNode所有磁盘读写的管家。

在FsDatasetImpl中,与内存存储相关的服务对象有3个,如图1-3所示。

图1-3 LAZY_PERSIST相关服务对象

说明如下:

❑RamDiskAsyncLazyPersistService:此对象是异步持久化线程服务,针对每一个磁盘块设置一个对应的线程池,需要持久化到给定磁盘的数据块会被提交到对应的线程池中去。每个线程池的最大线程数为1。

❑LazyWriter:这是一个线程服务,此线程会不断地从数据块列表中取出数据块,将数据块加入到异步持久化线程池RamDiskAsyncLazyPersistService中去执行。

❑RamDiskReplicaLruTracker:是副本块跟踪类,此类中维护了所有已持久化、未持久化的副本以及总副本数据信息。所以当一个副本被最终存储到内存中后,相应地会有副本所属队列信息的变更。当节点内存不足时,会将最近最少被访问的副本块移除。

以上3者的紧密合作,最终实现HDFS的内存存储。下面是具体的角色介绍。

(1)RamDiskReplicaLruTracker

RamDiskReplicaLruTracker起到了一个中间人的角色,它内部维护了多个关系的数据块信息,主要是以下3类:

        public class RamDiskReplicaLruTracker extends RamDiskReplicaTracker {
          ...
          // blockpool Id对副本信息的映射图
          Map<String, Map<Long, RamDiskReplicaLru>> replicaMaps;
          // 待写入磁盘的副本队列
          Queue<RamDiskReplicaLru> replicasNotPersisted;
          // 已持久化写入磁盘的映射图
          TreeMultimap<Long, RamDiskReplicaLru> replicasPersisted;
          ...

这里的Queue<RamDiskReplicaLru>就是待存入内存存储队列。以上3个变量之间的关系见图1-4。

图1-4 RamDisk副本块结构关系图

RamDiskReplicaLruTracker中的方法操作绝大多数与这3个变量的增删改动相关,所以逻辑并不复杂,我们只需要了解这些方法有什么作用即可。笔者将方法分成了以下两类:第一类,异步持久化操作相关方法。如图1-5所示。

图1-5 异步持久化操作相关流程图

当节点重启或者有新的文件设置了LAZY_PERSIST策略后,就会有新的副本块存储到内存中,同时会加入到replicaNotPersisted队列中。经过中间的dequeueNextReplicaToPersist方法,取出下一个将被持久化的副本块,进行写磁盘的操作。在持久化的过程中将调用recordStartLazyPersist、recordEndLazyPersist这两个方法,标志着持久化状态的变更。

第二类,异步持久化操作无直接关联方法。方法如下:

1)discardReplica:当检测到不再需要某副本的时候(包括副本已被删除,或已损坏的情况),可以从内存中移除、撤销副本。

2)touch:恰好与Linux中的touch命令同名,此方法意味着访问了一次某特定的副本块,并会更新此副本块的lastUesdTime(最近一次使用时间)。lastUesdTime会在后面提到的LRU算法中起到关键的作用。

3)getNextCandidateForEviction:此方法在DataNode内存空间不足,需要内存额外预留出空间给新的副本块时被调用。此方法会根据所设置的eviction scheme模式,选择需要被移除的块,默认的策略模式是LRU策略。

这里反复提到一个名词LRU, LRU的全称是Least Recently Used,意为最近最少使用算法。getNextCandidateForEviction方法采用此算法的好处是保证了现有副本块的一个活跃度,把最近很久没有访问过的块给移除掉。对于这个操作,我们有必要了解其中的细节。

首先touch方法会更新副本块最近访问的时间:

        synchronized void touch(final String bpid,
                                  final long blockId) {
          Map<Long, RamDiskReplicaLru> map = replicaMaps.get(bpid);
          RamDiskReplicaLru ramDiskReplicaLru = map.get(blockId);
          ...

          // 更新最近访问时间戳,并重新插入数据
          if (replicasPersisted.remove(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru)) {
            ramDiskReplicaLru.lastUsedTime = Time.monotonicNow();
            replicasPersisted.put(ramDiskReplicaLru.lastUsedTime, ramDiskReplicaLru);
          }
        }
        // 第二步获取候选移除块
        synchronized RamDiskReplicaLru getNextCandidateForEviction() {
          // 获取replicasPersisted迭代器进行遍历
          final Iterator<RamDiskReplicaLru> it = replicasPersisted.values().iterator();
          while (it.hasNext()) {
            // 因为replicasPersisted已经根据时间排好序了,所以取出当前的块进行移除即可
            final RamDiskReplicaLru ramDiskReplicaLru = it.next();
            it.remove();
            Map<Long, RamDiskReplicaLru> replicaMap =
                replicaMaps.get(ramDiskReplicaLru.getBlockPoolId());
            if (replicaMap ! = null && replicaMap.get(ramDiskReplicaLru.getBlockId()) ! = null) {
              return ramDiskReplicaLru;
            }
            // 如果副本不存在,则继续下一个副本
          }
          return null;
        }

这里比较有意思的是,根据已持久化块的访问时间来进行筛选移除,而不是直接在内存块对象中记录访问时间,然后进行排序和移除。最后在内存中移除与候选块属于同一副本信息的块并释放内存空间:

        // 从内存中移除副本块信息直到满足需要字节数的大小
        public void evictBlocks(long bytesNeeded) throws IOException {
          int iterations = 0;
          final long cacheCapacity = cacheManager.getCacheCapacity();
          // 当检测到内存空间不满足外界需要的大小时
          while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
                  (cacheCapacity - cacheManager.getCacheUsed()) < bytesNeeded) {
            // 获取待移除副本信息
            RamDiskReplica replicaState = ramDiskReplicaTracker.getNextCandidate
              ForEviction();
            if (replicaState == null) {
              break;
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug("Evicting block " + replicaState);
            }

              ...
              // 移除内存中的相关块并释放空间
              removeOldReplica(replicaInfo, newReplicaInfo, blockFile, metaFile,
                  blockFileUsed, metaFileUsed, bpid);
              }
            }
          }

(2)LazyWriter

LazyWriter是一个线程服务,它是一个发动机,循环不断地从队列中取出待持久化的数据块,提交到异步持久化服务中去。其中主要的run方法如下所示:

        public void run() {
          int numSuccessiveFailures = 0;
          while (fsRunning && shouldRun) {
            try {
              // 取出新的副本块并提交到异步服务中,返回是否提交成功的布尔值
              numSuccessiveFailures = saveNextReplica() ? 0 : (numSuccessiveFailures + 1);
              // 如果所有的持久化操作失败,则进行睡眠等待,避免短时间内连续的重试
              if (numSuccessiveFailures >= ramDiskReplicaTracker.numReplicas
                NotPersisted()) {
                Thread.sleep(checkpointerInterval * 1000);
                numSuccessiveFailures = 0;
              }
            } catch (InterruptedException e) {
              LOG.info("LazyWriter was interrupted, exiting");
              break;
            } catch (Exception e) {
              LOG.warn("Ignoring exception in LazyWriter:", e);
            }
          }
        }
        之后,进入saveNextReplica方法的处理:
        private boolean saveNextReplica() {
          RamDiskReplica block = null;
          FsVolumeReference targetReference;
          FsVolumeImpl targetVolume;
          ReplicaInfo replicaInfo;
          boolean succeeded = false;
          try {
            // 从队列中取出新的待持久化的块
            block = ramDiskReplicaTracker.dequeueNextReplicaToPersist();
            if (block ! = null) {
              synchronized (FsDatasetImpl.this) {
                ...

                  // 提交到异步服务中去
                  asyncLazyPersistService.submitLazyPersistTask(
                      block.getBlockPoolId(), block.getBlockId(),
                      replicaInfo.getGenerationStamp(), block.getCreationTime(),
                      replicaInfo.getMetaFile(), replicaInfo.getBlockFile(),
                      targetReference);
                  }
                }
              }
              succeeded = true;
            } catch(IOException ioe) {
              LOG.warn("Exception saving replica " + block, ioe);
            } finally {
              if (! succeeded && block ! = null) {
                LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
                // 进行副本块提交失败处理,此副本块将会再次提交到待持久化队列中
                onFailLazyPersist(block.getBlockPoolId(), block.getBlockId());
              }
            }
            return succeeded;
          }

LazyWriter线程服务的流程图可以归纳为图1-6。

图1-6 LazyWriter服务流程图

我们结合LazyWriter和RamDiskReplicaTracker跟踪服务,就可以得到下面一个完整的流程(暂且不考虑RamDiskAsyncLazyPersistService的内部执行),如图1-7所示。

(3)RamDiskAsyncLazyPersistService

最后一部分异步服务的内容相对就比较简单了,主要围绕着Volume磁盘和Executor线程池这两部分的内容,秉持着下面一个原则:

一个磁盘服务对应一个线程池,并且一个线程池的最大线程数也只有1个。

线程池列表定义如下:

图1-7 异步持久化流程图

        class RamDiskAsyncLazyPersistService {
        ...
          private Map<File, ThreadPoolExecutor> executors
              = new HashMap<File, ThreadPoolExecutor>();
        ...

这里的File代表一个磁盘上的目录,个人认为这里完全可以用String字符串替代。既可以减少存储空间,又直观明了。从这里可以看出磁盘服务与线程池一对一的关系了。

当服务启动的时候,就会有新的磁盘目录加入,如下代码所示:

        synchronized void addVolume(File volume) {
          if (executors == null) {
            throw new RuntimeException("AsyncLazyPersistService is already shutdown");
          }
          ThreadPoolExecutor executor = executors.get(volume);
          // 如果当前已存在此磁盘目录对应的线程池,则抛异常
          if (executor ! = null) {
            throw new RuntimeException("Volume " + volume + " is already existed.");
          }
          // 否则进行添加
          addExecutorForVolume(volume);
        }
        之后,进入addExecutorForVolume方法:
        private void addExecutorForVolume(final File volume) {
          ...
          // 新建线程池,最大线程执行数为1
          ThreadPoolExecutor executor = new ThreadPoolExecutor(
              CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
              THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
              new LinkedBlockingQueue<Runnable>(), threadFactory);
          executor.allowCoreThreadTimeOut(true);
          // 加入到executors中,以volume作为key

          executors.put(volume, executor);
        }

还有一个需要注意的地方是提交执行方法submitLazyPersistTask,如下所示:

        void submitLazyPersistTask(String bpId, long blockId,
            long genStamp, long creationTime,
            File metaFile, File blockFile,
            FsVolumeReference target) throws IOException {
          if (LOG.isDebugEnabled()) {
            LOG.debug("LazyWriter schedule async task to persist RamDisk block pool id: "
                + bpId + " block id: " + blockId);
          }
          // 获取需要持久化的目标磁盘实例
          FsVolumeImpl volume = (FsVolumeImpl)target.getVolume();
          File lazyPersistDir   = volume.getLazyPersistDir(bpId);
          if (! lazyPersistDir.exists() && ! lazyPersistDir.mkdirs()) {
            FsDatasetImpl.LOG.warn("LazyWriter failed to create " + lazyPersistDir);
            throw new IOException("LazyWriter fail to find or create lazy persist dir: "
                + lazyPersistDir.toString());
          }
          // 新建此服务Task
          ReplicaLazyPersistTask lazyPersistTask = new ReplicaLazyPersistTask(
              bpId, blockId, genStamp, creationTime, blockFile, metaFile,
              target, lazyPersistDir);
          // 提交到对应volume的线程池中执行
          execute(volume.getCurrentDir(), lazyPersistTask);
        }

如果在上述执行的过程中发生失败,会调用失败处理的方法,并会重新将此副本块插入到replicateNotPersisted队列中,等待下一次的持久化:

        public void onFailLazyPersist(String bpId, long blockId) {
          RamDiskReplica block = null;
          block = ramDiskReplicaTracker.getReplica(bpId, blockId);
          if (block ! = null) {
            LOG.warn("Failed to save replica " + block + ". re-enqueueing it.");
            // 重新插入队列操作
            ramDiskReplicaTracker.reenqueueReplicaNotPersisted(block);
          }
        }

其他如removeVolume等方法实现比较简单,这里不做过多介绍。图1-8是RamDisk-AsyncLazyPersistService总的结构图。

以上3部分描述了LAZT_PERSIST下的队列式内存数据块持久化服务、异步持久化服务的内部运行逻辑和LRU预留内存空间算法策略。

图1-8 RamDiskAsyncLazyPersistService相关结构图

1.1.4 LAZY_PERSIST内存存储的使用

介绍完原理部分之后,下面介绍具体的配置使用。

第一步,要使用LAZY_PERSIST内存存储策略,需要有对应的存储介质,内存存储介质对应的类型是RAM_DISK。

在使用RAM_DISK之前,需要完成虚拟内存盘的配置工作,这里以tmpfs文件系统为例进行介绍。在默认情况下,tmpfs是被挂载到/dev/shm,并且大小是32GB。也就是说,在此目录下的数据实质上是存在于内存中的。但是有的时候,我们可能会想挂载到自己想要挂载的目录下,而且我们也想对内存的使用大小进行有效的控制,可以使用下面的命令进行这2方面的设置:

        sudo mount -t tmpfs -o size=16g tmpfs /mnt/dn-tmpfs/

以上操作的意思是将tmpfs挂载到目录/mnt/dn-tmpfs,并且限制内存使用大小为16GB。最后,建议在/etc/fstab文件中将这层挂载关系写入,这可以让机器在重启之后自动创建好挂载关系。

首先需要将机器中已经完成好的虚拟内存盘配置到dfs.datanode.data.dir中,其次还要带上RAM_DISK标签,以此表明此目录对应的存储介质为RAM_DISK,配置样例如下:

        <property>
          <name>dfs.datanode.data.dir</name>
          <value>/grid/0, /grid/1, /grid/2, [RAM_DISK]/mnt/dn-tmpfs</value>
        </property>

注意,这个标签是必须要打上的,否则HDFS默认的都是DISK。

第二步就是设置具体的文件策略类型。

注意

❑ 确保HDFS异构存储策略没有被关闭,默认是开启的,配置项是dfs.storage. policy.enabled。

❑ 确认dfs.datanode.max.locked.memory是否设置了足够大的内存值,是否已是DataNode能承受的最大内存大小。内存值过小会导致内存中的总的可存储的数据块变少,但如果超过DataNode能承受的最大内存大小的话,部分内存块会被直接移出。

FsDatasetAsyncDiskService

在FsDatasetImpl类中,还有一个与内存存储异步持久化相类似的服务FsDataset-AsyncDiskService。在类的实现逻辑上,该服务与RamDiskAsyncLazyPersistService有许多相似之处,同样包含许多执行线程池,并且每个线程池对应一个存储目录。不过在具体的执行内容上,它主要做两类异步任务:

❑文件目录的异步删除。

❑异步执行SyncFileRange请求操作。SyncFileRange的全称是sync a file segment with disk,它的作用在于数据做多次更新后,对其进行一次性的写出,提高IO的效率。

FsDatasetAsyncDiskService类位于包org.apache.hadoop.hdfs.server.datanode. fsdataset.impl下,感兴趣的读者可以自行研究。

在HDFS异构存储方式中,除了内存存储之外,其实还有另外一类存储方式也尤为重要,就是HDFS的Archival Storage。Archival Storage指的是一种高密度的存储方式,以此解决集群数据规模增长带来的存储空间不足的问题。通常用于Archival Storage的节点不需要很好的计算性能,一般用于冷数据的存储。HDFS的Archival Storage的具体设计与实现,可以参阅相关JIRA, HDFS-6584(Support Archival Storage)。

1.2 HDFS异构存储

Hadoop在2.6.0版本中引入了一个新特性:异构存储。异构存储关键在于“异构”两个字。异构存储可以根据各个存储介质读写特性的不同发挥各自的优势。一个很适用的场景就是上节提到的冷热数据的存储。针对冷数据,采用容量大的、读写性能不高的存储介质存储,比如最普通的磁盘。而对于热数据而言,可以采用SSD的方式进行存储,这样就能保证高效的读性能,在速率上甚至能做到十倍或百倍于普通磁盘的读写速度。换句话说,HDFS异构存储特性的出现使得我们不需要搭建2套独立的集群来存放冷热2类数据,在一套集群内就能完成。所以这个功能还是有非常大的实用价值的。本节就带领大家全面了解HDFS的异构存储,包括异构存储的类型、存储策略、HDFS如何做到智能化的异构存储等。

1.2.1 异构存储类型

以下是在HDFS中声明的Storage Type:

❑RAM_DISK

❑SSD

❑DISK

❑ARCHIVE

HDFS中定义了这4种异构存储类型,SSD、DISK一看就知道是什么意思,这里看一下其余的两个。RAM_DISK其实就是内存,而ARCHIVE并没有特指哪种存储介质,主要指的是高密度存储介质,用于解决数据扩容的问题。这4种类型定义在StorageType类中,如下所示:

        public enum StorageType {
          // 根据存储的速度,从快到慢
          RAM_DISK(true),
          SSD(false),
          DISK(false),
          ARCHIVE(false);
          ...

其中true或者false代表此类存储类型是否为transient特性。transient的意思是转瞬即逝的,并非持久化的。在上述4类介质中,只有内存存储才是transient特性的。在HDFS中,如果没有主动声明数据目录存储类型,默认都是DISK类型。这4类存储介质之间一个很大的区别在于读写速度,从上到下依次减慢。所以将热数据存在内存中或是SSD中会是不错的选择,而将冷数据存放于DISK和ARCHIVE类型的介质中会更好。在HDFS中,StorageType的设定非常重要。那么如何让HDFS知道集群中的数据存储目录分别是哪种类型的存储介质呢?这就需要在配置属性时主动声明,HDFS并没有自动检测识别的功能。配置属性dfs.datanode.data.dir可以对本地对应存储目录进行设置,同时带上一个存储类型标签,声明此目录用的是哪种类型的存储介质,例子如下:

        [SSD]file:///grid/dn/ssd0

如果目录前没有带上[SSD]/[DISK]/[ARCHIVE]/[RAM_DISK]这4种类型中的任何一种,则默认是DISK类型。图1-9是存储介质结构图。

图1-9 HDFS存储介质类型

1.2.2 异构存储原理

了解完异构存储类型后,我们有必要了解一下HDFS异构存储的实现原理。本节会结合部分HDFS源码进行阐述。HDFS异构存储可总结为以下三点:

❑DataNode通过心跳汇报自身数据存储目录的StorageType给NameNode。

❑随后NameNode进行汇总并更新集群内各个节点的存储类型情况。

❑待复制文件根据自身设定的存储策略信息向NameNode请求拥有此类型存储介质的DataNode作为候选节点。

从以上3点来看,HDFS异构存储原理并不复杂。下面结合部分源码,来一步步跟踪内部的过程细节。

1. DataNode存储目录汇报

首先是数据存储目录的解析与心跳汇报过程。在FsDatasetImpl的构造函数中对dataDir进行存储目录的解析,生成了StorageType的List列表:

        // FsDatasetImple初始构造函数
        FsDatasetImpl(DataNode datanode, DataStorage storage, Configuration conf
            ) throws IOException {
          ...
          String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
          Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
          List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
              dataLocations, storage);
          ...

真正调用的是DataNode的getStorageLocations方法:

        public static List<StorageLocation> getStorageLocations(Configuration conf) {
          // 获取dfs.datanode.data.dir配置中的多个目录地址字符串

          Collection<String> rawLocations =
              conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
          List<StorageLocation> locations =
              new ArrayList<StorageLocation>(rawLocations.size());
          for(String locationString : rawLocations) {
            final StorageLocation location;
            try {
              // 解析为对应的StorageLocation
              location = StorageLocation.parse(locationString);
            } catch (IOException ioe) {
              LOG.error("Failed to initialize storage directory " + locationString
                  + ". Exception details: " + ioe);
              // 此处忽略异常
              continue;
            } catch (SecurityException se) {
              LOG.error("Failed to initialize storage directory " + locationString
                            + ". Exception details: " + se);
              // 此处忽略异常
              continue;
            }
            // 将解析好的StorageLocation加入到列表中
            locations.add(location);
          }
          return locations;
        }

当然我们最关心如何解析配置并最终得到对应存储类型的过程,即下面这行操作所执行的内容:

        location = StorageLocation.parse(locationString);
        StorageLocation的解析方法如下:
        public static StorageLocation parse(String rawLocation)
            throws IOException, SecurityException {
          // 采用正则匹配的方式进行解析
          Matcher matcher = regex.matcher(rawLocation);
          StorageType storageType = StorageType.DEFAULT;
          String location = rawLocation;
          if (matcher.matches()) {
            String classString = matcher.group(1);
            location = matcher.group(2);
            if (! classString.isEmpty()) {
              storageType =
                  StorageType.valueOf(StringUtils.toUpperCase(classString));
            }
          }

          return new StorageLocation(storageType, new Path(location).toUri());
        }

这里的StorageType.DEFAULT就是DISK,在StorageType中定义如下:

        public static final StorageType DEFAULT = DISK;

后续这些解析好的存储目录以及对应的存储介质类型会加入到storageMap中,如下所示:

        private void addVolume(Collection<StorageLocation> dataLocations,
            Storage.StorageDirectory sd) throws IOException {
          final File dir = sd.getCurrentDir();
          final StorageType storageType =
              getStorageTypeFromLocations(dataLocations, sd.getRoot());
          ...
          synchronized (this) {
            volumeMap.addAll(tempVolumeMap);
            storageMap.put(sd.getStorageUuid(),
                new DatanodeStorage(sd.getStorageUuid(),
                    DatanodeStorage.State.NORMAL,
                    storageType));
            ...
        }

storageMap存储了目录到类型的映射关系,可以说是非常细粒度的。更重要的是,这些信息会被DataNode组织成StorageReport通过心跳的形式上报给NameNode。于是就来到了第一阶段的下半过程:

        public StorageReport[] getStorageReports(String bpid)
            throws IOException {
          List<StorageReport> reports;
          synchronized (statsLock) {
            List<FsVolumeImpl> curVolumes = getVolumes();
            reports = new ArrayList<>(curVolumes.size());
            for (FsVolumeImpl volume : curVolumes) {
              try (FsVolumeReference ref = volume.obtainReference()) {
                // 获取磁盘存储信息,并生成磁盘报告实例
                StorageReport sr = new StorageReport(volume.toDatanodeStorage(),
                    false,
                    volume.getCapacity(),
                    volume.getDfsUsed(),
                    volume.getAvailable(),
                    volume.getBlockPoolUsed(bpid));
                // 将报告实例加入到报告列表中
                reports.add(sr);
              } catch (ClosedChannelException e) {
                continue;

                }
              }
            }
            // 返回报告列表
            return reports.toArray(new StorageReport[reports.size()]);
          }

以上是StorageReport的组织过程,它最终被BPServiceActor的sendHeartBeat调用,发送给NameNode,如下所示:

        HeartbeatResponse sendHeartBeat() throws IOException {
          // 获取存储类型情况报告信息
          StorageReport[] reports =
              dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Sending heartbeat with " + reports.length +
                      " storage reports from service actor: " + this);
          }
          // 获取坏磁盘数据信息
          VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
              .getVolumeFailureSummary();
          int numFailedVolumes = volumeFailureSummary ! = null ?
                volumeFailureSummary.getFailedStorageLocations().length : 0;
          // 还有DataNode自身的存储容量信息,最后发送给NameNode
          return bpNamenode.sendHeartbeat(bpRegistration,
              reports,
              dn.getFSDataset().getCacheCapacity(),
              dn.getFSDataset().getCacheUsed(),
              dn.getXmitsInProgress(),
              dn.getXceiverCount(),
              numFailedVolumes,
              volumeFailureSummary);
        }

2.存储心跳信息的更新处理

现在来到了第二阶段的心跳处理过程。心跳处理在DatanodeManager的handleHeartbeat中进行:

        // 心跳处理方法
        public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
            StorageReport[] reports, final String blockPoolId,
            long cacheCapacity, long cacheUsed, int xceiverCount,
            int maxTransfers, int failedVolumes,
            VolumeFailureSummary volumeFailureSummary) throws IOException {
          synchronized (heartbeatManager) {
            synchronized (datanodeMap) {
              DatanodeDescriptor nodeinfo = null;
              ...

                heartbeatManager.updateHeartbeat(nodeinfo, reports,
                                                  cacheCapacity, cacheUsed,
                                                  xceiverCount, failedVolumes,
                                                  volumeFailureSummary);
                ...

最终在heartbeatManager中会调用到DatanodeDescription对象的updateHeartbeatState方法,该方法会更新Storage的信息,如下所示:

        // 处理心跳中统计值相关的操作
        public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
            long cacheUsed, int xceiverCount, int volFailures,
            VolumeFailureSummary volumeFailureSummary) {
          ...
          for (StorageReport report : reports) {
            DatanodeStorageInfo storage = updateStorage(report.getStorage());
            if (checkFailedStorages) {
              failedStorageInfos.remove(storage);
            }
            storage.receivedHeartbeat(report);
            // 进行统计计数的更新统计
            totalCapacity += report.getCapacity();
            totalRemaining += report.getRemaining();
            totalBlockPoolUsed += report.getBlockPoolUsed();
            totalDfsUsed += report.getDfsUsed();
          }
          rollBlocksScheduled(getLastUpdateMonotonic());
          ...

3.目标存储介质类型节点的请求

各个DataNode心跳信息都更新完毕之后,有目标存储介质需求的待复制文件块就会向NameNode请求DataNode,这部分处理在FSNamesystem的getAdditionDatanode中进行:

        // 获取剩余块方法
        LocatedBlock getAdditionalDatanode(String src, long fileId,
            final ExtendedBlock blk, final DatanodeInfo[] existings,
            final String[] storageIDs,
            final Set<Node> excludes,
            final int numAdditionalNodes, final String clientName
            ) throws IOException {
            ...
            final INodeFile file = checkLease(src, clientName, inode, fileId);
            clientMachine = file.getFileUnderConstructionFeature().getClientMachine();
            clientnode = blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
            preferredblocksize = file.getPreferredBlockSize();
            // 获取待复制文件的存储策略Id,对应的就是存储策略信息类型
            storagePolicyID = file.getStoragePolicyID();

            // 寻找存储目录信息
            final DatanodeManager dm = blockManager.getDatanodeManager();
            // 获取已存在节点的存储目录列表信息
            chosen = Arrays.asList(dm.getDatanodeStorageInfos(existings, storageIDs));
          } finally {
            readUnlock();
          }
          ...
          // 选择满足需求的节点
          final DatanodeStorageInfo[] targets = blockManager.chooseTarget4AdditionalDatanode(
              src, numAdditionalNodes, clientnode, chosen,
              excludes, preferredblocksize, storagePolicyID);
          final LocatedBlock lb = new LocatedBlock(blk, targets);
          blockManager.setBlockToken(lb, AccessMode.COPY);
          return lb;
        }

目标存储节点信息就被设置到了具体块的信息中。这里的target类型为Datanode-StorageInfo,代表的是DataNode中的一个dataDir存储目录。上述代码中具体blockManager如何根据给定的候选DatanodeStorageInfo存储目录和存储策略来选择出目标节点,就是下一节将要重点阐述的存储介质选择策略。本节最后给出HDFS的异构存储过程调用的简单流程图,如图1-10所示。

图1-10 异构存储过程图

1.2.3 块存储类型选择策略

在现有的HDFS中,我们可以对块的网络拓朴位置进行策略的选择,同样,对于数据的存储介质,HDFS也有对应的若干种策略。对于一个完整的存储类型选择策略,有如下的

基本信息定义:

        // 块存储类型选择策略对象
        @InterfaceAudience.Private
        public class BlockStoragePolicy {
          public static final Logger LOG = LoggerFactory.getLogger(BlockStoragePolicy
              .class);
          // 策略唯一标识Id
          private final byte id;
          // 策略名称
          private final String name;
          // 对于一个新块,存储副本块的可选存储类型信息组
          private final StorageType[] storageTypes;
          // 对于第一个创建块,fallback情况时的可选存储类型
          private final StorageType[] creationFallbacks;
          // 对于块的其余副本,fallback情况时的可选存储类型
          private final StorageType[] replicationFallbacks;
          // 当创建文件的时候,是否继承祖先目录信息的策略,主要用于主动设置策略的时候
          private boolean copyOnCreateFile;
          ...

这里出现了fallback的情况,什么叫做fallback的情况呢?即当前存储类型不可用的时候,退一级选择使用的存储类型。

相应的逻辑代码如下:

        public List<StorageType> chooseStorageTypes(final short replication,
            final Iterable<StorageType> chosen,
            final EnumSet<StorageType> unavailables,
            final boolean isNewBlock) {
          ...
          for(int i = storageTypes.size() - 1; i >= 0; i--) {
            // 获取当前需要的存储类型
            final StorageType t = storageTypes.get(i);
            // 如果当前的存储类型是在不可用的存储类型列表中,选择fallback的情况
            if (unavailables.contains(t)) {
              // 根据是否为新块还是普通的副本块,选择相应的fallbackStorageType
              final StorageType fallback = isNewBlock?
                  getCreationFallback(unavailables)
                  : getReplicationFallback(unavailables);
              if (fallback == null) {
                removed.add(storageTypes.remove(i));
              } else {
                storageTypes.set(i, fallback);
              }
            }
          }
          ...

在getFallback方法中会选取第一个满足条件的fallback的StorageType:

        private static StorageType getFallback(EnumSet<StorageType> unavailables,
            StorageType[] fallbacks) {
          for(StorageType fb : fallbacks) {
            // 如果找到满足条件的StorageType,立即返回
            if (! unavailables.contains(fb)) {
              return fb;
            }
          }
          return null;
        }

当然这些都只是单一的存储类型选择策略。HDFS在使用的时候也不是新建一个StoragePolicy对象直接调用,而是从BlockStoragePolicySuite策略集合中获取策略。

1.2.4 块存储策略集合

块存储策略集合是BlockStoragePolicySuite。在此类内部定义了6种策略,不仅仅分为冷热数据两种类型,其详细策略描述可见源代码中的解释。类策略名称如下:

❑HOT

❑COLD

❑WARM

❑ALL_SSD

❑ONE_SSD

❑LAZY_PERSIST

在这6种策略中,前三种策略和后三种策略可以看作是两大类。前三种策略是根据冷热数据的角度来区分的,后三种策略是根据存放盘的性质来区分的。策略倒是划分出来了,但是这些不同的策略之间的主要区别在哪里呢,答案就是候选存储类型组。

在创建BlockStoragePolicySuite的时候,对这些策略都进行了构造,如下所示:

        // 块存储策略集的构造初始化
        public static BlockStoragePolicySuite createDefaultSuite() {
          final BlockStoragePolicy[] policies =
              new BlockStoragePolicy[1 << ID_BIT_LENGTH];
          final byte lazyPersistId = HdfsConstants.MEMORY_STORAGE_POLICY_ID;
          // LAZY_PERSIST策略构造
          policies[lazyPersistId] = new BlockStoragePolicy(lazyPersistId,
              HdfsConstants.MEMORY_STORAGE_POLICY_NAME,
              new StorageType[]{StorageType.RAM_DISK, StorageType.DISK},
              new StorageType[]{StorageType.DISK},
              new StorageType[]{StorageType.DISK},
              true);     // Cannot be changed on regular files, but inherited.

          final byte allssdId = HdfsConstants.ALLSSD_STORAGE_POLICY_ID;
          // ALL_SSD策略构造
          policies[allssdId] = new BlockStoragePolicy(allssdId,
              HdfsConstants.ALLSSD_STORAGE_POLICY_NAME,
              new StorageType[]{StorageType.SSD},
              new StorageType[]{StorageType.DISK},
              new StorageType[]{StorageType.DISK});
          final byte onessdId = HdfsConstants.ONESSD_STORAGE_POLICY_ID;
          // ONE_SSD策略构造
          policies[onessdId] = new BlockStoragePolicy(onessdId,
              HdfsConstants.ONESSD_STORAGE_POLICY_NAME,
              new StorageType[]{StorageType.SSD, StorageType.DISK},
              new StorageType[]{StorageType.SSD, StorageType.DISK},
              new StorageType[]{StorageType.SSD, StorageType.DISK});
          final byte hotId = HdfsConstants.HOT_STORAGE_POLICY_ID;
          // HOT策略构造
          policies[hotId] = new BlockStoragePolicy(hotId,
              HdfsConstants.HOT_STORAGE_POLICY_NAME,
              new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
              new StorageType[]{StorageType.ARCHIVE});
          final byte warmId = HdfsConstants.WARM_STORAGE_POLICY_ID;
          // WARM策略构造
          policies[warmId] = new BlockStoragePolicy(warmId,
              HdfsConstants.WARM_STORAGE_POLICY_NAME,
              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE},
              new StorageType[]{StorageType.DISK, StorageType.ARCHIVE});
          final byte coldId = HdfsConstants.COLD_STORAGE_POLICY_ID;
          // CLOD策略构造
          policies[coldId] = new BlockStoragePolicy(coldId,
              HdfsConstants.COLD_STORAGE_POLICY_NAME,
              new StorageType[]{StorageType.ARCHIVE}, StorageType.EMPTY_ARRAY,
              StorageType.EMPTY_ARRAY);
          return new BlockStoragePolicySuite(hotId, policies);
        }

在这些策略对象的参数中,第三个参数起决定性作用,因为第三个参数会被返回给副本块作为候选存储类型。在storageTypes参数中,有时可能只有1个类型声明,例如ALL_SSD策略只有如下StorageType:

        new StorageType[]{StorageType.SSD}

而ONE_SSD却有两个:

        new StorageType[]{StorageType.SSD, StorageType.DISK}

这里面其实是有原因的。因为块有多副本机制,每个策略要为所有的副本都返回相应的StorageType,如果副本数超过候选的StorageType数组时应怎么处理,答案在下面这个方法中:

        public List<StorageType> chooseStorageTypes(final short replication) {
          final List<StorageType> types = new LinkedList<StorageType>();
          int i = 0, j = 0;
          // 从前往后依次匹配存储类型与对应的副本下标相匹配,同时要过滤掉
          // transient属性的存储类型
          for (; i < replication && j < storageTypes.length; ++j) {
            if (! storageTypes[j].isTransient()) {
              types.add(storageTypes[j]);
              ++i;
            }
          }
          // 获取最后一个存储类型,统一作为多余副本的存储类型
          final StorageType last = storageTypes[storageTypes.length - 1];
          if (! last.isTransient()) {
            for (; i < replication; i++) {
              types.add(last);
            }
          }
          return types;
        }

这样的话,ONE_SSD就必然只有第一个块的副本块是此类型的,其余副本则是DISK类型存储,而ALL_SSD则将会全部是SSD的存储。图1-11给出存储策略集合的结构图。

图1-11 存储策略集合

上述策略中有一个策略在前面提到过,就是LAZY_PERSIST。此策略在执行的时候会先将数据写到内存中,然后再持久化。大家可以试试此策略,看看性能到底如何。

1.2.5 块存储策略的调用

分析完块存储策略的种类之后,我们看看HDFS在哪些地方设置了这些策略。

首先,我们要知道HDFS的默认策略是哪种,默认策略如下:

        @VisibleForTesting
        public static BlockStoragePolicySuite    createDefaultSuite() {
          ...
          return new BlockStoragePolicySuite(hotId, policies);
        }
        ...
        public BlockStoragePolicySuite(byte defaultPolicyID,
            BlockStoragePolicy[] policies) {
          this.defaultPolicyID = defaultPolicyID;
          this.policies = policies;
        }

可以看出,这就是HOT的策略。也就是说,在默认情况下,HDFS把集群中的数据都看成是经常访问的数据。然后进一步查看getPolicy的方法调用,如图1-12所示。

图1-12 getPolicy方法调用

我们以方法chooseTarget4NewBlock为例,追踪一下上游的调用过程。

        public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
            final int numOfReplicas, final Node client,
            final Set<Node> excludedNodes,
            final long blocksize,
            final List<String> favoredNodes,
            final byte storagePolicyID) throws IOException {
          List<DatanodeDescriptor> favoredDatanodeDescriptors =
              getDatanodeDescriptors(favoredNodes);
          final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy
            (storagePolicyID);
          ...

在父方法中获取了StoragePolicyID策略ID,往上追踪,来到了FSNamesystem的get-NewBlockTargets方法:

        DatanodeStorageInfo[] getNewBlockTargets(String src, long fileId,
            String clientName, ExtendedBlock previous, Set<Node> excludedNodes,
            List<String> favoredNodes, LocatedBlock[] onRetryBlock) throws IOException {
            ...
            replication = pendingFile.getFileReplication();
            storagePolicyID = pendingFile.getStoragePolicyID();
          } finally {
            readUnlock();
          }
          if (clientNode == null) {
            clientNode = getClientNode(clientMachine);
          }
          // 为新分配的块选择目标节点
          return getBlockManager().chooseTarget4NewBlock(
              src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
              storagePolicyID);
        }

于是我们看到StoragePolicyID是从INodeFile中获取而来的。这与上文中目标节点请求的过程类似,都有从File中获取策略Id的动作。那么新的问题又来了,INodeFile中的StoragePolicyID从何而来呢,有以下两个途径:

❑通过RPC接口主动设置。

❑没有主动设置的ID会继承父目录的策略,如果父目录还是没有设置策略,则会设置ID_UNSPECIFIED,继而会用DEFAULT(默认)存储策略进行替代,源码如下:

        ...
        public byte getStoragePolicyID() {
          byte id = getLocalStoragePolicyID();
          if (id == ID_UNSPECIFIED) {
            return this.getParent() ! = null ?
                this.getParent().getStoragePolicyID() : id;
          }
        return id;
        }
        ...

综上,HDFS异构存储总的过程调用见图1-13。

1.2.6 HDFS异构存储策略的不足之处

前面花了很多的篇幅介绍了HDFS各种异构存储策略的特点、优势以及过程调用,那么是否这套机制是完美无缺的呢?答案当然不是,下面场景就不适合使用此机制。

用户A在HDFS上创建自己的存储目录/user/A,不设置任何的存储策略,也就是默认都存放在DISK类型的介质上。忽然有一天,他发现自己的数据已经不怎么使用了,想要设置其存储策略为COLD类型,于是他执行了相应策略的setStoragePolicy命令。那么这步命令操作完了是否意味着用户A的目的达到了呢?

图1-13 异构存储策略总的过程调用

问题就出在变更,目前HDFS上还不能对文件目录存储策略变更做出自动的数据迁移。这里需要用户额外执行hdfs -mover命令做文件目录的扫描。在mover命令扫描的过程中,如果发现文件目录的实际存储类型与其所设置的storagePolicy策略不同,将会进行数据块的迁移,将数据迁移到相对应的存储介质中。这里所指的文件目录存储策略变更有以下两类情况:

❑原先未设置StoragePolicy,后来进行了设置。

❑原先设置了A策略,后来又设置了B策略。

针对这个问题,社区目前已经有相关的JIRA在解决这个问题,HDFS-10285(Storage Policy Satisfier in Namenode),在这个JIRA里已经包含了详细的设计文档,如果这个问题被解决了,将会是一个很实用的功能。

1.2.7 HDFS存储策略的使用

本节最后介绍几个关于存储策略的使用命令,帮助大家真正学会运用这个强大的特性。输入hdfs storagepolicies -help,你会得到以下三大操作命令:

        $ hdfs storagepolicies -help
        [-listPolicies]  // 列出目前现有的存储策略
        [-setStoragePolicy -path <path> -policy <policy>]  // 对目标文件/目录设置存储策略

以下为此命令的必填参数:

        <path>     需要设置存储策略的文件/目录路径
        <policy>  对目标设置的存储策略
        [-getStoragePolicy -path <path>]  // 获取给定路径的存储策略

以下为此命令的必填参数:

        <path>  需要获取存储策略的输入路径

在以上三大操作命令中,setStoragePolicy为设置命令,listPolicies和getStoragePolicy都是获取命令。最简单的使用方法是事先划分好冷热数据的存储目录,设置好对应的存储策略,后续使用相应的程序在对应分类目录下写数据,自动继承父目录的存储策略。在较新版的Hadoop发布版本中增加了数据迁移工具。此工具的重要用途在于它会扫描HDFS上的文件,判断文件是否满足其内部设置的存储策略;如果不满足,就会重新迁移数据到目标存储类型节点上。使用方式如下:

        $ hdfs mover -help
        Usage:hdfsmover[-p<files/dirs>|-f<local file>]//hdfsmover数据迁移命令
          -pl <files/dirs>  // 需要被迁移的HDFS文件/目录的路径
          -f <local file>  // 需要被迁移的HDFS文件/目录对应的本地文件系统路径

其中一个参数针对HDFS的文件目录,另一个参数针对本地的文件。

HDFS异构存储功能的出现绝对是解决冷热数据存储问题的一把利器,希望通过本节内容的阐述能给大家带来全新的认识。

1.3 小结

本章介绍了HDFS的内存存储和异构存储,前者是后者的一种存储方式。HDFS的内存存储方式的难点在于它如何对数据进行持久化,其中还涉及LRU算法。而HDFS异构存储的重点在于理解此套机制的整体实现原理,通过对DataNode上的数据目录以及HDFS上的目标路径打标签的方式来进行数据的存储选择。