Introduction to Hadoop Distributed File System (HDFS)

Linh B. Ngo

Overview

  • 2002: Doug Cutting and Mike Cafarella started a project to build an open-source search engine called Nutch. A component of this project was a web crawler that could crawl and index the Web.
  • 2003: Google released a research paper on its in-house data storage system called Google File System (GFS).
  • 2004: Google released another research paper on the programming approach to process data stored on GFS, called MapReduce.
  • 2005: Cutting and Cafarella rebuilt the underlying file management system and processing framework of Nutch based on the architectural design of Google's GFS and MapReduce.
  • 2006: The adaptations of Google's GFS and MapReduce were converted into a single open source project called Hadoop, which was sponsored by Yahoo and led by Doug Cutting.
  • 2007: Yahoo maintains a 1000-node production cluster.
  • 2008: Hadoop becomes the platform of Yahoo's web index. Hadoop sets the world record as the fastest system to sort one terabyte of data (209 seconds using a 910-node cluster). Hadoop becomes a top-level open source project of the Apache Software Foundation. Cloudera, the first commercial Hadoop distributor, is founded and led by a former Google employee.
  • 2009: Hadoop sorts one terabyte of data in 62 seconds and one petabyte of data in 16.25 hours using a 3800-node cluster. Second Hadoop commercial distributor, MapR, is formed.
  • 2011: Yahoo spins off its own Hadoop commercial distributor, Hortonworks.
  • 2012: Apache Hadoop 1.0 is released.

Corresponding Component Names

  • Google File System (GFS): Hadoop Distributed File System (HDFS)
  • Google MapReduce: Hadoop MapReduce
  • Google BigTable: Apache HBase

Corresponding Component Names

  • GFS Master: NameNode
  • Chunkserver: DataNode
  • Chunks: Blocks

Apache Hadoop Project

  • Hadoop Distributed File System
  • YARN (Yet Another Resource Negotiator)
  • Hadoop MapReduce

Hadoop Distributed File System

Design Assumptions and Goals

  • Hardware failure is the norm rather than the exception
  • Streaming data access
    • Not for general purpose applications
    • For batch processing rather than interactive use
    • For high throughput of data access rather than low latency of data access
  • Large data sets (terabytes in size)
  • Simple coherency model (write once read many)
  • Moving computation is cheaper than moving data
  • Portability across heterogeneous hardware and software platforms

HDFS Architecture

Master/Slave Architecture

  • Master: NameNode
  • Workers: DataNodes

NameNode

  • manages the file system namespace
  • regulates access to files by clients
  • executes file system namespace operations
  • determines the mapping of blocks to DataNodes
  • keeps track of DataNode status

DataNode:

  • one per node in the cluster
  • manages storage attached to the node
  • serves read and write requests from clients
  • sends regular heartbeat messages back to the NameNode for verification (see the status sketch after this list)
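
The NameNode's heartbeat-derived view of the DataNodes can be inspected from a client. The following is a minimal sketch, assuming hadoop-hdfs is on the classpath and the default filesystem in core-site.xml points at the cluster; it uses the HDFS-specific DistributedFileSystem#getDataNodeStats() call rather than the generic FileSystem interface.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    public class ListDataNodes {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();      // reads core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(conf);
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem dfs = (DistributedFileSystem) fs;
          // Each entry reflects what the NameNode currently knows from DataNode heartbeats
          for (DatanodeInfo dn : dfs.getDataNodeStats()) {
            System.out.printf("%s used=%d of %d bytes%n",
                dn.getHostName(), dn.getDfsUsed(), dn.getCapacity());
          }
        }
        fs.close();
      }
    }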

Files and Directories:

  • The HDFS file system namespace is exposed to users (think of a VFS layer, but not quite the same)
  • Operations include opening, closing, and renaming files and directories (illustrated in the sketch after this list).
  • HDFS allows user data to be stored in files
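
A minimal sketch of these namespace operations through the Java FileSystem API; the paths (/test, /test/old.txt, /test/new.txt) are hypothetical, and the client is assumed to pick up the cluster address from core-site.xml.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class NamespaceOps {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path dir = new Path("/test");                 // hypothetical paths
        Path oldName = new Path("/test/old.txt");
        Path newName = new Path("/test/new.txt");

        fs.mkdirs(dir);                               // create a directory
        fs.create(oldName).close();                   // open and close an empty file
        fs.rename(oldName, newName);                  // rename within the namespace
        for (FileStatus s : fs.listStatus(dir)) {     // list the directory
          System.out.println(s.getPath() + " " + s.getLen());
        }
        fs.close();
      }
    }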

Files and Directories:

  • Internally, a file is split into one or more blocks
  • All blocks of a file (except possibly the last) have the same fixed size and are stored on a set of DataNodes
  • The NameNode determines the mapping of blocks to DataNodes (see the sketch after this list).
  • DataNodes perform block creation, deletion, and replication upon instruction from the NameNode.
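
A minimal sketch of how a client can see the block-to-DataNode mapping maintained by the NameNode, using FileSystem#getFileBlockLocations; the path reuses the file from the demo later in these notes and is otherwise an assumption.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ShowBlocks {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path file = new Path("/test/t8.shakespeare.txt");   // file used in the demo below
        FileStatus st = fs.getFileStatus(file);
        // One BlockLocation per block; the hosts are the DataNodes holding its replicas
        for (BlockLocation b : fs.getFileBlockLocations(st, 0, st.getLen())) {
          System.out.printf("offset=%d length=%d hosts=%s%n",
              b.getOffset(), b.getLength(), String.join(",", b.getHosts()));
        }
        fs.close();
      }
    }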

Data Replication:

  • "Failure is the norm and not the exception"
  • Blocks are replicated for fault tolerance.

Data Replication:

  • The block size and replication factor are configurable per file (see the sketch after this list).
  • NameNode makes all decisions regarding replication of blocks.
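
A minimal sketch of setting the replication factor and block size per file from the Java API; the path, the 64 MB block size, and the replication factors are illustrative assumptions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PerFileSettings {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path p = new Path("/test/custom.bin");          // hypothetical path

        // Create a file with an explicit replication factor (2) and block size (64 MB)
        short replication = 2;
        long blockSize = 64L * 1024 * 1024;
        FSDataOutputStream out = fs.create(p, true, 4096, replication, blockSize);
        out.writeBytes("hello hdfs\n");
        out.close();

        // Replication can also be changed later; the NameNode schedules the extra copies
        fs.setReplication(p, (short) 3);
        fs.close();
      }
    }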

Replica Placement

  • Placement of replicas is critical to HDFS reliability and performance.
  • The purpose of a rack-aware replica placement policy is to improve data reliability, availability, and network bandwidth utilization.

Replica Placement: Hardware Settings

  • A cluster of computers is commonly spread across many racks.
  • Racks are connected via network switches.
  • In most cases, network bandwidth between machines in the same rack is greater than network bandwidth between machines in different racks.

Placement Policy: Simple and non-optimal

  • place replicas on unique racks.
  • Advantage:
    • prevents losing data when an entire rack fails
    • allows use of bandwidth from multiple racks when reading data
    • evenly distributes replicas in the cluster
  • Disadvantage:
    • increases the cost of writes because a write needs to transfer blocks to multiple racks.

Placement Policy: HDFS default policy

  • Default replication factor: 3
    • First replica on one node in the local rack,
    • Second replica on a different node in the local rack,
    • Third replica on a different node in a different rack.
  • Reduces the inter-rack write traffic
  • Does not compromise data reliability and availability guarantees
  • Does not evenly distribute replicas across the racks:
    • One third of replicas are on one node,
    • two thirds of replicas are on one rack, and
    • the other third are evenly distributed across the remaining racks.

Placement Policy: HDFS default policy

  • Replication factor greater than 3:

    • Random placement of the fourth and subsequent replicas
    • Rack limit: $\frac{replicas - 1}{racks} + 2$ (e.g., with 10 replicas and 4 racks, at most $\lfloor 9/4 \rfloor + 2 = 4$ replicas per rack)
  • DataNodes are not allowed to hold multiple replicas of the same block

  • The maximum number of replicas of a block is therefore the total number of DataNodes

Demo

Run the following (lngo/goram)

$ ssh -p 22 lngo@ms0225.utah.cloudlab.us
$ sudo su
$ /opt/hadoop-3.1.1/bin/hdfs dfs -ls /
$ /opt/hadoop-3.1.1/bin/hdfs fsck /test/t8.shakespeare.txt -blocks -files -racks
$ /opt/hadoop-3.1.1/bin/hdfs fsck /test2/t8.shakespeare.txt -blocks -files -racks
  • To view available commands

    $ /opt/hadoop-3.1.1/bin/hdfs
  • On each data node, the actual data blocks are located at

    /tmp/hadoop-root/dfs/data/current

HDFS Writes

  • Staging
  • Pipelining

Staging

  • The HDFS client caches the file data into a local buffer (see the write sketch after this list)
  • Application writes are transparently redirected to this local buffer
  • When the local buffer accumulates data worth over one HDFS block size, the client contacts the NameNode
  • NameNode
    • inserts the file name into the file system hierarchy and allocates a data block for it.
    • responds to the client request with the identity of the DataNode and the destination data block.
  • The client flushes the block of data from the local buffer to the specified DataNode.
  • When a file is closed:
    • the remaining un-flushed data in the local buffer is transferred to the DataNode.
    • The client then tells the NameNode that the file is closed.
    • NameNode commits the file creation operation into a persistent store.
    • If the NameNode dies before the file is closed, the file is lost.
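
A minimal client-side write sketch; the path and contents are hypothetical. From the application's point of view the buffering and NameNode interaction above are transparent: the data becomes durable only once the output stream is closed.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WriteFile {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path p = new Path("/test/write-demo.txt");       // hypothetical path

        // Writes pass through the client-side buffering described above
        FSDataOutputStream out = fs.create(p);
        for (int i = 0; i < 1000; i++) {
          out.writeBytes("line " + i + "\n");
        }
        out.close();   // flushes remaining data and tells the NameNode the file is complete
        fs.close();
      }
    }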

Pipelining

  • Assume a replication factor of three.
  • Pipelining the writing and replicating process of individual chunks of each block
  • List of DataNodes comes from NameNode
  • The client then flushes the data chunk to the first DataNode.
  • Each DataNode in the pipeline:
    • receives the data in small portions,
    • writes each portion to its local repository, and
    • forwards that portion to the next DataNode in the list.

HDFS Reads

  • Parallel reads on blocks
  • Applications take advantage of parallel I/O via the MapReduce programming approach (a positioned-read sketch follows this list)
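
A minimal sketch of block-oriented reads using positioned reads on FSDataInputStream; the path reuses the demo file. In practice the parallelism comes from running such reads in separate tasks (e.g., map tasks), not from this single-threaded loop.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadBlocks {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path p = new Path("/test/t8.shakespeare.txt");   // file used in the demo above
        FileStatus st = fs.getFileStatus(p);
        long blockSize = st.getBlockSize();

        byte[] buf = new byte[1024];
        try (FSDataInputStream in = fs.open(p)) {
          // Positioned reads: each call targets a different block offset, so
          // independent tasks could read different blocks of the same file in parallel
          for (long off = 0; off < st.getLen(); off += blockSize) {
            int n = in.read(off, buf, 0, buf.length);
            System.out.printf("block at offset %d: read %d bytes%n", off, n);
          }
        }
        fs.close();
      }
    }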

Hadoop 1.0: JobTracker and TaskTracker

First resource manager/job controller on HDFS

  • Specifically tied to the MapReduce programming model
  • Data-aware (via NameNode interactions)
  • Task-aware (via TaskTracker interactions)
  • Rely on heartbeat messages from TaskTrackers, which also carry slot availability information:
    • System health
    • Execution availability
    • Job progress
  • Manage job execution process
  • Rerun tasks lost due to node failures
  • Speculative execution
  • Single point of failure

Execution Progress

  • MapReduce job is submitted to the JobTracker
  • JobTracker determines data location via NameNode
  • JobTracker locates TaskTracker nodes
    • With available slots
    • Nearest to the data location
  • JobTracker sends job (tasks of job: Map, Reduce, Shuffle) to the selected TaskTrackers
  • TaskTrackers spawn separate JVMs to run tasks (Map, Reduce, Shuffle) for the job
  • TaskTrackers maintain detailed log for progress, output, and exit codes
  • Heartbeat messages are sent frequently from TaskTrackers to JobTracker
    • Alive
    • Slot availability
  • If no heartbeat is received within a preconfigured duration, the tasks assigned to that TaskTracker are resubmitted to another TaskTracker
  • JobTracker updates status report when job is completed

Hadoop 2.0: YARN (Yet Another Resource Negotiator)

Conceptual Design/Differences

  • Pure scheduler: limited to arbitrating available resources in the system
  • Pluggable scheduler: multiple scheduling algorithms
  • Job management is handled by the ApplicationMaster
  • Resource Model:
    • Resource-name (hostname, rackname)
    • Memory (MB)
    • CPU (cores)
  • ResourceRequest:
    • resource-name, priority, resource-requirement, number-of-containers (see the sketch below)
    • Container: the resource allocation
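
To make the resource model concrete, here is a minimal sketch of how an ApplicationMaster could phrase such a request with the YARN client library's AMRMClient.ContainerRequest; the memory/core numbers and the rack name /rack1 are illustrative assumptions, and nothing is actually sent to a ResourceManager here.

    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;

    public class RequestSketch {
      public static void main(String[] args) {
        // Resource requirement: memory (MB) and CPU (cores)
        Resource capability = Resource.newInstance(2048, 2);
        Priority priority = Priority.newInstance(1);

        // Resource-name part of the request: specific nodes and/or racks (null = anywhere)
        String[] nodes = null;
        String[] racks = new String[] { "/rack1" };   // illustrative rack name

        // One ContainerRequest per container needed; the allocation that comes back is a Container
        ContainerRequest request = new ContainerRequest(capability, nodes, racks, priority);
        System.out.println("Would ask the ResourceManager for: " + request);
      }
    }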

Step 1: A client program submits the application

Step 2: Resource Manager negotiates a container to start the Application Master and then launches the Application Master

Step 3: The Application Master, on boot-up, registers with the Resource Manager. This allows the client to query the Resource Manager for the details it needs to interact directly with its Application Master.

Step 4: Application Master negotiates resource containers via the resource-request protocol

Step 5: After successful allocations, the Application Master launches the container by providing the container launch specification to the Node Manager. This includes command line to launch, environment variables, local resources (jars, shared-objects, ...), and security-related tokens.

Step 6: The application code executing within the container then provides logging info back to its ApplicationMaster via an application-specific protocol.

Step 7: During the application execution, the client that submitted the program communicates directly with the Application Master to get status, progress, updates via an application-specific protocol.

Step 8: Upon completion, the Application Master deregisters with the ResourceManager and shuts down, allowing its own container to be repurposed.
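
The client side of Step 1 can be sketched with the YarnClient API as follows; the application name, queue, and the placeholder /bin/sleep command for the ApplicationMaster container are assumptions standing in for a real launch specification.

    import java.util.Collections;
    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.client.api.YarnClientApplication;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.hadoop.yarn.util.Records;

    public class SubmitApp {
      public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        // Step 1: ask the ResourceManager for a new application id
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext ctx = app.getApplicationSubmissionContext();
        ctx.setApplicationName("hdfs-notes-demo");              // illustrative name

        // Launch specification for the ApplicationMaster container (started in Step 2)
        ContainerLaunchContext amSpec = Records.newRecord(ContainerLaunchContext.class);
        amSpec.setCommands(Collections.singletonList("/bin/sleep 60"));  // placeholder AM command
        ctx.setAMContainerSpec(amSpec);
        ctx.setResource(Resource.newInstance(512, 1));          // memory (MB), vcores
        ctx.setQueue("default");

        System.out.println("Submitting " + ctx.getApplicationId());
        yarnClient.submitApplication(ctx);
        yarnClient.stop();
      }
    }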

Comparing File System Models