To explore options for porting the PERCEIVE Hadoop ecosystem and applying big data technology to the PERCEIVE project, this notebook builds an Apache NiFi + Kite SDK + Apache Spark pipeline and shows the potential value and usage of this data pipeline.
This article consists of four parts. In the first part, we briefly review the characteristics of PERCEIVE data and introduce HDFS, the Kite SDK, and Apache NiFi. Next, we explain how to set up the environment and relevant components and how to build the dependencies between them. The third part shows a couple of NiFi dataflow examples that demonstrate the fundamental capabilities of NiFi. The fourth part addresses the advantages of NiFi and shows why the pipeline can potentially contribute to the PERCEIVE project in the future.
To proactively identify upcoming cybersecurity threats, PERCEIVE intends to download and analyze data from mailing lists, blogs, Twitter, knowledge sources, etc. From the perspective of big data defined by the 3 Vs (Velocity, Volume, and Variety), the sources we are collecting right now vary along these three dimensions:
Although our data are not on the same scale as the data held by social media companies in terms of the 3 V dimensions, a traditional relational database cannot satisfy our mission requirements, such as varied data structures, data provenance, and semi-streaming data analysis. Therefore, the main purpose of this notebook is to bring the Hadoop ecosystem to PERCEIVE and to investigate which big data tools within the Hadoop ecosystem can meet the present and future requirements of PERCEIVE.
Before moving to the next part, here is a brief introduction to HDFS, the Kite SDK, and Apache NiFi, and how they can contribute to PERCEIVE.
HDFS is a core component of Apache Hadoop and provides a distributed file system designed to run on large clusters of commodity machines in a reliable, fault-tolerant manner. HDFS uses a master/slave architecture in which the master is a single NameNode that manages the file system metadata and the slaves are one or more DataNodes that store the actual data.
As shown in the following figure, a file in the HDFS namespace is split into blocks, and those blocks are stored in a set of DataNodes. The NameNode determines the mapping of blocks to DataNodes. The DataNodes handle read and write operations with the file system, and also take care of block creation, deletion, and replication based on instructions from the NameNode.
From a high level, the following characteristics of HDFS are very important for PERCEIVE data:
HDFS provides a shell like any other file system, and a list of commands is available to interact with it. A few commands are shown below. More details about HDFS can be found in [1].
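For example, the following basic HDFS shell commands cover most of the day-to-day interaction with the file system; the paths here are placeholders for illustration only.

```
# list the contents of an HDFS directory
hdfs dfs -ls /

# create a directory and copy a local file into it
hdfs dfs -mkdir -p /sample
hdfs dfs -put localfile.csv /sample

# print a file's contents and remove a directory recursively
hdfs dfs -cat /sample/localfile.csv
hdfs dfs -rm -r /sample
```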
The Kite SDK is a high-level data API that makes it easy to put data into Hadoop and to work with data once it is loaded. With Kite, it is much more convenient to maintain datasets and their metadata through an Avro schema. Currently, Kite works with several file formats, including CSV, JSON, Avro, and Parquet. With the Kite SDK, you can define where your data is stored (Hive, HDFS, the local file system, HBase, or Amazon S3) and how it is compressed (Snappy by default, Deflate, Bzip2, or Lzo). To fully leverage the Kite SDK, the best way is to use the Kite dataset command line interface (CLI), which provides utility commands to perform essential tasks such as creating a schema and a dataset, importing data from a CSV file, and viewing the results. Basically, the Kite SDK stores data based on a schema that is either defined by the user or automatically inferred by Kite.
To get a first impression of how Kite works, here is an example showing the difference between loading data into HDFS directly and loading it through the Kite CLI:
Directly use the hdfs command to load CSV data into HDFS:
# directly use hdfs command to load csv data to hdfs
hdfs dfs -put file:///home/yueliu/Documents/Full%20Disclosure/2002/Email%20Details/Full_Disclosure_Mailing_List_Aug2002.csv /test
# check the result
hdfs dfs -ls /test
Use Kite sdk to load csv file to hdfs:
# use kite to infer a schema from Aug2002.csv. Kite has requirements for the CSV header: header fields cannot be empty or contain "."
./kite-dataset csv-schema Aug2002.csv --class emailDetail -o emailDetail.avsc
# use kite to create a dataset in hdfs based on the schema we just created
./kite-dataset create dataset:hdfs://localhost:9000/test/kite/Aug2002 --schema emailDetail.avsc
# use kite to import Aug2002.csv to the dataset we created during the last step
./kite-dataset csv-import file:///home/yueliu/Documents/Full%20Disclosure/2002/Email%20Details/Aug2002.csv dataset:hdfs://localhost:9000/test/kite/Aug2002
# check the result
hdfs dfs -ls /test/kite
Compared to data ingestion without Kite, the Kite SDK workflow seems more complex, since it has to infer a schema first. However, as shown below, the schema plays a significant role in recording metadata about the field names and data types of a dataset. After creating a schema and deciding how the dataset is stored, the Kite SDK imports data into different places based on the type of dataset URI, such as HDFS, Hive, or AWS S3. Reading the example in [2] should give a better sense of the efficiency of using Kite, especially how Kite simplifies ingesting, converting, and publishing data to Hive in Parquet format.
Example of schema:
{
"type":"record",
"name":"Movie",
"namespace":"org.kitesdk.examples.data",
"fields":[
{"name":"id","type":"int"},
{"name":"title","type":"string"},
{"name":"release_date","type":"string"},
{"name":"imdb_url","type":"string"}
]
}
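For reference, the storage location of a Kite dataset is chosen entirely through its dataset URI. The HDFS and Hive URIs below mirror the ones used in this notebook; the local file system form is an assumed illustration, so check the Kite documentation for the exact syntax of other stores.

```
# general pattern: dataset:<scheme>:<path or name>
dataset:hdfs://localhost:9000/test/kite/Aug2002     # dataset stored directly in HDFS (used above)
dataset:hive://user/hive/warehouse/mailingList      # dataset managed as a Hive table (used later)
dataset:file:/home/yueliu/kite/Aug2002              # assumed example: dataset on the local file system
```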
As the amount of data increases, it becomes much more difficult to evaluate the quality of, and trust in, data within a complex ETL and analysis framework. In other words, when data grows to terabytes or more, we may lose the overall map: it becomes difficult to distinguish between raw data and processed data, and hard to track how a dataset is processed during ETL and analysis. As systems are added and data flows everywhere, a sea of scripts scattered across HDFS would be madness.
Therefore, data provenance is the main reason we want to introduce Apache NiFi into the overall pipeline. NiFi is an easy-to-use, powerful, and reliable dataflow tool that enables the automation of data flow between systems. The interface is shown in the following figure.
Here are the key features of NiFi that are important for PERCEIVE [3]:
- Visual Command and Control
- Designed for Extension
We will discuss how to use NiFi and its specific advantages later through the examples.
This tutorial was tested using the following environment and components:
(Since installation guides for the above components can easily be found online, we simply put the links here.)
If you follow the above tutorials to install the environment and components, please pay attention to the versions used in those tutorials.
"Apache Spark has become wildly popular for processing large quantities of data. One of the key features that Spark provides is the ability to process data in either a batch processing mode or a streaming mode with very little change to your code".[5]
However, in many contexts, operating on the data as soon as it is available provides great benefits. With the dependency between Apache NiFi and Apache Spark, a Spark application can directly perform streaming analysis and process data coming from NiFi.
The article in [5] introduces how to incorporate the Apache NiFi Receiver into a Spark application written in Java. If your Spark application is built in Java and managed with Maven, it is straightforward to follow that article and add the receiver to the application's POM (Project Object Model).
However, since most analysis jobs in PERCEIVE are written in Python, building this dependency from Python remains an open problem.
Although Kite is designed to be used through the Kite CLI, NiFi provides the following Kite processors that cover part of the Kite SDK's functionality, allowing most jobs to be done outside the Kite CLI:
| Type | Version | Tags | Description |
|---|---|---|---|
| ConvertAvroSchema | 1.3.0 | convert,kite,avro | Converts records from one Avro schema to another |
| ConvertCSVToAvro | 1.3.0 | csv,kite,avro | Converts CSV files to Avro according to an Avro Schema |
| ConvertJSONToAvro | 1.3.0 | json,kite,avro | Converts JSON files to Avro according to an Avro Schema |
| InferAvroSchema | 1.3.0 | schema, infer,csv,json,kite,avro | Examines the contents of the incoming FlowFile to infer an Avro schema |
| StoreInKiteDataset | 1.3.0 | hive,hdfs,hadoop,kite,hbase | Stores Avro records in a Kite dataset |
However, some critical jobs still have to be done manually through the Kite CLI, such as creating an Avro schema by hand, updating the Avro schema of an existing dataset, creating a Hive table in Parquet format, and defining a partition strategy for a dataset, as sketched below.
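As a rough illustration, the following Kite CLI commands sketch these manual steps. This is a minimal sketch based on the Kite CLI documentation [2][12]: the dataset name, schema file names, and partition fields (year, month) are assumptions, and flag names may differ slightly between Kite versions.

```
# infer (or hand-edit) an Avro schema from a sample CSV
./kite-dataset csv-schema Aug2002.csv --class emailDetail -o emailDetail.avsc

# define a partition strategy (assuming the schema has year and month fields)
./kite-dataset partition-config year:copy month:copy --schema emailDetail.avsc -o partition.json

# create a Hive-backed dataset stored in Parquet format with that partition strategy
./kite-dataset create dataset:hive:default/mailingList --schema emailDetail.avsc \
  --format parquet --partition-by partition.json

# later, evolve the dataset to a new (compatible) schema
./kite-dataset update dataset:hive:default/mailingList --schema emailDetail_v2.avsc
```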
Through a few examples, this section demonstrates several fundamental capabilities of NiFi, how NiFi and the Kite SDK work together, and how to configure several core NiFi processors. These examples accomplish the following tasks:
The NiFi flow is shown below. At a very high level, this dataflow picks up all files from the local file system and stores them into different month folders within HDFS. To configure the output path dynamically, we use the NiFi Expression Language to add a custom attribute to each file based on its file name. We discuss the details and the configuration of each processor below.
In my case, this processor reads all files under my input directory. Important configurations for this processor are:
| Configuration | Description | Value in the example |
|---|---|---|
| Input Directory | The input directory from which to pull files | /home/yueliu/Documents/Full Disclosure/2002 |
| Keep source file | Whether to keep the original file after it has been picked up | false |
| File Filter | A regular expression used to filter files | [^\.].* |
In NiFi, each FlowFile has a minimum set of attributes: filename, path, uuid, entryDate, lineageStartDate, and fileSize. A detailed explanation of these common attributes can be found in [7]. The RouteOnAttribute processor routes a FlowFile based on the attributes it contains. In this case, the processor routes a FlowFile to the fileFormat relationship if its file name ends with "csv", and to the unmatched relationship otherwise.
| Configuration | Description | Value in the example |
|---|---|---|
| Routing Strategy | Specifies how to determine which relationship to use when evaluating the Expression Language | Route To Property name |
| fileFormat | A user-defined property whose Expression Language result determines the routing | ${filename:endsWith('csv')} |
Users can add any number of properties to route FlowFiles appropriately.
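For example, to route on more than one file type, several user-defined properties could be added, each holding an Expression Language predicate. The property names below are hypothetical; with the "Route To Property name" strategy, each property becomes a relationship of the same name.

```
# hypothetical routing properties on RouteOnAttribute
isCsv  ->  ${filename:endsWith('csv')}
isTxt  ->  ${filename:endsWith('txt')}
isXml  ->  ${filename:endsWith('xml')}
```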
UpdateAttribute processor is used to add or update any number of user-defined attributes to a FlowFile. It is useful for adding statically configured values, as well as deriving attribute values dynamically by using the Expression Language.
In this example, we use two UpdateAttribute processors to add a custom attribute that represents the month for the txt and csv FlowFiles based on their file names. To better understand the NiFi Expression Language used below, an example txt file name is '2002_Aug_1.txt', while an example csv file name is 'Full_Disclosure_Mailing_List_Aug2002.csv'. More details about the NiFi Expression Language can be found in [8].
Add_Month_Attribute_For_CSV processor:
| Configuration | Description | Value in the example |
|---|---|---|
| month | we add this attribute for each FlowFile based on NiFi Expression Language | ${filename:getDelimitedField(5,'_'):substring(0,3)} |
Configuration of the connection between RouteOnAttribute and Add_Month_Attribute_For_CSV: fileFormat relationship should be checked
Add_Month_Attribute_For_TXT processor:
| Configuration | Description | Value in the example |
|---|---|---|
| month | we add this attribute for each FlowFile based on NiFi Expression Language | ${filename:getDelimitedField(2,'_')} |
Configuration of the connection between RouteOnAttribute and Add_Month_Attribute_For_TXT: unmatched relationship should be checked
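To make the two expressions concrete, here is how they evaluate against the sample file names mentioned above:

```
# CSV: filename = Full_Disclosure_Mailing_List_Aug2002.csv
${filename:getDelimitedField(5,'_')}                  ->  Aug2002.csv
${filename:getDelimitedField(5,'_'):substring(0,3)}   ->  Aug

# TXT: filename = 2002_Aug_1.txt
${filename:getDelimitedField(2,'_')}                  ->  Aug
```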
This processor will write FlowFile to Hadoop Distributed File System (hdfs). Given the custom attribute "month" we have created before for each FlowFile, we can use this attribute as a dynamic path for each FlowFile, thus organizing all FlowFiles within month folders in HDFS.
| Configuration | Description | Value in the example |
|---|---|---|
| Hadoop Configuration Resources | A file or comma separated list of files which contains the Hadoop file system configuration. We usually put the path of core-site.xml file here | /home/yueliu/Documents/hadoop-2.8.1/etc/hadoop/core-site.xml |
| Directory | The parent HDFS directory to which files should be written. The directory will be created if it does not exist. | /sample/${month} |
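After the flow runs, the month attribute becomes part of the HDFS path, so the example files should end up grouped roughly as follows (the exact listing depends on the input files):

```
hdfs dfs -ls /sample
# /sample/Aug
# /sample/Jul
# ...

hdfs dfs -ls /sample/Aug
# /sample/Aug/Full_Disclosure_Mailing_List_Aug2002.csv
# /sample/Aug/2002_Aug_1.txt
```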
The NiFi flow is shown below. At a very high level, this dataflow pulls an XML file from a URL and stores it into HDFS. In this example, we add a custom attribute representing the file format to help you understand how attributes work in NiFi. We discuss the details and the configuration of each processor below.
The GetHTTP processor fetches data from an HTTP URL and writes the data to the content of a FlowFile. In this example, we pull an XML file from the link below.
| Configuration | Description | Value in the example |
|---|---|---|
| URL | The URL to pull from | http://capec.mitre.org/data/xml/capec_v2.11.xml |
In this example, we use the Expression Language to retrieve the format information from the FlowFile name and assign it to a custom attribute named "fileFormat".
| Configuration | Description | Value in the example |
|---|---|---|
| fileFormat | we add this attribute for each FlowFile based on NiFi Expression Language | ${filename:substringAfterLast('.')} |
This processor will write FlowFile to Hadoop Distributed File System (hdfs). Given the custom attribute "fileFormat" we have created before for each FlowFile, we can use this attribute as an output path for each FlowFile in HDFS.
| Configuration | Description | Value in the example |
|---|---|---|
| Hadoop Configuration Resources | A file or comma separated list of files which contains the Hadoop file system configuration. We usually put the path of core-site.xml file here | /home/yueliu/Documents/hadoop-2.8.1/etc/hadoop/core-site.xml |
| Directory | The parent HDFS directory to which files should be written. The directory will be created if it does not exist. | /sample/${fileFormat} |
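Concretely, for the file pulled in this example, the attribute and the output path resolve as follows:

```
# filename = capec_v2.11.xml
${filename:substringAfterLast('.')}   ->  xml
/sample/${fileFormat}                 ->  /sample/xml
```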
The NiFi dataflow is shown below. At a very high level, this dataflow picks up csv files from the local file system, uses the Kite SDK to infer an Avro schema from the incoming csv files, and stores the resulting records in a Kite dataset. We discuss the details and the configuration of each processor below.
Using a regular expression, this processor reads only the csv files under my input directory. Important configurations for this processor are:
| Configuration | Description | Value in the example |
|---|---|---|
| Input Directory | The input directory from which to pull files | /home/yueliu/Documents/Full Disclosure/2002 |
| Keep source file | Whether to keep the original file after generating | false |
| File Filter | which can use regular expression to filter files | .+\.csv |
In this example, this processor infers an Avro schema for each incoming FlowFile and outputs the schema as a new custom attribute. Since each csv file in our example has an incomplete header, we skip the first line and use our own header definition.
"If we did have a header in every file, we can easily set Get CSV Header definition from Data to “true” and let NiFi determine the schema (make sure you skip a line on the next processor if you are doing that, otherwise you will have the headers ingested as well). CSV Header Skip Count is important if you have a custom header and you want to ignore whatever headers you previously have in your CSVs". [10]
| Configuration | Description | Value in the example |
|---|---|---|
| Schema Output Destination | Control if Avro schema is written as a new flowfile attribute "inferred.avro.schema" or written in the flowfile content. | flowfile-attribute |
| Input Content Type | csv, json, or use the mime.type value | csv |
| CSV Header Definition | The header for incoming csv file | id,year,month,k,title,author,dateStamp |
| Get CSV Header Definition From data | Whether to get CSV header directly by reading the first line of incoming csv file | false |
| CSV Header Line Skip Count | The number of lines to skip | 1 |
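As a rough illustration, the schema written to the inferred.avro.schema attribute for these CSV files might look something like the following. The record name and the field types are assumptions; the actual types depend on what Kite infers from the data (numeric fields, for example, may be inferred as long and optional fields as nullable unions).

```
{
  "type": "record",
  "name": "emailDetail",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "year", "type": "long"},
    {"name": "month", "type": "string"},
    {"name": "k", "type": "long"},
    {"name": "title", "type": "string"},
    {"name": "author", "type": "string"},
    {"name": "dateStamp", "type": "string"}
  ]
}
```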
Here we reference the schema generated by the previous processor through the ${inferred.avro.schema} expression. Since each csv file in our example has an incomplete header, we skip the header line.
| Configuration | Description | Value in the example |
|---|---|---|
| Record schema | Outgoing Avro schema for each record created from a CSV row | ${inferred.avro.schema} |
| Use CSV header line | whether to use the first line as a header | false |
| Lines to skip | Number of lines to skip before reading header or data | 1 |
In this example, we use this processor to capture all FlowFiles that leave the InferAvroSchema, ConvertCSVToAvro, and StoreInKiteDataset processors through the failure, unmatched, or incompatible relationships.
| Configuration | Description | Value in the example |
|---|---|---|
| Directory | The directory to which files should be written | /home/yueliu/Documents/failure |
For the connections between the PutFailureFile processor and the three processors above, we check all relationships except "success", so that NiFi routes every FlowFile matching the checked relationships to the PutFailureFile processor.
This processor works much like importing data with the Kite CLI. The StoreInKiteDataset processor stores the incoming FlowFile according to the dataset URI defined by the user. However, before loading data into the Hive table, we have to use the Kite CLI to create the dataset container, as sketched after the table below. More details can be found in [12].
| Configuration | Description | Value in the example |
|---|---|---|
| Hadoop configuration files | A file or comma separated list of files which contains the Hadoop file system configuration. We usually put the path of core-site.xml file here | /home/yueliu/Documents/hadoop-2.8.1/etc/hadoop/core-site.xml |
| Target dataset URI | URI that identifies the Kite dataset, i.e., where and how the data will be stored | dataset:hive://user/hive/warehouse/mailingList |
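Since the target dataset must exist before StoreInKiteDataset can write to it, the container is created once with the Kite CLI, roughly as follows; this is a sketch that reuses the schema file from the earlier Kite example and the Target dataset URI configured above (see [12] for details).

```
# one-time creation of the dataset container targeted by StoreInKiteDataset
./kite-dataset create dataset:hive://user/hive/warehouse/mailingList --schema emailDetail.avsc
```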
By answering the following questions, we will address the advantages of NiFi and show why the pipeline can potentially contribute to the PERCEIVE project in the future:
Is the raw data still there?
It depends on the configuration of the processors. For example, the user can configure the GetFile processor and set the "Keep source file" property to "true", thus keeping the raw data within the input directory.
Is the processed data now sitting in HDFS?
Yes. As the screenshot of HDFS above shows, all processed data are sitting within month subfolders in HDFS.
How to add multiple files?
If all files are stored in one directory, a single Get processor is enough to retrieve them. If files are stored separately or in different systems, multiple Get processors may be needed to pick up the files before processing.
If we do have multiple files, are we running in parallel?
In the GetFile processor, there is a property named "Batch Size" that lets the user define the maximum number of files to pull in each iteration. Multiple Get processors run in parallel if the user starts them at the same time. For example, in the first picture, the GetFile processor and the GetHTTP processor run in parallel if we start them together.
What do I need to do to run NiFi?
NiFi only needs a Java environment to run.
What is going on after I click to execute the NiFi Pipeline?
At a high level, FlowFiles are passed through multiple processors, routed to different flows based on the processors' configuration, analyzed or processed, and then sent to local systems or other big data products.
NiFi's best selling point is data provenance. What can we gain by using NiFi versus just doing everything without it? What are the pros? What are the cons? Is it worth the overhead of a new project?
"NiFi keeps a very granular level of detail about each piece of data that it ingests. As the data is processed through the system and is transformed, routed, split, aggregated, and distributed to other endpoints, this information is all stored within NiFi’s Provenance Repository"". Through "Data Provenance" from the Global Menu, there is a table to list the provenance events that track each FlowFile. Also,as shown below, NiFi can track how one specific FlowFile is processed through data flow.
As introduced at the beginning, NiFi is a dataflow tool that is compatible with multiple big data products and cloud systems. By using NiFi, it is easy to represent the big picture of how data is processed through complex analysis, splitting, aggregation, and distribution to other products, which makes the overall processing much easier to manage and change. In other words, the big advantage of NiFi is that the processing procedures are visualized and can be reused many times. NiFi is therefore better suited to projects with streaming or recurring data; if a set of processing procedures is used only once, building the NiFi processors may cost more time than it saves.
[1] Hadoop Introduction https://www.tutorialspoint.com/hadoop/hadoop_introduction.htm
[2] How-to: Ingest Data Quickly Using the Kite CLI http://blog.cloudera.com/blog/2014/12/how-to-ingest-data-quickly-using-the-kite-cli/
[3] High Level Overview of Key NiFi Features https://docs.hortonworks.com/HDPDocuments/HDF1/HDF-1.2/bk_Overview/content/high-level-overview-of-key-nifi-features.html
[4] HDF/NiFi to convert row-formatted text files to columnar Parquet and ORC: https://community.hortonworks.com/articles/70257/hdfnifi-to-convert-row-formatted-text-files-to-col.html
[5] Stream Processing: NiFi and Spark https://blogs.apache.org/nifi/entry/stream_processing_nifi_and_spark
[6] NiFi data provenance: https://blogs.apache.org/nifi/entry/basic_dataflow_design
[7] NiFi common attributes: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_getting-started-with-apache-nifi/content/common-attributes.html
[8] NiFi Expression Language and function: https://docs.hortonworks.com/HDPDocuments/HDF3/HDF-3.0.0/bk_getting-started-with-apache-nifi/content/ExpressionLanguage.html
[9] Converting CSV To Avro with Apache NiFi: https://community.hortonworks.com/articles/28341/converting-csv-to-avro-with-apache-nifi.html
[10] Stream data into HIVE like a Boss using NiFi HiveStreaming - Olympics 1896-2008: https://community.hortonworks.com/articles/52856/stream-data-into-hive-like-a-king-using-nifi.html
[11] Using NiFi to ingest and transform RSS feeds to HDFS using an external config file: https://community.hortonworks.com/articles/48816/nifi-to-ingest-and-transform-rss-feeds-to-hdfs-usi.html
[12] Using the Kite Command Line Interface to Create a Dataset http://kitesdk.org/docs/1.1.0/Using-the-Kite-CLI-to-Create-a-Dataset.html