Analyzing term / entity distributions in a dataset


In [1]:
import org.archive.archivespark._
import org.archive.archivespark.functions._
import org.archive.archivespark.specific.warc._

Loading the dataset

In this example, the web archive dataset will be loaded from local WARC / CDX files (created in this recipe). However, any other Data Specification (DataSpec) could be used here as well to load records of different types from other local or remote sources, as sketched below.
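For instance, the same records could be loaded remotely through the Internet Archive's Wayback Machine instead of from local files. The following is only a sketch: it assumes that the WaybackSpec DataSpec is available in your ArchiveSpark version, and the URL as well as the parameters are purely illustrative and may differ:

// Sketch only: load snapshots of a domain remotely via the Wayback Machine
// (assumes WaybackSpec is available; URL and parameters are illustrative)
val remote = ArchiveSpark.load(WaybackSpec("helgeholzmann.de", matchPrefix = true, pages = 5))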


In [2]:
val warcPath = "/data/helgeholzmann-de" // local directory containing the WARC / CDX files
val cdxPath = warcPath + "/*.cdx.gz"

In [3]:
val records = ArchiveSpark.load(WarcSpec.fromFiles(cdxPath, warcPath))

Filtering records

Terms and entities can only be extracted from textual content, so we can filter out videos, images, stylesheets and any other files except for webpages (mime type text/html), as well as webpages that were unavailable when they were crawled (status code == 200).

It is important to note that this filtering is based only on metadata, so up to this point ArchiveSpark does not even touch the actual web archive records, which is the core efficiency feature of ArchiveSpark.


In [4]:
val pages = records.filter(r => r.mime == "text/html" && r.status == 200)

By looking at the first record in our remaining dataset, we can see that it is indeed of type text/html and was online (status 200) at the time of crawl:


In [5]:
pages.peekJson


Out[5]:
{
    "record" : {
        "redirectUrl" : "-",
        "timestamp" : "20190528152652",
        "digest" : "sha1:HCHVDRUSN7WDGNZFJES2Y4KZADQ6KINN",
        "originalUrl" : "https://www.helgeholzmann.de/",
        "surtUrl" : "de,helgeholzmann)/",
        "mime" : "text/html",
        "compressedSize" : 2087,
        "meta" : "-",
        "status" : 200
    }
}

URL deduplication

As we want to use the number of URLs mentioning a term as the term's frequency, we first need to make sure that every URL is included only once in the dataset. Therefore, we simply keep the earliest snapshot of each URL. The result should be cached to avoid recomputation every time a record is accessed:


In [6]:
val earliest = pages.distinctValue(_.surtUrl) {(a, b) => if (a.time.isBefore(b.time)) a else b}.cache

In [7]:
earliest.count


Out[7]:
3

Counting terms

To extract the terms of a webpage, we have to keep in mind that a webpage consists of HTML code. Hence, using the StringContent enrichment function would enrich our dataset with this HTML code. To parse the HTML and only keep the text, we provide the HtmlText enrichment function. This can be used to extract the text of a single tag, e.g., HtmlText.of(Html.first("title")) to get the title text of a page. By default though, HtmlText extracts the entire text of the page.
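For instance, to enrich each record with only its title text instead of the full page text, the same function can be derived from the first title tag (a small sketch, not part of the original notebook; Title is just an illustrative name):

// Illustrative sketch: an enrichment function for the <title> text only
val Title = HtmlText.of(Html.first("title"))
earliest.enrich(Title).peekJson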

For more details on the Enrichment Functions provided with ArchiveSpark and their usage, please read the docs.


In [8]:
earliest.enrich(HtmlText).peekJson


Out[8]:
{
    "record" : {
        "redirectUrl" : "-",
        "timestamp" : "20190528153841",
        "digest" : "sha1:M3YAC4HWZEWOUBTUWVZG6TLTOCBAFX7G",
        "originalUrl" : "https://www.helgeholzmann.de/contact",
        "surtUrl" : "de,helgeholzmann)/contact",
        "mime" : "text/html",
        "compressedSize" : 2001,
        "meta" : "-",
        "status" : 200
    },
    "payload" : {
        "string" : {
            "html" : {
                "html" : {
                    "text" : "Helge Holzmann - @helgeho Home Research Publications Private Projects Contact Helge Holzmann I am a researcher and PhD candidate at the L3S Research Center in Hannover, Germany. My main research interest is on Web archives and related topics, such as big data processing, graph analysis...

Turn text into terms

As a very simple normalization, we convert the text to lowercase before splitting it into single distinct terms:


In [9]:
val Terms = LowerCase.of(HtmlText).mapMulti("terms")(text => text.split("\\W+").distinct)
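To illustrate what this enrichment does, here is the same transformation applied to a plain Scala string (just a sketch for clarity, not part of the original notebook):

// Lowercase the text, split on non-word characters, keep every term only once
"Helge Holzmann - @helgeho Home".toLowerCase.split("\\W+").distinct
// => Array(helge, holzmann, helgeho, home)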

In [10]:
earliest.enrich(Terms).peekJson


Out[10]:
{
    "record" : {
        "redirectUrl" : "-",
        "timestamp" : "20190528153841",
        "digest" : "sha1:M3YAC4HWZEWOUBTUWVZG6TLTOCBAFX7G",
        "originalUrl" : "https://www.helgeholzmann.de/contact",
        "surtUrl" : "de,helgeholzmann)/contact",
        "mime" : "text/html",
        "compressedSize" : 2001,
        "meta" : "-",
        "status" : 200
    },
    "payload" : {
        "string" : {
            "html" : {
                "html" : {
                    "text" : {
                        "lowercase" : {
                            "terms" : [
                                "helge",
                                "holzmann",
                                "helgeho",
                                "home",
                                "rese...

Compute term frequencies (number of records / URLs)

We can now use .flatMapValues to get a plain list of the terms included in the dataset. To get rid of short stopwords like articles, we only keep those terms with a minimum length of 4 characters.

For more details on available ArchiveSpark operations, please read the docs.


In [11]:
val terms = earliest.flatMapValues(Terms).filter(_.length >= 4)

In [12]:
terms.take(10).foreach(println)


helge
holzmann
helgeho
home
research
publications
private
projects
contact
researcher

Since we made sure above that every URL is included only once in the dataset and each term is included only once per record, we can simply count the terms using Spark's .countByValue. Finally, we sort the terms by count in descending order (negative count) and save them as CSV (comma-separated values):


In [13]:
val counts = terms.countByValue.toSeq.sortBy{case (term, count) => -count}

In [14]:
counts.take(10).foreach(println)


(researcher,3)
(reserved,3)
(github,3)
(topics,3)
(holzmann,3)
(germany,3)
(private,3)
(arxiv,3)
(research,3)
(email,3)

To save the CSV file, we can use IOUtil.writeLines, which is included in the Sparkling library.


In [15]:
import org.archive.archivespark.sparkling.io._

In [16]:
IOUtil.writeLines("term_counts.csv", counts.map{case (term, count) => term + "," + count})


Out[16]:
246

The file term_counts.csv, created in the same folder as this notebook, contains all terms with their counts. This CSV file can now be loaded into a plotting tool of your choice, for example to plot a histogram of the term distribution.
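As a quick sanity check before switching to an external tool, the file can also be read back locally; since it was written in descending order of count, the first lines are the most frequent terms (a small sketch, not part of the original notebook):

// Sketch: print the ten most frequent terms from the written CSV
scala.io.Source.fromFile("term_counts.csv").getLines.take(10).foreach(println)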

Counting named entities

Similar to the term frequencies shown above, we can also count the occurrences of named entities in the dataset.

An Enrichment Function to extract named entities, named Entities, is provided with ArchiveSpark. It uses Stanford CoreNLP's named entity recognizer. In order to use it, you need to add edu.stanford.nlp:stanford-corenlp:3.5.1 together with the corresponding models to your classpath, for example as sketched below.
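One way to do this, assuming the classpath of your notebook kernel or Spark job is managed with sbt, is the following sketch; if you submit via spark-submit, the equivalent would be its --packages / --jars options:

// build.sbt sketch (assumption: sbt-managed dependencies; adjust to your setup)
libraryDependencies ++= Seq(
  "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1",
  "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"
)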

Another Enrichment Function that uses Yahoo's Fast Entity Linker (FEL) for more accurate Entity Linking with ArchiveSpark can be found here: FEL4ArchiveSpark

For more details on the Enrichment Functions and their use, please read the docs.


In [17]:
earliest.enrich(Entities).peekJson


Out[17]:
{
    "record" : {
        "redirectUrl" : "-",
        "timestamp" : "20190528153841",
        "digest" : "sha1:M3YAC4HWZEWOUBTUWVZG6TLTOCBAFX7G",
        "originalUrl" : "https://www.helgeholzmann.de/contact",
        "surtUrl" : "de,helgeholzmann)/contact",
        "mime" : "text/html",
        "compressedSize" : 2001,
        "meta" : "-",
        "status" : 200
    },
    "payload" : {
        "string" : {
            "html" : {
                "html" : {
                    "entities" : {
                        "persons" : [
                            "Helge",
                            "Holzmann"
                        ],
                        "organizations" : [
                        ],
                        "locations" : [
                            "...

In [18]:
val locations = earliest.flatMapValues(Entities.child[Seq[String]]("locations"))

In [19]:
locations.take(10).foreach(println)


Hannover
Germany
MA
Pisa
USA
London
Greece
Quebec
Valetta
Boston
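The same child accessor works for the other entity types visible in the enriched record above, e.g. persons or organizations (a small sketch, not part of the original notebook):

// Sketch: extract person entities instead of locations
val persons = earliest.flatMapValues(Entities.child[Seq[String]]("persons"))
persons.take(10).foreach(println)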

Please note:

Named entity extraction is a rather expensive operation; depending on the size of the dataset, the following instructions may run for hours or even days.


In [20]:
val counts = locations.countByValue.toSeq.sortBy{case (term, count) => -count}

In [21]:
counts.take(10).foreach(println)


(Hannover,3)
(Germany,3)
(MA,1)
(Pisa,1)
(USA,1)
(London,1)
(Greece,1)
(Quebec,1)
(Valetta,1)
(Boston,1)

In [22]:
IOUtil.writeLines("location_counts.csv", counts.map{case (term, count) => term + "," + count})


Out[22]:
28

Caveats

The use of .countByValue automatically fetches / collects the counts for all values to the local driver, which may lead to memory issues if the dataset is too big. Instead, the same operation can be implemented with a distributed .reduceByKey, combined with a filter that keeps only values with high counts, in order to avoid memory overruns when fetching them. This way, the sorting can also be performed in a distributed fashion:


In [23]:
val termCounts = terms.map(term => (term, 1L)).reduceByKey(_ + _).filter{case (term, count) => count > 2}

In [24]:
val fetchedTermCounts = termCounts.sortBy{case (term, count) => -count}.collect

In [25]:
fetchedTermCounts.take(10).foreach(println)


(helgeho,3)
(main,3)
(interest,3)
(arxiv,3)
(center,3)
(data,3)
(hannover,3)
(home,3)
(archives,3)
(email,3)