Gather all raw data files associated with a particular NDT5 UUID

Steps:

  • Fetch the row from BQ and extract:
    • GCS file name. Extract from the name:
      • Server shortname
      • Pusher datestring and timestamp
      • Check if the date is too old (see the sketch after the parse helpers)
    • From the row array, construct tcpinfo tsg annotations
    • Fetch the raw file
  • mkdir UUID
  • gsutil ls pcap path using pusher datestring and shortname
  • Select and fetch likely file (slightly earlier than ndt5 pusher timestamp)
  • system tar tf and search for the UUID to find the full path and file name
  • Extract file, mv to target dir and rmdir extras
  • tcptrace to extract xplots
  • Merge tcpinfo tsg annotations w/ b2a_tsg (see the sketch below)
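
The last two steps (tcptrace and the tsg merge) are not implemented in the cells below. A minimal sketch of what they might look like, assuming tcptrace is on the PATH and the zstandard package is installed; the helper names here are hypothetical and the merge itself is only outlined:

import glob
import gzip
import io
import json
import shutil
import subprocess

import zstandard  # assumed installed: pip install zstandard

def makeXplots(targetDir):
    """Run tcptrace over each fetched pcap to produce xplot time
    sequence graphs (a2b_tsg.xpl / b2a_tsg.xpl) in targetDir."""
    for pcap in glob.glob(targetDir + '*.pcap.gz'):
        raw = pcap[:-3]  # tcptrace wants an uncompressed pcap
        with gzip.open(pcap, 'rb') as src, open(raw, 'wb') as dst:
            shutil.copyfileobj(src, dst)
        # -S emits the time sequence graphs into the working directory
        subprocess.run(['tcptrace', '-S', raw], cwd=targetDir, check=True)

def tcpinfoSnapshots(jsonlZst):
    """Yield one dict per snapshot from a tcpinfo .jsonl.zst file.
    These carry the annotations to be merged (by timestamp) with
    the points in b2a_tsg.xpl."""
    with open(jsonlZst, 'rb') as fh:
        stream = zstandard.ZstdDecompressor().stream_reader(fh)
        for line in io.TextIOWrapper(stream, encoding='utf-8'):
            if line.strip():
                yield json.loads(line)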

In [1]:
import os
import subprocess
import math
import pandas as pd
import BQhelper as bq
from google.cloud import storage

DataDir='/data/'  # Typically mounted at $HOME/data/
TarDir=DataDir+'TARFILES/'
os.makedirs(TarDir, exist_ok=True)

UnitTest = False
Verbose = True

In [2]:
# TEST CASES. These are manually curated points of interest
UUID1="ndt-r5mmc_1572346210_00000000000142D4"
# 2019-12-12 72.208.51.92 AZ
# SELECT * FROM `mlab-sandbox.mm_unified_testing.ndt_unified_ndt5_downloads` WHERE a.UUID = 'ndt-r5mmc_1572346210_00000000000142D4'
# gs://archive-measurement-lab/ndt/ndt5/2019/12/12/20191212T160135.233060Z-ndt5-mlab2-lax06-ndt.tgz
# gs://archive-measurement-lab/ndt/tcpinfo/2019/12/12/20191212T172147.966587Z-tcpinfo-mlab2-lax06-ndt.tgz
# 

# Deep dive into 80.0.65.231, after the transition
# See row 901 in 2020-04-25 Dissect 80.0.65.231 across transition (adjust limits)
# 2019-12-28 07:37:54.054766 UTC
UUID2="ndt-m92kv_1573028939_0000000000056D7A"

if UnitTest:
    Tuuid = UUID2

In [3]:
# Helper functions to parse pusher data file names

if UnitTest:
    pf="gs://archive-measurement-lab/ndt/ndt5/2019/12/12/20191212T160135.233060Z-ndt5-mlab2-lax06-ndt.tgz"

def parseFullName(pf):
    """Parse the full name from an archived pusher tar file name
    e.g. ndt/ndt5/2019/12/12/20191212T160135.233060Z-ndt5-mlab2-lax06-ndt.tgz
    """   
    return('/'.join(pf.split('/')[3:]))

def parseDir(pf):
    """Parse the gcs directory from an archived pusher tar file name
    e.g. ndt/ndt5/2019/12/12/
    """
    return('/'.join(pf.split('/')[3:-1]))

def parseFile(pf):
    """Parse the gcs file from an archived pusher tar file name
    e.g. 20191212T160135.233060Z-ndt5-mlab2-lax06-ndt.tgz
    """
    return(pf.split('/')[-1])

def parseDate(pf):
    """Parse and reformat the archive date from a pusher tar file name
    e.g. 2019-12-12 (NB: this is NOT the test date)
    """
    return('-'.join(pf.split('/')[-4:-1]))

def parseTime(pf):
    """Parse the timestamp from an archived pusher tar file name
    e.g. 20191212T160135
    """
    return(pf.split('/')[-1].split('.')[0])
    
def parseShortName(pf):
    """Parse the server shortname from an archived pusher tar file name
    e.g. mlab2-lax06
    """
    return('-'.join(pf.split('-')[-3:-1]))

if UnitTest:
    print(pf)
    print('FullName', parseFullName(pf))
    print('Dir', parseDir(pf))
    print('File', parseFile(pf))
    print('Date', parseDate(pf))
    print('Time', parseTime(pf))
    print('ShortName', parseShortName(pf))
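
The "date is too old" check from the step list has no cell of its own; a minimal sketch using parseDate (the cutoff value is an assumption, standing in for the earliest date with new-platform raw data):

import datetime

def dateTooOld(pf, cutoff=datetime.date(2019, 7, 1)):
    """True if the archive predates the (placeholder) cutoff date."""
    y, m, d = (int(x) for x in parseDate(pf).split('-'))
    return datetime.date(y, m, d) < cutoff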

In [4]:
# Find all the details for any given UUID,
# including other UUIDs associated with the same test
# and all GCS paths.

# TODO: how to generalize

QueryDetails="""
SELECT
  ParseInfo.TaskFileName,
  result.C2S.UUID AS c2sUUID,
  result.S2C.UUID AS s2cUUID,
  result.Upload.UUID AS uploadUUID,
  result.Download.UUID AS downloadUUID,
  result.Control.UUID AS controlUUID
FROM `mlab-oti.ndt.ndt5`
WHERE
  result.C2S.UUID = '{UUID}' OR
  result.S2C.UUID = '{UUID}' OR
  result.Upload.UUID = '{UUID}' OR
  result.Download.UUID = '{UUID}' OR
  result.Control.UUID = '{UUID}'
"""

def getTestDetails(uuid):
    """Get details on where to find raw data associated with a test UUID
    Returns:
        the location of the primary raw data in GCS; and
        a list of connection UUIDs associated with the test.
    """
    q=bq.run_query(QueryDetails, UUID=uuid)
    if len(q['TaskFileName']) > 1:
        print("Warning: getTestDetails: (%d)"%len(q['TaskFileName']))
    ret=[]
    for c in ['c2sUUID', 's2cUUID', 'uploadUUID', 'downloadUUID', 'controlUUID']:
        if q[c][0] is not None and q[c][0] not in ret:
            ret.append(q[c][0])

    return(q['TaskFileName'][0], ret)

# Minimal test:
if UnitTest:
    TtaskFile, T_UUIDs = getTestDetails(Tuuid)
    print (TtaskFile)
    # gs://archive-measurement-lab/ndt/ndt5/2019/12/28/20191228T080102.072817Z-ndt5-mlab1-lhr04-ndt.tgz
    print (T_UUIDs) # ['ndt-m92kv_1573028939_0000000000056D7A', 'ndt-m92kv_1573028939_0000000000056D78']

In [5]:
def listArchive(path):
    """List the files in a given GCS path
    """
    client = storage.Client(project='mlab-sandbox')
    bucket = client.get_bucket('archive-measurement-lab')
    r = bucket.list_blobs(prefix=path)
    return [i.name for i in r]

# Minimal test
if UnitTest:
    T_archive = '/'.join(TtaskFile.split('/')[3:-1])
    print (T_archive) # ndt/ndt5/2019/12/28
    blobs = listArchive(T_archive)
    print(len(blobs)) # 4947

In [6]:
# Fetch a GCS blob from the M-Lab archive
# TODO: Add support for non-production archives

def fetchBlob(blob, dest, bucket='archive-measurement-lab'):
    """Fetch a blob of data gsc
    NB: blob excludes the bucket: e.g. 'gs://archive-measurement-lab/'
    """
    try:
        os.stat(dest)
        return True  # already fetched on an earlier run
    except FileNotFoundError:
        pass
    client = storage.Client(project='mlab-sandbox')
    bucket = client.get_bucket(bucket)
    blob = bucket.get_blob(blob)
    with open(dest, 'wb') as file_obj:
        client.download_blob_to_file(blob, file_obj)

# Minimal test
if UnitTest:
    t = '/'.join(TtaskFile.split('/')[3:])
    print(t)
    # ndt/ndt5/2019/12/28/20191228T080102.072817Z-ndt5-mlab1-lhr04-ndt.tgz
    Ttarfile = TarDir+'test.ndt5.tgz'
    fetchBlob(t, Ttarfile)
    # Check for test.ndt5.tgz in TarDir

In [7]:
def searchTar(uuids, tarFile):
    """Search for uuids in a tarfile, return file name if found"""
    proc=subprocess.Popen(["tar", "tf", tarFile], stdout=subprocess.PIPE, text=True)
    try:
        outs, errs = proc.communicate(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()
        outs, errs = proc.communicate()
    res = []
    for l in outs.split():
        for uuid in uuids:
            if uuid in l:
                if Verbose:
                    print ("Found: "+l)
                res.append(l)
    return res

def TarExtractFile(tarFile, targetDir, files):
    """Extract files from tarFile into targetDir
    """
    args=["tar", "--extract", "-f", tarFile,  "--directory="+TarDir, *files]
    if UnitTest:
        print (args)
    subprocess.run(args).check_returncode()
    for file in files:
        os.replace(TarDir+file, targetDir+file.split('/')[-1])
    # TODO: rmdir 2019/.... (empty); see the sketch after this cell
    return [f.split('/')[-1] for f in files]

if UnitTest:
    os.makedirs(DataDir+"test/", exist_ok=True)
    print (T_UUIDs[0])
    # ndt-m92kv_1573028939_0000000000056D7A
    print (searchTar(['xxx'], TarDir+'test.ndt5.tgz'))
    # [] (no match)
    T_ndt5_json=searchTar(T_UUIDs, TarDir+'test.ndt5.tgz')
    # Found: 2019/12/28/ndt-m92kv_1573028939_0000000000056D78.json
    print (T_ndt5_json)
    # 2019/12/28/ndt-m92kv_1573028939_0000000000056D78.json
    print (TarExtractFile(TarDir+'test.ndt5.tgz', DataDir+"test/", T_ndt5_json))
    # ndt-m92kv_1573028939_0000000000056D78.json
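
A possible implementation of the rmdir TODO in TarExtractFile, removing the now-empty yyyy/mm/dd directories that tar leaves under TarDir (this assumes extracted paths always have that date-directory shape):

def rmdirExtras(files):
    """Remove the empty date directories left behind in TarDir."""
    for f in files:
        d = os.path.dirname(f)        # e.g. 2019/12/28
        while d:
            try:
                os.rmdir(TarDir + d)  # only succeeds when empty
            except OSError:
                break                 # not empty (or already gone)
            d = os.path.dirname(d)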

In [12]:
# New platform only


def getPusherObject(TargetDir, UUIDs, shortname, timeHint, gcsDir):
    """
    TargetDir - Results
    UUIDs - List of UUIDs that we want
    timeHint - test time stamp, to facilitate locating the data
    gcsDir - Where to find the archive, w/o the bucket name
    """
    blobs=listArchive(gcsDir)
    blobs = [x for x in blobs if shortname in x]
    if Verbose:
        print ("Available server-day pusher files:", len(blobs))
    otime = ""
    ix = -1
    for i, b in enumerate(blobs):
        btime = parseTime(b)
        if btime < otime:
            print ("Warning: non-monotonic timestamps %s>%s"%(otime, btime))
        if btime <= timeHint:
            ix = i
        otime = btime
    filelist=[]
    # Try the best guess first, then its neighbors (the pusher may have
    # flushed this test's data slightly before or after timeHint)
    for offset in [0, -1, 1, 2]:
        if ix+offset >= 0 and ix+offset < len(blobs):
            blob=blobs[ix+offset]
            if Verbose:
                print ('Trying:', ix+offset, blob)
            tarFile=TarDir+blob.split('/')[-1]
            fetchBlob(blob, tarFile)
            fl=searchTar(UUIDs, tarFile)
            if len(fl) > 0:
                if Verbose:
                    print('Extracting:',' '.join(fl))
                TarExtractFile(tarFile, TargetDir, fl)
                filelist.extend(fl)
    return filelist


if UnitTest:
    T_pcapDir=parseDir(TtaskFile).replace('ndt5','pcap')
    T_shortname=parseShortName(TtaskFile)  # mlab1-lhr04
    T_testTime=parseTime(TtaskFile)
    print(T_pcapDir, T_testTime)
    
    print(getPusherObject(DataDir+"test/", T_UUIDs, T_shortname, T_testTime, T_pcapDir))

In [9]:
# prototype main


def GatherRawNDT5(uuid):
    """With just (any) UUID, gather all test data
    """
    NDTtaskFile, UUIDs = getTestDetails(uuid)

    if NDTtaskFile == '':
        print ("Failed to find Archive")
        return None
    if Verbose:
        print('UUIDs: '+' '.join(UUIDs))
        print("gcsNDTtaskFile: "+NDTtaskFile)


    # Make the target dir after the lookup to avoid debris from failures
    # Stabilize UUIDs
    # UUIDs=sorted(UUIDs)
    if UUIDs[0] != uuid:
        uuid = UUIDs[0]
        print ('Updating base UUID (probably the control connection):',uuid)
    TargetDir = DataDir+uuid+'/'
    os.makedirs(TargetDir, exist_ok=True)
    
    # Guess pcap dir and search for likely tar files
    shortname=parseShortName(NDTtaskFile)  # mlab1-lhr04
    timeHint=parseTime(NDTtaskFile)
    gcsDir=parseDir(NDTtaskFile)
    print (shortname, timeHint, gcsDir)
    ret={}
    dataSets=['ndt5','ndt7','pcap','tcpinfo','traceroute','web100']
    for d in dataSets:
        print ("Checking for %s archives"%d)
        r=getPusherObject(TargetDir, UUIDs, shortname, timeHint, gcsDir.replace('ndt5',d))
        if r is not None and len(r) > 0:
            print ('Found',len(r), d,'archives')
            ret[d]=r
    return(ret)

if UnitTest:
    r=GatherRawNDT5(UUID2)
    print (r)

In [13]:
r=GatherRawNDT5(UUID2)


UUIDs: ndt-m92kv_1573028939_0000000000056D7A ndt-m92kv_1573028939_0000000000056D78
gcsNDTtaskFile: gs://archive-measurement-lab/ndt/ndt5/2019/12/28/20191228T080102.072817Z-ndt5-mlab1-lhr04-ndt.tgz
mlab1-lhr04 20191228T080102 ndt/ndt5/2019/12/28
Checking for ndt5 archives
Available server-day pusher files: 12
Trying: 3 ndt/ndt5/2019/12/28/20191228T080102.072817Z-ndt5-mlab1-lhr04-ndt.tgz
Found: 2019/12/28/ndt-m92kv_1573028939_0000000000056D78.json
Extracting: 2019/12/28/ndt-m92kv_1573028939_0000000000056D78.json
Trying: 2 ndt/ndt5/2019/12/28/20191228T060100.223473Z-ndt5-mlab1-lhr04-ndt.tgz
Trying: 4 ndt/ndt5/2019/12/28/20191228T100102.654064Z-ndt5-mlab1-lhr04-ndt.tgz
Trying: 5 ndt/ndt5/2019/12/28/20191228T120130.853831Z-ndt5-mlab1-lhr04-ndt.tgz
Found 1 ndt5 archives
Checking for ndt7 archives
Available server-day pusher files: 0
Checking for pcap archives
Available server-day pusher files: 99
Trying: 18 ndt/pcap/2019/12/28/20191228T074635.485037Z-pcap-mlab1-lhr04-ndt.tgz
Found: 2019/12/28/ndt-m92kv_1573028939_0000000000056D78.pcap.gz
Found: 2019/12/28/ndt-m92kv_1573028939_0000000000056D7A.pcap.gz
Extracting: 2019/12/28/ndt-m92kv_1573028939_0000000000056D78.pcap.gz 2019/12/28/ndt-m92kv_1573028939_0000000000056D7A.pcap.gz
Trying: 17 ndt/pcap/2019/12/28/20191228T070009.058070Z-pcap-mlab1-lhr04-ndt.tgz
Trying: 19 ndt/pcap/2019/12/28/20191228T080522.979468Z-pcap-mlab1-lhr04-ndt.tgz
Trying: 20 ndt/pcap/2019/12/28/20191228T082536.204542Z-pcap-mlab1-lhr04-ndt.tgz
Found 2 pcap archives
Checking for tcpinfo archives
Available server-day pusher files: 17
Trying: 3 ndt/tcpinfo/2019/12/28/20191228T080020.563296Z-tcpinfo-mlab1-lhr04-ndt.tgz
Found: 2019/12/28/ndt-m92kv_1573028939_0000000000056D78.00000.jsonl.zst
Found: 2019/12/28/ndt-m92kv_1573028939_0000000000056D7A.00000.jsonl.zst
Extracting: 2019/12/28/ndt-m92kv_1573028939_0000000000056D78.00000.jsonl.zst 2019/12/28/ndt-m92kv_1573028939_0000000000056D7A.00000.jsonl.zst
Trying: 2 ndt/tcpinfo/2019/12/28/20191228T060017.042393Z-tcpinfo-mlab1-lhr04-ndt.tgz
Trying: 4 ndt/tcpinfo/2019/12/28/20191228T100023.800221Z-tcpinfo-mlab1-lhr04-ndt.tgz
Trying: 5 ndt/tcpinfo/2019/12/28/20191228T112721.525295Z-tcpinfo-mlab1-lhr04-ndt.tgz
Found 2 tcpinfo archives
Checking for traceroute archives
Available server-day pusher files: 0
Checking for web100 archives
Available server-day pusher files: 0

In [14]:
print (r)


{'ndt5': ['2019/12/28/ndt-m92kv_1573028939_0000000000056D78.json'], 'pcap': ['2019/12/28/ndt-m92kv_1573028939_0000000000056D78.pcap.gz', '2019/12/28/ndt-m92kv_1573028939_0000000000056D7A.pcap.gz'], 'tcpinfo': ['2019/12/28/ndt-m92kv_1573028939_0000000000056D78.00000.jsonl.zst', '2019/12/28/ndt-m92kv_1573028939_0000000000056D7A.00000.jsonl.zst']}
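
From here, the tcptrace step sketched at the top of this notebook could be applied to the gathered directory, e.g. (using the hypothetical helper from that sketch):

makeXplots(DataDir + 'ndt-m92kv_1573028939_0000000000056D7A/')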