Introduction

To explore options for making the PERCEIVE Hadoop ecosystem portable and to apply big data technology to the PERCEIVE project, this notebook builds an Apache NiFi + Kite SDK + Apache Spark pipeline and shows the potential value and usage of this data pipeline.

This article consists of four parts. In the first part, we briefly review the characteristics of PERCEIVE data and introduce HDFS, the Kite SDK, and Apache NiFi. Next, we describe how to set up the environment and relevant components, and how to build the dependencies between them. The third part shows a few NiFi dataflow examples that demonstrate the fundamental capabilities of NiFi. The fourth part addresses the advantages of NiFi and shows why the pipeline can potentially contribute to the PERCEIVE project in the future.

1. Overview

To proactively identify upcoming cybersecurity threats, PERCEIVE intends to download and analyze data from mailing lists, blogs, Twitter, knowledge sources, etc. From the perspective of big data defined by the 3 Vs: Velocity, Volume and Variety, the sources we are collecting right now vary along these three dimensions:

  • Velocity:
    • CVEs have over 90k entries as of today and are released daily.
    • CAPEC and CWE are only made available through releases, just like software. In general, new versions are released every three or four months.
    • Mailing Lists vary a lot, since their volume depends on who is posting and how active each list is.
  • Variety:
    • CVE is a very simple XML file, so it can easily be represented as a CSV table.
    • CAPEC and CWE are very rich in structure in their XML. It is very hard, if not impossible, to represent them as a single CSV.
    • A mailing list is an HTML file, but we now also have parsers that transform it into various other formats.

Although our data are not the same as the data held by social media companies in terms of the 3 V dimensions, a traditional relational database cannot satisfy our mission requirements, such as handling varied data structures, tracking data provenance, and supporting semi-streaming data analysis. Therefore, the main purpose of this notebook is to bring the Hadoop ecosystem to PERCEIVE and to investigate which big data tools within the Hadoop ecosystem can meet the present and future requirements of PERCEIVE.

Before moving to the next part, here is a brief introduction to HDFS, the Kite SDK, and Apache NiFi, and how they can contribute to PERCEIVE.

a. Hadoop Distributed File System (HDFS)

HDFS is a core component of Apache Hadoop and provides a distributed file system designed to run on large clusters of small commodity machines in a reliable, fault-tolerant manner. HDFS uses a master/slave architecture in which the master consists of a single NameNode that manages the file system metadata, while one or more slave DataNodes store the actual data.

As shown in the following graph, a file in an HDFS namespace is split into several blocks, and those blocks are stored in a set of DataNodes. The NameNode determines the mapping of blocks to DataNodes. The DataNodes take care of read and write operations with the file system, as well as block creation, deletion and replication based on instructions given by the NameNode.

From a high level, the following characteristics of HDFS are very important for PERCEIVE data:

  • Various file formats:
    • HDFS is a file system, so it supports essentially any file format. All PERCEIVE data can be stored in HDFS without any format conversion.
  • Resilience
    • Data stored in HDFS is safe: the file system continues to function even if a node fails, because Hadoop duplicates data across nodes.
  • High compatibility with other components of the Hadoop ecosystem
    • HDFS and YARN are the most central components of Apache Hadoop. The other Hadoop ecosystem components are built on top of HDFS to provide diverse functions, such as data warehousing, ETL, and analysis. For PERCEIVE, HDFS is the essential cornerstone for fully leveraging the power of the Hadoop big data ecosystem.

HDFS provides a shell like any other file system, and a list of commands is available to interact with the file system. A few commands are shown below, and more appear in the examples at the end. More details about HDFS can be found here [1].
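For a first taste, a few common HDFS shell commands look like this (the paths are illustrative):

# create a directory, copy a local file into it, and list the result
hdfs dfs -mkdir -p /test
hdfs dfs -put localfile.csv /test
hdfs dfs -ls /test

# inspect how a file is split into blocks and where the replicas live
hdfs fsck /test/localfile.csv -files -blocks -locations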

b. Kite SDK:

The Kite SDK is a high-level data API that makes it easy to put data into Hadoop and to work with the data once it is loaded. With Kite, it is much more convenient to maintain datasets and their metadata through Avro schemas. Currently, Kite works with several file formats, including CSV, JSON, Avro, and Parquet. With the Kite SDK, you can define how your data is stored (Hive, HDFS, the local file system, HBase, or Amazon S3) and how it is compressed (Snappy by default, Deflate, Bzip2, or Lzo). To fully leverage the Kite SDK, the best way is to use the Kite Dataset command line interface (CLI), which provides utility commands for essential tasks such as creating a schema and a dataset, importing data from a CSV file, and viewing the results. Basically, the Kite SDK stores data based on a schema that is either defined by the user or automatically inferred by Kite.

To get a first impression of how Kite works, here is an example showing the difference between loading data into HDFS directly and loading it through the Kite CLI:

Directly using the hdfs command to load the csv file into HDFS:

# directly use the hdfs command to load csv data into hdfs
hdfs dfs -put "/home/yueliu/Documents/Full Disclosure/2002/Email Details/Full_Disclosure_Mailing_List_Aug2002.csv" /test

# check the result
hdfs dfs -ls /test

Using the Kite SDK to load the csv file into HDFS:

# use kite to infer a schema from Aug2002.csv. The Kite SDK has requirements on the csv header: fields cannot be empty or contain "."
./kite-dataset csv-schema Aug2002.csv --class emailDetail -o emailDetail.avsc

# use kite to create a dataset in hdfs based on the schema we just created
./kite-dataset create dataset:hdfs://localhost:9000/test/kite/Aug2002 --schema emailDetail.avsc

# use kite to import Aug2002.csv into the dataset we created in the previous step
./kite-dataset csv-import "/home/yueliu/Documents/Full Disclosure/2002/Email Details/Aug2002.csv" dataset:hdfs://localhost:9000/test/kite/Aug2002

# check the result
hdfs dfs -ls /test/kite

Compared to data ingestion without Kite, the Kite SDK workflow seems more complex, since a schema has to be inferred first. However, as shown below, the schema plays a significant role in recording metadata such as the field names and data types of a dataset. After creating a schema and deciding how to store the dataset, the Kite SDK imports data into different places depending on the type of dataset URI, such as HDFS, Hive, or AWS S3 (a few URI examples are sketched after the schema example below). Reading the example in [2] gives a better sense of the efficiency of using Kite, especially how Kite simplifies ingesting data, converting it, and publishing it to Hive in Parquet format.

Example of schema:

{
  "type":"record",
  "name":"Movie",
  "namespace":"org.kitesdk.examples.data",
  "fields":[
    {"name":"id","type":"int"},
    {"name":"title","type":"string"},
    {"name":"release_date","type":"string"},
    {"name":"imdb_url","type":"string"}
  ]
}
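To illustrate the role of the dataset URI mentioned above, here is a rough sketch (the paths and dataset names are illustrative, not taken from the PERCEIVE setup) of pointing the same Kite CLI command at different storage backends:

# HDFS-backed dataset
./kite-dataset create dataset:hdfs:/test/kite/Aug2002 --schema emailDetail.avsc

# dataset on the local file system
./kite-dataset create dataset:file:/tmp/kite/Aug2002 --schema emailDetail.avsc

# Hive-backed dataset stored as Parquet
./kite-dataset create dataset:hive:default/aug2002 --schema emailDetail.avsc --format parquet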

c. Apache NiFi:

As the amount of data increases, it becomes much more difficult to evaluate the quality of, and trust in, the data within a complex ETL and analysis framework. In other words, when data grows to terabytes or more, we may lose the overall picture: it becomes hard to distinguish raw data from processed data, and hard to track how a dataset was processed during ETL and analysis. As systems are added and data flows everywhere, a sea of scripts scattered all over HDFS would be madness.

Therefore, data provenance is the main reason we want to introduce Apache NiFi into the overall pipeline. NiFi is an easy-to-use, powerful, and reliable dataflow tool that enables the automation of data flow between systems. The interface is shown in the following graph.

Here are the key features of NiFi that are important for PERCEIVE [3]:

  • Data Provenance
    • NiFi automatically records, indexes, and makes available provenance data as objects flow through the system, even across fan-in, fan-out, transformations, and more. This information becomes extremely critical in supporting compliance, troubleshooting, optimization, and other scenarios. We will discuss the details at the end of this article.
  • Visual Command and Control

    • Dataflows can become quite complex. Being able to visualize those flows and express them visually can help greatly to reduce that complexity and to identify areas that need to be simplified. NiFi enables not only the visual establishment of dataflows, but it does so in real time. Rather than being "design and deploy", it is much more like molding clay. If you make a change to the dataflow, that change immediately takes effect. Changes are fine-grained and isolated to the affected components. You don't need to stop an entire flow or set of flows just to make a specific modification.
  • Designed for Extension

    • NiFi is at its core built for extension, and as such it is a platform on which dataflow processes can execute and interact in a predictable and repeatable manner. Specifically, besides the hundreds of easy-to-use processors, NiFi also supports custom processors, including scripted processors that can run user-defined Python (Jython) scripts.

We will discuss how to use NiFi and its specific advantages later through the examples.

2. Setting up the environment

2.1 Installation

This tutorial was tested using the following environment and components:

(Since installation instructions for the above components can easily be found through Google, we simply put the links here.)

If you follow the above tutorials to install the environment and components, please pay attention to the versions used in those tutorials.

2.2 Building the dependency

a. Dependency between NiFi and Spark

"Apache Spark has become wildly popular for processing large quantities of data. One of the key features that Spark provides is the ability to process data in either a batch processing mode or a streaming mode with very little change to your code".[5]

However, in many contexts, operating on the data as soon as it is available can provide great benefits. Therefore, with the dependency between Apache NiFi and Apache Spark in place, a Spark application can directly perform streaming analysis on data coming from NiFi.

The article in [5] introduces how to incorporate the Apache NiFi Receiver into a Spark application written in Java. If your Spark application is built in Java and managed by Maven, it is easy to follow that article and add the receiver to the application's POM (Project Object Model).

However, since most analysis jobs in PERCEIVE are written in Python, building this dependency from Python remains an open problem.

b. Working with the Kite SDK within Apache NiFi

Although Kite is designed to be used through the Kite CLI, NiFi provides the following Kite processors, which cover part of the Kite SDK's functionality and allow most jobs to be done without the Kite CLI:

  • ConvertAvroSchema (version 1.3.0; tags: convert, kite, avro): converts records from one Avro schema to another
  • ConvertCSVToAvro (version 1.3.0; tags: csv, kite, avro): converts CSV files to Avro according to an Avro schema
  • ConvertJSONToAvro (version 1.3.0; tags: json, kite, avro): converts JSON files to Avro according to an Avro schema
  • InferAvroSchema (version 1.3.0; tags: schema, infer, csv, json, kite, avro): examines the contents of the incoming FlowFile to infer an Avro schema
  • StoreInKiteDataset (version 1.3.0; tags: hive, hdfs, hadoop, kite, hbase): stores Avro records in a Kite dataset

However, some critical jobs still have to be done manually within the Kite CLI, such as creating an Avro schema, updating a dataset to a new Avro schema, creating a Hive table in Parquet format, and creating a partition strategy for a dataset. A sketch of these commands is shown below.
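As a rough reference (not taken from the PERCEIVE setup: the schema, dataset and field names are hypothetical, and the partition example assumes the schema contains a long timestamp field named ts), these manual steps map to Kite CLI commands along these lines:

# infer an Avro schema from a sample CSV file
./kite-dataset csv-schema sample.csv --class MailingList -o mailingList.avsc

# create a Hive-backed dataset stored in Parquet format
./kite-dataset create dataset:hive:default/mailingList --schema mailingList.avsc --format parquet

# later, update the dataset to a new (compatible) schema
./kite-dataset update dataset:hive:default/mailingList --schema mailingList_v2.avsc

# define a partition strategy (year/month derived from the ts field) and use it when creating a dataset
./kite-dataset partition-config ts:year ts:month -s mailingList.avsc -o partition.json
./kite-dataset create dataset:hive:default/mailingList_by_month --schema mailingList.avsc --format parquet --partition-by partition.json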

3. Examples of Apache NiFi

Through a few examples, this section demonstrates several fundamental capabilities of NiFi, how to make NiFi and the Kite SDK work together, and how to configure several core NiFi processors. These examples accomplish the following tasks:

  • Load files from local and web URL
  • Filter files based on file format
  • Add custom attributes using NiFi Expression Language
  • Split files based on month
  • Store files into different folders within HDFS based on file attribute
  • Work with Kite sdk processors within NiFi

3.1 Load files from local to HDFS

The NiFi flow is shown below. At a very high level, this dataflow picks up all files from the local system and stores them into different month folders within HDFS. To configure the output path dynamically, we use the NiFi Expression Language to add a custom attribute to each file based on its file name. We discuss the details and the configuration of each processor below.

Step 1. GetFile processor

In my case, this processor reads all files under my input directory. Important configurations for this processor are:

  • Input Directory (the directory from which to pull files): /home/yueliu/Documents/Full Disclosure/2002
  • Keep Source File (whether to keep the original file after it is picked up): false
  • File Filter (a regular expression used to filter files): [^\.].*

Step 2. RouteOnAttribute processor

In NiFi, each FlowFile has a minimum set of attributes: filename, path, uuid, entryDate, lineageStartDate, and fileSize. A detailed explanation of these common attributes can be found here [7]. The RouteOnAttribute processor routes FlowFiles based on the attributes they contain. In this case, the processor routes a FlowFile to the fileFormat relationship if its file name ends with "csv", and to the unmatched relationship otherwise.

The important configurations for this processor are:

  • Routing Strategy (specifies how to determine which relationship to use when evaluating the Expression Language): Route to Property name
  • fileFormat (a user-defined property used to evaluate the file name): ${filename:endsWith('csv')}

Users can add any number of such properties to route FlowFiles appropriately.

Step 3. UpdateAttribute processor

The UpdateAttribute processor is used to add or update any number of user-defined attributes on a FlowFile. It is useful for adding statically configured values, as well as for deriving attribute values dynamically using the Expression Language.

In this example, we use two UpdateAttribute processors to add a custom attribute representing the month for the txt and csv FlowFiles, based on their file names. To make the Expression Language used below easier to follow, an example txt file name is '2002_Aug_1.txt', while an example csv file name is 'Full_Disclosure_Mailing_List_Aug2002.csv'; a short walkthrough follows. More details about the NiFi Expression Language can be found here [8].
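As a quick illustration of our own (not taken from the NiFi documentation), here is how the two expressions below evaluate against those example file names:

# csv: 'Full_Disclosure_Mailing_List_Aug2002.csv'
#   ${filename:getDelimitedField(5,'_')}                 ->  "Aug2002.csv"  (the 5th '_'-delimited field)
#   ${filename:getDelimitedField(5,'_'):substring(0,3)}  ->  "Aug"
#
# txt: '2002_Aug_1.txt'
#   ${filename:getDelimitedField(2,'_')}                 ->  "Aug"          (the 2nd '_'-delimited field)

Both example files therefore end up with month = "Aug", and the PutHDFS processor later writes them into /sample/Aug.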

Add_Month_Attribute_For_CSV processor:

  • month (a custom attribute added to each FlowFile using the NiFi Expression Language): ${filename:getDelimitedField(5,'_'):substring(0,3)}

Configuration of the connection between RouteOnAttribute and Add_Month_Attribute_For_CSV: fileFormat relationship should be checked

Add_Month_Attribute_For_TXT processor:

  • month (a custom attribute added to each FlowFile using the NiFi Expression Language): ${filename:getDelimitedField(2,'_')}

Configuration of the connection between RouteOnAttribute and Add_Month_Attribute_For_TXT: unmatched relationship should be checked

Step 4. PutHDFS processor:

This processor writes FlowFiles to the Hadoop Distributed File System (HDFS). Given the custom attribute "month" we created earlier for each FlowFile, we can use this attribute as part of a dynamic output path, thus organizing all FlowFiles into month folders in HDFS.

  • Hadoop Configuration Resources (a file or comma-separated list of files containing the Hadoop file system configuration; we usually put the path of the core-site.xml file here): /home/yueliu/Documents/hadoop-2.8.1/etc/hadoop/core-site.xml
  • Directory (the parent HDFS directory to which files should be written; it is created if it does not exist): /sample/${month}

Step 5. Check within HDFS

Use the following command in a terminal to check the result of the above NiFi dataflow:

hadoop fs -ls /sample/
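Since the flow writes each file into a month subfolder, a recursive listing makes the layout easier to see (the month folder names depend on the files that were ingested):

hadoop fs -ls -R /sample/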

3.2 Load xml file from URL to HDFS

The NiFi flow is shown below. At a very high level, this dataflow pulls an XML file from a URL and stores it into HDFS. In this example, we add a custom attribute representing the file format, to help you understand how attributes work in NiFi. We discuss the details and the configuration of each processor below.

Step 1. GetHTTP processor

The GetHTTP processor fetches data from an HTTP URL and writes it to the content of a FlowFile. In this example, we pull an XML file from the link below.

  • URL (the URL to pull from): http://capec.mitre.org/data/xml/capec_v2.11.xml
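For comparison, the fetch performed by GetHTTP is roughly equivalent to downloading the same file by hand:

# download the same file manually (saved as capec_v2.11.xml in the current directory)
curl -O http://capec.mitre.org/data/xml/capec_v2.11.xml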

Step 2. UpdateAttribute processor

In this example, we use the Expression Language to extract the format from the FlowFile name and assign it to a custom attribute named "fileFormat".

  • fileFormat (a custom attribute added to each FlowFile using the NiFi Expression Language): ${filename:substringAfterLast('.')}

Step 3. PutHDFS processor:

This processor writes FlowFiles to HDFS. Given the custom attribute "fileFormat" we created in the previous step, we can use this attribute as part of the output path for each FlowFile in HDFS.

  • Hadoop Configuration Resources (a file or comma-separated list of files containing the Hadoop file system configuration; we usually put the path of the core-site.xml file here): /home/yueliu/Documents/hadoop-2.8.1/etc/hadoop/core-site.xml
  • Directory (the parent HDFS directory to which files should be written; it is created if it does not exist): /sample/${fileFormat}

Step 4. Check within HDFS

Use the following command in a terminal to check the result of the above NiFi dataflow:

hadoop fs -ls /sample/

Here is the result of the above two NiFi dataflows:

3.3 Work with Kite SDK processors within NiFi

The NiFi dataflow is shown below. At a very high level, this dataflow picks up csv files from the local system, uses the Kite SDK to infer an Avro schema from the incoming csv files, converts them to Avro, and stores them into a Kite dataset. We discuss the details and the configuration of each processor below.

Step 1. GetFile processor

Through a regular expression, this processor reads only the csv files under the input directory. Important configurations for this processor are:

  • Input Directory (the directory from which to pull files): /home/yueliu/Documents/Full Disclosure/2002
  • Keep Source File (whether to keep the original file after it is picked up): false
  • File Filter (a regular expression used to filter files): .+\.csv

Step 2. InferAvroSchema processor

In this example, this processor infers an Avro schema for each incoming FlowFile and outputs the schema as a new custom attribute. Since the header of each csv file is incomplete in our example, we skip the first line and use our own header definition.

"If we did have a header in every file, we can easily set Get CSV Header definition from Data to “true” and let NiFi determine the schema (make sure you skip a line on the next processor if you are doing that, otherwise you will have the headers ingested as well). CSV Header Skip Count is important if you have a custom header and you want to ignore whatever headers you previously have in your CSVs". [10]

  • Schema Output Destination (controls whether the Avro schema is written to a new FlowFile attribute "inferred.avro.schema" or into the FlowFile content): flowfile-attribute
  • Input Content Type (csv, json, or use the mime.type value): csv
  • CSV Header Definition (the header to use for the incoming csv files): id,year,month,k,title,author,dateStamp
  • Get CSV Header Definition From Data (whether to read the CSV header directly from the first line of the incoming csv file): false
  • CSV Header Line Skip Count (the number of lines to skip): 1

Step 3. ConvertCSVToAvro

Here we use the schema inferred by the previous processor by referencing the ${inferred.avro.schema} attribute. Since the header of each csv file is incomplete in our example, we skip the header line.

  • Record schema (the outgoing Avro schema used for each record created from a CSV row): ${inferred.avro.schema}
  • Use CSV header line (whether to use the first line as a header): false
  • Lines to skip (the number of lines to skip before reading the header or data): 1

Step 4. PutFile processor

In this example, we use this processor to collect all FlowFiles that leave the InferAvroSchema, ConvertCSVToAvro and StoreInKiteDataset processors through their failure, unmatched or incompatible relationships.

  • Directory (the directory to which files should be written): /home/yueliu/Documents/failure

For the connections between the PutFailureFile processor and the three processors above, we check all relationships except "success", so that NiFi routes every FlowFile that matches one of the checked relationships to the PutFailureFile processor.

Step 5. StoreInKiteDataset processor

This processor works much like importing data with the Kite CLI. The StoreInKiteDataset processor stores the incoming FlowFiles according to the dataset URI defined by the user. However, before loading the data into the Hive table, we have to use the Kite CLI to create the dataset container (see the sketch after the configuration below). More details can be found in this link [12].

  • Hadoop configuration files (a file or comma-separated list of files containing the Hadoop file system configuration; we usually put the path of the core-site.xml file here): /home/yueliu/Documents/hadoop-2.8.1/etc/hadoop/core-site.xml
  • Target dataset URI (the URI that identifies the Kite dataset where and how the data will be stored): dataset:hive://user/hive/warehouse/mailingList
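As noted above, the target dataset container has to exist before the flow runs. A minimal sketch of creating it with the Kite CLI (the URI, schema file and format here are illustrative and must match whatever the Target dataset URI in the processor points to):

./kite-dataset create dataset:hive:default/mailingList --schema emailDetail.avsc --format parquet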

4. Advantages of Apache NiFi

By answering the following questions, we address the advantages of NiFi and show why the pipeline can potentially contribute to the PERCEIVE project in the future:

  • Is the raw data still there?

    It depends on the configuration of the processors. For example, the user can configure the GetFile processor with the "Keep Source File" property set to "true", so that the raw data stays in the input directory (in our example it is set to "false", so the source files are removed after being picked up).

  • Is the processed data now sitting in HDFS?

    Yes. As the screenshot of HDFS above shows, all processed data sit within month subfolders in HDFS.

  • How to add multiple files?

    If all files are stored in one directory, a single Get processor is enough to retrieve them. If files are stored separately or in different systems, multiple Get processors may be needed to pick up the files before processing.

  • If we do have multiple files, are we running in parallel?

    In the GetFile processor, there is a property named "Batch Size" that lets the user define the maximum number of files to pull in each iteration. Multiple Get processors run in parallel if the user starts them at the same time. For example, in the first picture, the GetFile processor and the GetHTTP processor run in parallel if we start them together.

  • What do I need to do to run NiFi?

    NiFi only needs a Java environment to run; see the short sketch after this list for what starting it looks like.

  • What is going on after I click to execute the NiFi Pipeline?

    At a high level, FlowFiles are passed through multiple processors, routed to different flows based on the processors' configuration, analyzed and processed, and then sent to local systems or other big data products.

  • NiFi's best selling point is data provenance. What can we gain by using it, versus just doing everything without NiFi? What are the pros? What are the cons? Is it worth the overhead of adopting a new project?

    "NiFi keeps a very granular level of detail about each piece of data that it ingests. As the data is processed through the system and is transformed, routed, split, aggregated, and distributed to other endpoints, this information is all stored within NiFi’s Provenance Repository"". Through "Data Provenance" from the Global Menu, there is a table to list the provenance events that track each FlowFile. Also,as shown below, NiFi can track how one specific FlowFile is processed through data flow.

    As introduced at the beginning, NiFi is a dataflow tool that is compatible with multiple big data products and cloud systems. Using NiFi, it is easy to see the big picture of how data are processed through complex analysis, splitting, aggregation, and distribution to other products, which makes the overall processing much easier to manage and change. In other words, a big advantage of NiFi is that the processing procedures are visualized and can be reused many times. NiFi is therefore better suited to projects with streaming or recurring data; if a set of processing procedures is only ever run once, the time spent building the NiFi flow may not pay off.
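For completeness, here is roughly what running NiFi from the command line looks like, assuming the binary distribution has already been downloaded and unpacked (the paths and default port are those of a stock NiFi 1.x install):

# from the NiFi installation directory
./bin/nifi.sh start     # start NiFi in the background
./bin/nifi.sh status    # check whether it is running
# the web UI is then available at http://localhost:8080/nifi by default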

Reference

[1] Hadoop Introduction https://www.tutorialspoint.com/hadoop/hadoop_introduction.htm

[2] How-to: Ingest Data Quickly Using the Kite CLI http://blog.cloudera.com/blog/2014/12/how-to-ingest-data-quickly-using-the-kite-cli/

[3] High Level Overview of Key NiFi Features https://docs.hortonworks.com/HDPDocuments/HDF1/HDF-1.2/bk_Overview/content/high-level-overview-of-key-nifi-features.html

[4] HDF/NiFi to convert row-formatted text files to columnar Parquet and ORC: https://community.hortonworks.com/articles/70257/hdfnifi-to-convert-row-formatted-text-files-to-col.html

[5] Stream Processing: NiFi and Spark https://blogs.apache.org/nifi/entry/stream_processing_nifi_and_spark

[6] NiFi data provenance: https://blogs.apache.org/nifi/entry/basic_dataflow_design

[7] NiFi common attributes: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_getting-started-with-apache-nifi/content/common-attributes.html

[8] NiFi Expression Language and function: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_getting-started-with-apache-nifi/content/ExpressionLanguage.html

[9] Converting CSV To Avro with Apache NiFi: https://community.hortonworks.com/articles/28341/converting-csv-to-avro-with-apache-nifi.html

[10] Stream data into HIVE like a Boss using NiFi HiveStreaming - Olympics 1896-2008: https://community.hortonworks.com/articles/52856/stream-data-into-hive-like-a-king-using-nifi.html

[11] Using NiFi to ingest and transform RSS feeds to HDFS using an external config file: https://community.hortonworks.com/articles/48816/nifi-to-ingest-and-transform-rss-feeds-to-hdfs-usi.html

[12] Using the Kite Command Line Interface to Create a Dataset http://kitesdk.org/docs/1.1.0/Using-the-Kite-CLI-to-Create-a-Dataset.html