In [1]:
import org.archive.archivespark._
import org.archive.archivespark.functions._
import org.archive.archivespark.specific.warc._
In this example, the web archive dataset will be loaded from local WARC / CDX files (created in this recipe). However, any other Data Specification (DataSpec) could be used here too, to load records of other types or from other local or remote sources.
In [2]:
val warcPath = "/data/helgeholzmann-de.warc.gz"
val cdxPath = warcPath + "/*.cdx.gz"
In [3]:
val records = ArchiveSpark.load(WarcSpec.fromFiles(cdxPath, warcPath))
For link extraction, we are only interested in webpages (mime type text/html), not in videos, images, stylesheets or any other files, nor in webpages that were unavailable when they were crawled (i.e., we keep only captures with status code 200). Hence, we filter the records accordingly.
It is important to note that this filtering is done only based on metadata, so up to this point ArchiveSpark does not even touch the actual web archive records, which is the core efficiency feature of ArchiveSpark.
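Because these checks rely on CDX metadata only, any additional metadata-level filter is equally cheap. As a hypothetical example (not part of the original recipe, and assuming the timestamp field is the usual 14-digit CDX string), the dataset could also be restricted to captures from a specific year:

// hypothetical metadata-only filter: successful captures from 2019 only
val pages2019 = records.filter(r => r.status == 200 && r.timestamp.startsWith("2019"))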
In [4]:
val pages = records.filter(r => r.mime == "text/html" && r.status == 200)
By looking at the first record in our remaining dataset, we can see that it is indeed of type text/html and was online (status 200) at the time of the crawl:
In [5]:
pages.peekJson
Out[5]:
This is the point when ArchiveSpark actually accesses the full records in order to enrich our metadata records with the desired information. To do so, we define the required Enrichment Functions (Links, LinkUrls, LinkTexts) based on existing ones (Html, SURT, HtmlAttribute, HtmlText).
Html.all extracts all hyperlinks / anchors (tag a) from the pages. This results in a list of multiple values, one for each link. From these we want to extract the link target (attribute href) of each link. This can be done by changing the dependency of the HtmlAttribute Enrichment Function using the ofEach operation (see the docs for more details). Although this again results in multiple values, there is only one per link, so we use the single-dependency operation of to apply SURT to these values and convert the URLs into SURT format. Similarly, we apply HtmlText to each link to get its anchor text.
In [6]:
val Links = Html.all("a")
val LinkUrls = SURT.of(HtmlAttribute("href").ofEach(Links))
val LinkTexts = HtmlText.ofEach(Links)
To enrich the filtered records in our dataset with the link information, we call enrich
for every Enrichment Function that we explicitely want to have in the dataset. As we are not interested in the raw a
tags, we do not enrich it with Links
here.
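If the raw a tags were needed as well, the dataset could additionally be enriched with Links itself; a minimal sketch (not used in the rest of this recipe):

// hypothetical: also keep the raw anchor tags alongside the derived URL / text values
val pagesWithRawLinks = pages.enrich(Links).enrich(LinkUrls).enrich(LinkTexts)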
In [7]:
val pagesWithLinks = pages.enrich(LinkUrls).enrich(LinkTexts)
A look at the first record shows what we get:
In [8]:
pagesWithLinks.peekJson
Out[8]:
If we want to save our derived corpus with the link information in the JSON format shown above, we can simply call .saveAsJson. This preserves the metadata of each record as well as the lineage of each value by nesting derived values under their parents. JSON is a very universal format and can be read by many third-party tools to post-process this dataset.
If the output path has a .gz extension, the data is automatically compressed with GZip.
In [9]:
pagesWithLinks.saveAsJson("/data/pages-with-links.gz")
Instead of this output, we can also transform the dataset into a temporal edgelist that keeps only the hyperlink information: the source URL, the timestamp of the capture, the destination URL of each link, and the anchor text if available.
There are two preferred ways to achieve this with ArchiveSpark:
1. Define an Enrichment Function based on Values that combines destination URL and text for each link. Then, in a map operation, we can access these values and create our very own output format by adding additional information, like source and timestamp.
2. Define Values for each link as before, but this time include the source and timestamp in this value, so that we only need to flat map the values (flatMapValues).
In [10]:
val LinkRepresentation = Values("link-dst-text", LinkUrls, LinkTexts).onEach(Links)
println is used in the following to show the complete record and to prevent Jupyter from cutting it off:
In [11]:
println(pagesWithLinks.enrich(LinkRepresentation).peekJson)
Since LinkRepresentation is originally a single-value function that has been applied to all links, we need to convert it to a multi-value pointer by calling .multi on it. Finally, we concatenate the link properties delimited by a tab (\t).
In [12]:
val links = pagesWithLinks.enrich(LinkRepresentation).flatMap { record =>
  // each LinkRepresentation value is a Seq of (destination URL, anchor text)
  record.valueOrElse(LinkRepresentation.multi, Seq.empty).map { case Seq(dst, text) =>
    Seq(record.surtUrl, record.timestamp, dst, text).mkString("\t")
  }
}
Print the first 10 lines of this dataset to see what we get:
In [13]:
links.take(10).foreach(println)
Save as text file (GZip compressed)
In [14]:
links.saveAsTextFile("/data/links.gz")
The second approach uses flatMapValues (ArchiveSpark Operations). To define Enrichment Functions that derive values from the CDX metadata, we map from the root pointer of the record. This requires two additional modules to be imported, i.e., pointers and cdx:
In [15]:
import org.archive.archivespark.model.pointers._
import org.archive.archivespark.sparkling.cdx._
In [16]:
val SurtURL = FieldPointer.root[WarcRecord, CdxRecord].map("surtUrl") { cdx: CdxRecord => cdx.surtUrl}
val Timestamp = FieldPointer.root[WarcRecord, CdxRecord].map("timestamp") { cdx: CdxRecord => cdx.timestamp}
val LinkRepresentation = Values("src-timestamp-dst-text", SurtURL, Timestamp, LinkUrls, LinkTexts).onEach(Links)
println is used in the following to show the complete record and to prevent Jupyter from cutting it off:
In [17]:
println(pagesWithLinks.enrich(SurtURL).enrich(Timestamp).enrich(LinkRepresentation).peekJson)
We concatenate the link property values delimited by a tab (\t) before saving them as text:
In [18]:
val links = pagesWithLinks.enrich(SurtURL).enrich(Timestamp).flatMapValues(LinkRepresentation).map(_.mkString("\t"))
In [19]:
links.take(10).foreach(println)
In [20]:
links.saveAsTextFile("/data/links1.gz")
In [21]:
// derive (source URL, destination URL) pairs for every extracted link
val srcDst = pages.enrich(LinkUrls).flatMap(r => r.valueOrElse(LinkUrls.multi, Seq.empty).map(dst => (r.surtUrl, dst)))
For Spark's GraphX the nodes (URLs) need to be assigned IDs:
In [22]:
val urlIdMap = srcDst.flatMap{case (src, dst) => Iterator(src, dst)}.distinct.zipWithUniqueId.collectAsMap
In [23]:
val ids = sc.broadcast(urlIdMap)
In [24]:
val edges = srcDst.map{case (src, dst) => (ids.value(src), ids.value(dst))}
In [25]:
import org.apache.spark.graphx._
val graph = Graph.fromEdgeTuples(edges, true)
In [26]:
graph.numVertices
Out[26]:
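From here, the usual GraphX operations are available. As a minimal sketch (not part of the original recipe), the vertex IDs can be translated back to SURT URLs to rank pages by in-degree:

// invert the URL -> ID map built above to translate vertex IDs back to URLs
val idToUrl = urlIdMap.map { case (url, id) => (id, url) }
// rank vertices by the number of incoming links and resolve their URLs
val topLinked = graph.inDegrees
  .sortBy(_._2, ascending = false)
  .take(10)
  .map { case (id, degree) => (idToUrl(id), degree) }
topLinked.foreach(println)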