Steps: starting from any single test UUID, look up the archive location in BigQuery, locate the matching pusher tar files in GCS, and extract all of the test's raw data files.
In [1]:
import os
import subprocess
import math
import pandas as pd
import BQhelper as bq
from google.cloud import storage
DataDir='/data/' # Typically mounted at $HOME/data/
TarDir=DataDir+'TARFILES/'
os.makedirs(TarDir, exist_ok=True)
UnitTest = False
Verbose = True
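# NB: BQhelper is a local helper module, not a published package.  If it
# is unavailable, a minimal stand-in for the run_query() interface assumed
# below might look like this (a sketch: it assumes run_query() formats the
# query template with its keyword arguments and returns a pandas DataFrame):
from google.cloud import bigquery

def run_query(query, project='mlab-sandbox', **params):
    """Format a query template, run it, and return the result as a DataFrame."""
    client = bigquery.Client(project=project)
    return client.query(query.format(**params)).to_dataframe()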
In [2]:
# TEST CASES. These are manually curated points of interest
UUID1="ndt-r5mmc_1572346210_00000000000142D4"
# 2019-12-12 72.208.51.92 AZ
# SELECT * FROM `mlab-sandbox.mm_unified_testing.ndt_unified_ndt5_downloads` WHERE a.UUID = 'ndt-r5mmc_1572346210_00000000000142D4'
# gs://archive-measurement-lab/ndt/ndt5/2019/12/12/20191212T160135.233060Z-ndt5-mlab2-lax06-ndt.tgz
# gs://archive-measurement-lab/ndt/tcpinfo/2019/12/12/20191212T172147.966587Z-tcpinfo-mlab2-lax06-ndt.tgz
#
# Deep dive into 80.0.65.231, after the transition
# See row 901 in 2020-04-25 Dissect 80.0.65.231 across transition (adjust limits)
# 2019-12-28 07:37:54.054766 UTC
UUID2="ndt-m92kv_1573028939_0000000000056D7A"
if UnitTest:
    Tuuid = UUID2
In [3]:
# Helper functions to parse pusher data file names
if UnitTest:
    pf="gs://archive-measurement-lab/ndt/ndt5/2019/12/12/20191212T160135.233060Z-ndt5-mlab2-lax06-ndt.tgz"
def parseFullName(pf):
    """Parse the full name from an archived pusher tar file name
    e.g. ndt/ndt5/2019/12/12/20191212T160135.233060Z-ndt5-mlab2-lax06-ndt.tgz
    """
    return '/'.join(pf.split('/')[3:])

def parseDir(pf):
    """Parse the GCS directory from an archived pusher tar file name
    e.g. ndt/ndt5/2019/12/12
    """
    return '/'.join(pf.split('/')[3:-1])

def parseFile(pf):
    """Parse the GCS file name from an archived pusher tar file name
    e.g. 20191212T160135.233060Z-ndt5-mlab2-lax06-ndt.tgz
    """
    return pf.split('/')[-1]

def parseDate(pf):
    """Parse and reformat the archive date from a pusher tar file name
    e.g. 2019-12-12 (NB: this is NOT the test date)
    """
    return '-'.join(pf.split('/')[-4:-1])

def parseTime(pf):
    """Parse the timestamp from an archived pusher tar file name
    e.g. 20191212T160135
    """
    return pf.split('/')[-1].split('.')[0]

def parseShortName(pf):
    """Parse the server shortname from an archived pusher tar file name
    e.g. mlab2-lax06
    """
    return '-'.join(pf.split('-')[-3:-1])
if UnitTest:
    print(pf)
    print('FullName', parseFullName(pf))
    print('Dir', parseDir(pf))
    print('File', parseFile(pf))
    print('Date', parseDate(pf))
    print('Time', parseTime(pf))
    print('ShortName', parseShortName(pf))
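# The slice-based parsers above silently assume the exact pusher name
# layout.  A stricter (hypothetical) alternative validates the name with
# a regex and returns every field at once:
import re

PUSHER_RE = re.compile(
    r'(?:gs://[^/]+/)?(?P<dir>[^/]+/[^/]+/\d{4}/\d{2}/\d{2})/'
    r'(?P<time>\d{8}T\d{6})\.\d+Z-[^-]+-(?P<short>[^-]+-[^-]+)-[^.]+\.tgz$')

def parsePusherName(pf):
    """Return a dict of name fields, or None for an unexpected layout."""
    m = PUSHER_RE.search(pf)
    return m.groupdict() if m else None

if UnitTest:
    print(parsePusherName(pf))
    # {'dir': 'ndt/ndt5/2019/12/12', 'time': '20191212T160135', 'short': 'mlab2-lax06'}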
In [4]:
# Find all the details for any given UUID,
# including other UUIDs associated with the same test
# and all GCS paths.
# TODO: how to generalize
QueryDetails="""
SELECT
  ParseInfo.TaskFileName,
  result.C2S.UUID AS c2sUUID,
  result.S2C.UUID AS s2cUUID,
  result.Upload.UUID AS uploadUUID,
  result.Download.UUID AS downloadUUID,
  result.Control.UUID AS controlUUID
FROM `mlab-oti.ndt.ndt5`
WHERE
  result.C2S.UUID = '{UUID}' OR
  result.S2C.UUID = '{UUID}' OR
  result.Upload.UUID = '{UUID}' OR
  result.Download.UUID = '{UUID}' OR
  result.Control.UUID = '{UUID}'
"""
def getTestDetails(uuid):
    """Get details on where to find raw data associated with a test UUID
    Returns:
        the location of the primary raw data in GCS; and
        a list of connection UUIDs associated with the test.
    """
    q = bq.run_query(QueryDetails, UUID=uuid)
    if len(q['TaskFileName']) > 1:
        print("Warning: getTestDetails: multiple rows (%d)" % len(q['TaskFileName']))
    ret = []
    for c in ['c2sUUID', 's2cUUID', 'uploadUUID', 'downloadUUID', 'controlUUID']:
        if q[c][0] is not None and q[c][0] not in ret:
            ret.append(q[c][0])
    return q['TaskFileName'][0], ret
# Minimal test:
if UnitTest:
    TtaskFile, T_UUIDs = getTestDetails(Tuuid)
    print(TtaskFile)
    # gs://archive-measurement-lab/ndt/ndt5/2019/12/28/20191228T080102.072817Z-ndt5-mlab1-lhr04-ndt.tgz
    print(T_UUIDs)  # ['ndt-m92kv_1573028939_0000000000056D7A', 'ndt-m92kv_1573028939_0000000000056D78']
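# The query above scans the whole table for a single UUID.  When the
# approximate test date is known, filtering on the table's date column
# (an assumption: M-Lab tables are generally date-partitioned) makes the
# lookup much cheaper.  Note the ORs must be parenthesized first:
QueryDetailsByDate = """
SELECT
  ParseInfo.TaskFileName,
  result.Control.UUID AS controlUUID
FROM `mlab-oti.ndt.ndt5`
WHERE
  date = '{date}' AND
  (result.C2S.UUID = '{UUID}' OR result.S2C.UUID = '{UUID}' OR
   result.Upload.UUID = '{UUID}' OR result.Download.UUID = '{UUID}' OR
   result.Control.UUID = '{UUID}')
"""
# e.g. bq.run_query(QueryDetailsByDate, UUID=Tuuid, date='2019-12-28')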
In [5]:
def listArchive(path):
    """List the files under a given GCS path prefix
    """
    client = storage.Client(project='mlab-sandbox')
    bucket = client.get_bucket('archive-measurement-lab')
    r = bucket.list_blobs(prefix=path)
    return [i.name for i in r]
# Minimal test
if UnitTest:
    T_archive = '/'.join(TtaskFile.split('/')[3:-1])
    print(T_archive)  # ndt/ndt5/2019/12/28
    blobs = listArchive(T_archive)
    print(len(blobs))  # 4947
In [6]:
# Fetch a GCS blob from the M-Lab archive
# TODO: Add support for non-production archives (see the sketch below)
def fetchBlob(blob, dest, bucket='archive-measurement-lab'):
    """Fetch a blob of data from GCS
    NB: blob excludes the bucket prefix, e.g. 'gs://archive-measurement-lab/'
    """
    try:
        os.stat(dest)
        return True  # already fetched
    except FileNotFoundError:
        pass
    client = storage.Client(project='mlab-sandbox')
    bucket = client.get_bucket(bucket)
    blob = bucket.get_blob(blob)
    with open(dest, 'wb') as file_obj:
        client.download_blob_to_file(blob, file_obj)
# Minimal test
if UnitTest:
    t = '/'.join(TtaskFile.split('/')[3:])
    print(t)
    # ndt/ndt5/2019/12/28/20191228T080102.072817Z-ndt5-mlab1-lhr04-ndt.tgz
    Ttarfile = TarDir + 'test.ndt5.tgz'
    fetchBlob(t, Ttarfile)
    # Check for test.ndt5.tgz in TarDir
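# Sketch for the TODO above: choose the archive bucket by project.  The
# non-production bucket names here are assumptions; verify before use.
ARCHIVE_BUCKETS = {
    'mlab-oti': 'archive-measurement-lab',   # production, per the paths above
    'mlab-staging': 'archive-mlab-staging',  # assumed name
    'mlab-sandbox': 'archive-mlab-sandbox',  # assumed name
}

def fetchBlobFrom(project, blob, dest):
    """Fetch a blob from the archive bucket associated with a project."""
    return fetchBlob(blob, dest, bucket=ARCHIVE_BUCKETS[project])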
In [7]:
def searchTar(uuids, tarFile):
    """Search for uuids in a tarfile; return the list of matching member names"""
    proc = subprocess.Popen(["tar", "tf", tarFile], stdout=subprocess.PIPE, text=True)
    try:
        outs, errs = proc.communicate(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()
        outs, errs = proc.communicate()
    res = []
    for l in outs.splitlines():
        for uuid in uuids:
            if uuid in l:
                if Verbose:
                    print("Found: " + l)
                res.append(l)
    return res
def TarExtractFile(tarFile, targetDir, files):
    """Extract files from tarFile into targetDir
    """
    args = ["tar", "--extract", "-f", tarFile, "--directory=" + TarDir, *files]
    if UnitTest:
        print(args)
    subprocess.run(args).check_returncode()
    for file in files:
        os.replace(TarDir + file, targetDir + file.split('/')[-1])
        # TODO: rmdir the now-empty date directories (e.g. 2019/12/28/)
    return [f.split('/')[-1] for f in files]
if UnitTest:
    os.makedirs(DataDir + "test/", exist_ok=True)
    print(T_UUIDs[0])
    # ndt-m92kv_1573028939_0000000000056D7A
    print(searchTar(['xxx'], TarDir + 'test.ndt5.tgz'))
    # [] (no match)
    T_ndt5_json = searchTar(T_UUIDs, TarDir + 'test.ndt5.tgz')
    # Found: 2019/12/28/ndt-m92kv_1573028939_0000000000056D78.json
    print(T_ndt5_json)
    # ['2019/12/28/ndt-m92kv_1573028939_0000000000056D78.json']
    print(TarExtractFile(TarDir + 'test.ndt5.tgz', DataDir + "test/", T_ndt5_json))
    # ['ndt-m92kv_1573028939_0000000000056D78.json']
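# Alternative sketch: the standard-library tarfile module can search and
# extract in one pass, without shelling out to tar.  This is intended to
# match searchTar() plus TarExtractFile(), but is untested here:
import tarfile

def searchAndExtract(uuids, tarFile, targetDir):
    """Extract members matching any uuid flat into targetDir; return their basenames."""
    found = []
    with tarfile.open(tarFile, 'r:gz') as tf:
        for member in tf.getmembers():
            if member.isfile() and any(uuid in member.name for uuid in uuids):
                base = member.name.split('/')[-1]
                with tf.extractfile(member) as src, open(targetDir + base, 'wb') as dst:
                    dst.write(src.read())
                found.append(base)
    return found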
In [12]:
# New platform only
def getPusherObject(TargetDir, UUIDs, shortname, timeHint, gcsDir):
    """
    TargetDir - destination directory for the results
    UUIDs - list of UUIDs that we want
    shortname - server shortname, e.g. mlab1-lhr04
    timeHint - test timestamp, to facilitate locating the data
    gcsDir - where to find the archive, w/o the bucket name
    """
    blobs = listArchive(gcsDir)
    blobs = [x for x in blobs if shortname in x]
    if Verbose:
        print("Available server-day pusher files:", len(blobs))
    otime = ""
    ix = -1
    for i, b in enumerate(blobs):
        btime = parseTime(b)
        if btime < otime:
            print("Warning: non-monotonic timestamps %s>%s" % (otime, btime))
        if btime <= timeHint:
            ix = i
        otime = btime
    # The test data is usually in the tar file at ix, but may have been
    # pushed in a neighboring one, so search nearby files too.
    filelist = []
    for offset in [0, -1, 1, 2]:
        if 0 <= ix + offset < len(blobs):
            blob = blobs[ix + offset]
            if Verbose:
                print('Trying:', ix + offset, blob)
            tarFile = TarDir + blob.split('/')[-1]
            fetchBlob(blob, tarFile)
            fl = searchTar(UUIDs, tarFile)
            if len(fl) > 0:
                if Verbose:
                    print('Extracting:', ' '.join(fl))
                TarExtractFile(tarFile, TargetDir, fl)
                filelist.extend(fl)
    return filelist
if UnitTest:
    T_pcapDir = parseDir(TtaskFile).replace('ndt5', 'pcap')
    T_shortname = parseShortName(TtaskFile)  # mlab1-lhr04
    T_testTime = parseTime(TtaskFile)
    print(T_pcapDir, T_testTime)
    print(getPusherObject(DataDir + "test/", T_UUIDs, T_shortname, T_testTime, T_pcapDir))
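# The linear scan in getPusherObject() finds the last archive at or
# before timeHint.  Since the parsed timestamps sort lexically, bisect
# could compute the same index directly (a sketch, valid only when the
# listing is sorted by timestamp; the warning above flags when it isn't):
import bisect

def hintIndex(blobs, timeHint):
    """Index of the last blob whose timestamp is <= timeHint, or -1 if none."""
    times = [parseTime(b) for b in blobs]
    return bisect.bisect_right(times, timeHint) - 1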
In [9]:
# prototype main
def GatherRawNDT5(uuid):
    """With just (any) UUID, gather all test data
    """
    NDTtaskFile, UUIDs = getTestDetails(uuid)
    if NDTtaskFile == '':
        print("Failed to find archive")
        return {}
    if Verbose:
        print('UUIDs: ' + ' '.join(UUIDs))
        print("gcsNDTtaskFile: " + NDTtaskFile)
    # Make the target dir after the lookup to avoid debris from failures
    # Stabilize UUIDs
    # UUIDs = sorted(UUIDs)
    if UUIDs[0] != uuid:
        uuid = UUIDs[0]
        print('Updating base UUID (probably the control connection):', uuid)
    TargetDir = DataDir + uuid + '/'
    os.makedirs(TargetDir, exist_ok=True)
    # Guess each datatype's dir from the ndt5 dir and search the likely tar files
    shortname = parseShortName(NDTtaskFile)  # e.g. mlab1-lhr04
    timeHint = parseTime(NDTtaskFile)
    gcsDir = parseDir(NDTtaskFile)
    print(shortname, timeHint, gcsDir)
    ret = {}
    dataSets = ['ndt5', 'ndt7', 'pcap', 'tcpinfo', 'traceroute', 'web100']
    for d in dataSets:
        print("Checking for %s archives" % d)
        r = getPusherObject(TargetDir, UUIDs, shortname, timeHint, gcsDir.replace('ndt5', d))
        if r is not None and len(r) > 0:
            print('Found', len(r), d, 'archives')
            ret[d] = r
    return ret
if UnitTest:
    r = GatherRawNDT5(UUID2)
    print(r)
In [13]:
r = GatherRawNDT5(UUID2)
In [14]:
print(r)