In [3]:
import de.l3s.archivespark.ArchiveSpark
import de.l3s.archivespark.enrich.functions._
import de.l3s.archivespark.nativescala.implicits._
import de.l3s.archivespark.implicits._

In [6]:
val collection = "ArchiveIt-Collection-2950"

In [7]:
val cdxPath = s"/data/hackathon/$collection/cdx/*.cdx.gz"
val warcPath = s"/data/hackathon/$collection/warc"

In [8]:
val rdd = ArchiveSpark.hdfs(cdxPath, warcPath)(sc)

In [10]:
rdd.filter(r => r.status == 200 && r.mime == "text/html").take(1).head.toJsonString


Out[10]:
{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20111222043804",
    "digest":"YVOEIYJ45I7QNNFBQTCPKIQAQJIE4B46",
    "originalUrl":"http://english.cntv.cn/program/newsupdate/20110504/109544.shtml",
    "surtUrl":"cn,cntv,english)/program/newsupdate/20110504/109544.shtml",
    "mime":"text/html",
    "meta":"-",
    "status":200
  }
}

In [11]:
val onlineHtml = rdd.filter(r => r.status == 200 && r.mime == "text/html")

In [71]:
val sample = onlineHtml.sample(false, .001, 83173).cache

In [28]:
sample.map(r => (r.surtUrl, 1)).reduceByKey(_ + _).sortBy{case (url, count) => -count}.take(10)


Out[28]:
Array((com,youtube)/watch?v=mod2jngttoa,3), (com,twitter)/occupylongbeach,3), (com,facebook)/family/xiao/ow/fk,3), (com,twitter)/occupy_paradise,3), (org,occupywallst)/forum/knowledge-of-the-cosmos,3), (org,occupywallst)/article/deliver-your-message-1,3), (org,occupywallst)/article/hate-men-will-pass-and-dictators-die-inspiring-wor,3), (org,occupywallst)/forum/come-over-to-lobby-democracy-and-stay-for-awhile-t,3), (com,twitter)/avunitanon,3), (com,twitter)/occupy_dc,3))

In [29]:
val url = "org,occupywallst)/forum/come-over-to-lobby-democracy-and-stay-for-awhile-t"
sample.filter(r => r.surtUrl == url).toJsonStrings.collect.foreach(println)


{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120131023246",
    "digest":"RDCZJAX4KT4FKFLVATTTNA5K7QT6OYTF",
    "originalUrl":"http://occupywallst.org/forum/come-over-to-lobby-democracy-and-stay-for-awhile-t/",
    "surtUrl":"org,occupywallst)/forum/come-over-to-lobby-democracy-and-stay-for-awhile-t",
    "mime":"text/html",
    "meta":"-",
    "status":200
  }
}
{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120210023849",
    "digest":"XE73PZTGIUVMM45T64MEUHTGGL6W7HN5",
    "originalUrl":"http://occupywallst.org/forum/come-over-to-lobby-democracy-and-stay-for-awhile-t/",
    "surtUrl":"org,occupywallst)/forum/come-over-to-lobby-democracy-and-stay-for-awhile-t",
    "mime":"text/html",
    "meta":"-",
    "status":200
  }
}
{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20120215041157",
    "digest":"M4HRHZZGMMA3VF4HCNTLGT5VYZU2GROT",
    "originalUrl":"http://occupywallst.org/forum/come-over-to-lobby-democracy-and-stay-for-awhile-t/",
    "surtUrl":"org,occupywallst)/forum/come-over-to-lobby-democracy-and-stay-for-awhile-t",
    "mime":"text/html",
    "meta":"-",
    "status":200
  }
}

In [30]:
val latest = sample.map(r => (r.surtUrl, r)).reduceByKey((a, b) => Seq(a, b).maxBy(r => r.originalUrl)).map{case (url, r) => r}.cache

In [31]:
val payload = latest.enrich(Payload)

In [32]:
println(payload.take(1).head.toJsonString)


{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20111217070544",
    "digest":"SRHAQ26XQRBWJBOBCJJXFSO4YCIAOT5F",
    "originalUrl":"http://twitter.com/intent/session?original_referer=http%3A%2F%2Fpeopleslibrary.wordpress.com%2F2011%2F10%2F08%2Fscanning-books%2F%3Fshare%3Dtwitter&related=wordpressdotcom&return_to=%2Fintent%2Ftweet%3Fvia%3Dwordpressdotcom%26related%3Dwordpressdotcom%26text%3DScanning%2BBooks%26url%3Dhttp%253A%252F%252Fwp.me%252Fp1TGQI-4R&text=Scanning+Books&url=http%3A%2F%2Fwp.me%2Fp1TGQI-4R&via=wordpressdotcom",
    "surtUrl":"com,twitter)/intent/session?original_referer=http://peopleslibrary.wordpress.com/2011/10/08/scanning-books/?share=twitter&related=wordpressdotcom&related=wordpressdotcom&return_to=/intent/tweet?via=wordpressdotcom&text=scanning+books&text=scanning+books&url=http://wp.me/p1tgqi-4r&url=http://wp.me/p1tgqi-4r&via=wordpressdotcom",
    "mime":"text/html",
    "meta":"-",
    "status":200
  },
  "recordHeader":{
    "WARC-Target-URI":"http://twitter.com/intent/session?original_referer=http%3A%2F%2Fpeopleslibrary.wordpress.com%2F2011%2F10%2F08%2Fscanning-books%2F%3Fshare%3Dtwitter&related=wordpressdotcom&return_to=%2Fintent%2Ftweet%3Fvia%3Dwordpressdotcom%26related%3Dwordpressdotcom%26text%3DScanning%2BBooks%26url%3Dhttp%253A%252F%252Fwp.me%252Fp1TGQI-4R&text=Scanning+Books&url=http%3A%2F%2Fwp.me%2Fp1TGQI-4R&via=wordpressdotcom",
    "WARC-Date":"2011-12-17T07:05:44Z",
    "WARC-IP-Address":"199.59.149.198",
    "WARC-Type":"response",
    "Content-Length":"5766",
    "WARC-Payload-Digest":"sha1:SRHAQ26XQRBWJBOBCJJXFSO4YCIAOT5F",
    "Content-Type":"application/http; msgtype=response",
    "absolute-offset":"0",
    "WARC-Record-ID":"<urn:uuid:66db4aea-e9fc-4f05-8c22-37c774f89954>",
    "reader-identifier":"ARCHIVEIT-2950-WEEKLY-MMMYDA-20111217062147-00005-crawling211.us.archive.org-6680.warc.gz"
  },
  "httpHeader":{
    "Vary":"Accept-Encoding",
    "Last-Modified":"Sat, 17 Dec 2011 07:05:44 GMT",
    "X-Action-Name":"show",
    "Server":"tfe",
    "X-Runtime":"0.01448",
    "X-Controller-Class":"Tfw::SessionsController",
    "X-Frame-Options":"SAMEORIGIN",
    "X-XSS-Protection":"1; mode=block",
    "Expires":"Tue, 31 Mar 1981 05:00:00 GMT",
    "Connection":"Keep-Alive",
    "Content-Length":"4626",
    "X-Revision":"DEV",
    "Cache-Control":"no-cache, no-store, must-revalidate, pre-check=0, post-check=0",
    "Content-Type":"text/html; charset=utf-8",
    "Pragma":"no-cache",
    "Date":"Sat, 17 Dec 2011 07:05:44 GMT",
    "X-MID":"b94e51ed2e7560ba60e8e2baefc27aa5dee22e35",
    "Set-Cookie":"_twitter_sess=BAh7CzoMY3NyZl9pZCIlMTFlMjAyNzg0ZTc0NWE4MjEyYjViZjFjNDY3ZmEz%250ANTE6FWluX25ld191c2VyX2Zsb3cwOg5yZXR1cm5fdG8iOWh0dHA6Ly90d2l0%250AdGVyLmNvbS9vY2N1cHlkYW1lc3RyP190d2l0dGVyX25vc2NyaXB0PTE6D2Ny%250AZWF0ZWRfYXRsKwipVTtKNAEiCmZsYXNoSUM6J0FjdGlvbkNvbnRyb2xsZXI6%250AOkZsYXNoOjpGbGFzaEhhc2h7AAY6CkB1c2VkewA6B2lkIiUwMjI1YzI2ZDA2%250AYzM3MDFkMGRjZTllZDY3NjAzMzY1OA%253D%253D--b57d5ab2c95dfce95e6b86e932a952d172b0e767; domain=.twitter.com; path=/; HttpOnly",
    "ETag":"\"64b31a03b692366dd4b3efd0c9f60d35\"",
    "X-Transaction":"3c3b6cda5ed3fd61",
    "Status":"200 OK"
  },
  "payload":"bytes(length: 4626)"
}

In [40]:
payload.mapEnrich(Payload, "WARC-IP-Address") {p => p.WARC-IP-Address}.mapValues[String]("recordHeader.WARC-IP-Address").take(1).head


Out[40]:
Name: Compile Error
Message: <console>:47: error: value WARC is not a member of Array[Byte]
              payload.mapEnrich(Payload, "WARC-IP-Address") {p => p.WARC-IP-Address}.mapValues[String]("recordHeader.WARC-IP-Address").take(1).head
                                                                    ^
StackTrace: 

In [75]:
val payloadWithIp = sample.mapEnrich[Map[String, String], String](Payload, "recordHeader", "ip") {header => header("WARC-IP-Address")}

In [73]:
payloadWithIp.map(r => (r.status, r.originalUrl, r.get[String]("recordHeader.ip").get)).take(2).head


Out[73]:
(200,http://blog.alexanderhiggins.com/2010/06/28/ixtoc-oil-spill-undeground-blowout-caused-oil-leak-cracks-seafloor/?replytocom=16455,199.27.134.227)

In [79]:
val TitleText = HtmlText.of(Html.first("title"))

In [80]:
val payloadWithIpAndTitle = payloadWithIp.enrich(TitleText).filterExists(TitleText)

In [89]:
payloadWithIpAndTitle.mapValues(TitleText).take(10).foreach(println)


Underground Blowout During IXTOC Caused Oil to Leak From Cracks In Seafloor
Breaking: Second Nuclear Reactor In Japan Explodes
SMOKING GUN: PREPARATION FOR WAR ON THE PEOPLE
Dylan Ratigan's Epic Rant on the International Banking Cartel Thieves and Political Corruption
Russell Simmons Joins #OccupyWallStreet Saying He Will Bring Hundreds Of Thousands
Fox Journalists Hit With Batons, Maced By NYPD At #OccupyWallStreet Protests
US Army Preparing To Crush #OccupyWallStreet
The Alyona Show: In case you missed it ? full show 8/12/11
32 Pictures Of Police Brutality From Occupy Wall Street Protests
Watch Live: #OccupyChicago Arrests Now Under Way On Livestream #ows #OccupyWallStreet

In [1]:
payloadWithIpAndTitle.map(r => (r.status, r.originalUrl, r.get[String]("recordHeader.ip").get)).take(2).head


Out[1]:
(200,http://blog.alexanderhiggins.com/2010/06/28/ixtoc-oil-spill-undeground-blowout-caused-oil-leak-cracks-seafloor/?replytocom=16455,199.27.134.227)

In [2]:
payloadWithIpAndTitle.take(1).head.toJsonString


Out[2]:
{
  "record":{
    "redirectUrl":"-",
    "timestamp":"20111222211210",
    "digest":"JB6CU5J5ADHAUKM5KFTXL4UXEHZHGNOU",
    "originalUrl":"http://blog.alexanderhiggins.com/2010/06/28/ixtoc-oil-spill-undeground-blowout-caused-oil-leak-cracks-seafloor/?replytocom=16455",
    "surtUrl":"com,alexanderhiggins,blog)/2010/06/28/ixtoc-oil-spill-undeground-blowout-caused-oil-leak-cracks-seafloor?replytocom=16455",
    "mime":"text/html",
    "meta":"-",
    "status":200
  },
  "recordHeader":{
    "ip":"199.27.134.227"
  },
  "payload":{
    "string":{
      "html":{
        "title":{
          "text":"Underground Blowout During IXTOC Caused Oil to Leak From Cracks In Seafloor"
        }
      }
    }
  }
}

In [8]:
val output = payloadWithIpAndTitle.map(r => ("\"" + r.timestamp + "\"", "\"" + r.status + "\"", "\"" + r.originalUrl + "\"", "\"" + r.get[String]("recordHeader.ip").get + "\"", "\"" + r.get[String]("payload.string.html.title.text").get + "\""))

In [9]:
import org.apache.hadoop.io.compress.GzipCodec
output.saveAsTextFile("sandbox.gz", classOf[GzipCodec])

In [ ]: