In [1]:
import org.archive.archivespark._
import org.archive.archivespark.functions._
import org.archive.archivespark.specific.warc._
In this example, the web archive dataset will be loaded from local WARC / CDX files (created in this recipe). However, any other Data Specification (DataSpec) could be used here too, in order to load your records of different types and from different local or remote sources.
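For instance, a remote DataSpec can pull records directly from the Internet Archive's Wayback Machine instead of reading local files. The following is only a hedged sketch: the WaybackSpec name and its parameters are taken from the ArchiveSpark DataSpec documentation and may differ between versions; the domain and date range are merely examples.
// Hedged sketch, not used in this recipe: load captures of an example domain from the Wayback Machine.
// Parameter names (matchPrefix, from, to, pages) are assumptions based on the DataSpec docs.
val remoteRecords = ArchiveSpark.load(WaybackSpec("helgeholzmann.de", matchPrefix = true, from = 201905, to = 201906, pages = 50))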
In [2]:
val warcPath = "/data/helgeholzmann-de.warc.gz"
val cdxPath = warcPath + "/*.cdx.gz"
In [3]:
val records = ArchiveSpark.load(WarcSpec.fromFiles(cdxPath, warcPath))
Terms can only be extracted from webpages, so we can filter out videos, images, stylesheets and any other files except webpages (MIME type text/html), as well as webpages that were not available at crawl time (we keep only status code 200).
It is important to note that this filtering is based solely on metadata, so up to this point ArchiveSpark does not even touch the actual web archive records; this is the core efficiency feature of ArchiveSpark.
In [4]:
val pages = records.filter(r => r.mime == "text/html" && r.status == 200)
By looking at the first record in our remaining dataset, we can see that it is indeed of type text/html and was online (status 200) at the time of crawl:
In [5]:
pages.peekJson
Out[5]:
As we want to consider the number of URLs mentioning a term as that term's frequency, we first need to make sure that every URL is included only once in the dataset. Therefore, we simply keep the earliest snapshot of each URL. The result should be cached to avoid recomputation every time a record is accessed:
In [6]:
val earliest = pages.distinctValue(_.surtUrl) {(a, b) => if (a.time.isBefore(b.time)) a else b}.cache
In [7]:
earliest.count
Out[7]:
To extract the terms of a webpage, we have to keep in mind that a webpage consists of HTML code. Hence, using the StringContent enrichment function would enrich our dataset with this HTML code. To parse the HTML and keep only the text, we provide the HtmlText enrichment function. This can be used to extract the text of a single tag, e.g., HtmlText.of(Html.first("title")) to get the title text of a page. By default, though, HtmlText extracts the entire text of the page.
For more details on the Enrichment Functions provided with ArchiveSpark and their usage, please read the docs.
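As an aside, the title-only variant mentioned above would be applied as follows (shown for illustration only; the remainder of this recipe uses the full-page text):
// Enrich the dataset with only the <title> text, using the functions named above.
val Title = HtmlText.of(Html.first("title"))
earliest.enrich(Title).peekJson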
In [8]:
earliest.enrich(HtmlText).peekJson
Out[8]:
In [9]:
val Terms = LowerCase.of(HtmlText).mapMulti("terms")(text => text.split("\\W+").distinct)
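To see what the splitting above produces, here is a plain-Scala illustration with a made-up example string (ArchiveSpark applies the same logic to the lowercased page text):
// Hypothetical input, only to demonstrate splitting on non-word characters and deduplication:
"Helge Holzmann - Welcome!".toLowerCase.split("\\W+").distinct
// yields: Array(helge, holzmann, welcome)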
In [10]:
earliest.enrich(Terms).peekJson
Out[10]:
We can now use .flatMapValues to get a plain list of the terms included in the dataset. To get rid of short stopwords like articles, we only keep those terms with a minimum length of 4 characters.
For more details on available ArchiveSpark operations, please read the docs.
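As a hedged alternative to the length threshold (not part of the original recipe), one could filter against an explicit stopword list instead; the stopword set below is a made-up example and would need to be extended for the languages in your dataset:
// Hypothetical stopword set; terms contained in it are dropped instead of filtering by length.
val stopwords = Set("the", "and", "that", "with", "der", "die", "das", "und")
val termsWithoutStopwords = earliest.flatMapValues(Terms).filter(term => !stopwords.contains(term))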
In [11]:
val terms = earliest.flatMapValues(Terms).filter(_.length >= 4)
In [12]:
terms.take(10).foreach(println)
As we made sure before that every URL is included only once in the dataset and each term is included only once per record, we can simply count the terms using Spark's .countByValue. Finally, we sort the terms by count in descending order (negative count) and save them as CSV (comma-separated values):
In [13]:
val counts = terms.countByValue.toSeq.sortBy{case (term, count) => -count}
In [14]:
counts.take(10).foreach(println)
For saving the CSV file we can use IOUtil.writeLines, which is included in the Sparkling library.
In [15]:
import org.archive.archivespark.sparkling.io._
In [16]:
IOUtil.writeLines("term_counts.csv", counts.map{case (term, count) => term + "," + count})
Out[16]:
The term_counts.csv file that is created in the same folder as this notebook will contain all terms. This CSV file can now be loaded into a plotting tool of your choice, for example to plot a histogram of the term distribution.
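A rough histogram of the term distribution (how many terms occur once, twice, and so on) can also be computed with Spark directly. The following is only a sketch; it assumes the notebook's SparkContext is available as sc and that term_counts.csv is readable by Spark (e.g., local mode or a shared filesystem):
// Read the CSV back and bucket the terms by their count.
val loadedCounts = sc.textFile("term_counts.csv").map { line =>
  val Array(term, count) = line.split(",", 2)
  (term, count.toLong)
}
// (count, number of terms with that count), sorted by count.
val histogram = loadedCounts.map { case (_, count) => (count, 1L) }.reduceByKey(_ + _).sortByKey().collect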
Similar to the term frequencies shown above, we can also count the occurrences of named entities in the dataset.
An Enrichment Function to extract named entities, named Entities, is provided with ArchiveSpark. It uses Stanford's CoreNLP Named Entity Extractor. In order to use it, you need to add edu.stanford.nlp:stanford-corenlp:3.5.1 with the corresponding models to your classpath.
Another Enrichment Function, which uses Yahoo's Fast Entity Linker (FEL) for more accurate entity linking with ArchiveSpark, can be found here: FEL4ArchiveSpark
For more details on the Enrichment Functions and their use, please read the docs.
In [17]:
earliest.enrich(Entities).peekJson
Out[17]:
In [18]:
val locations = earliest.flatMapValues(Entities.child[Seq[String]]("locations"))
In [19]:
locations.take(10).foreach(println)
Please note:
Named Entity Extraction is a rather expensive operation; depending on the size of the dataset, the following instructions may run for hours or even days.
In [20]:
val counts = locations.countByValue.toSeq.sortBy{case (term, count) => -count}
In [21]:
counts.take(10).foreach(println)
In [22]:
IOUtil.writeLines("location_counts.csv", counts.map{case (term, count) => term + "," + count})
Out[22]:
The use of .countByValue automatically fetches / collects the counts for all available values to the local driver, which may lead to memory issues if the dataset is too big. Instead, the same operation can be implemented with a distributed .reduceByKey operation, combined with a filter that ensures only values with high counts are fetched, in order to avoid memory overruns. This way, the sorting can also be done in a distributed fashion:
In [23]:
val termCounts = terms.map(term => (term, 1L)).reduceByKey(_ + _).filter{case (term, count) => count > 2}
In [24]:
val fetchedTermCounts = termCounts.sortBy{case (term, count) => -count}.collect
In [25]:
fetchedTermCounts.take(10).foreach(println)
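To stay fully distributed end to end, the sorted counts could also be written out by Spark itself instead of being collected to the driver first. This is only a sketch; the output directory name is a made-up example:
// Write the counts as CSV-formatted lines, one part file per partition.
termCounts.sortBy{case (term, count) => -count}
  .map{case (term, count) => term + "," + count}
  .saveAsTextFile("term_counts_distributed")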