Statistics of Stable


In [ ]:
import json, re, pprint, os, datetime
import pandas as pd
import numpy as np
import matplotlib

# Global matplotlib settings. They are written into rcParams before pyplot is
# imported (statement order kept from the original notebook).
pltsettings = {
    "figure.figsize" : (5.0, 4.0),
    "pgf.texsystem" : "pdflatex",
    "font.family": "sans",
    "font.serif": [],                   # use latex default serif font
    #"font.sans-serif": ["DejaVu Sans"], # use a specific sans-serif font
}
matplotlib.rcParams.update(pltsettings)

import matplotlib.pyplot as plt
#import seaborn as sns # makes exports ugly

# Single-argument print(...) behaves identically on Python 2 and Python 3.
print(plt.style.available)

# matplotlib 3.6 renamed the bundled seaborn styles to 'seaborn-v0_8-*'.
# Use the old name when this installation still lists it, otherwise the new one.
plot_style = 'seaborn-whitegrid'
if plot_style not in plt.style.available and 'seaborn-v0_8-whitegrid' in plt.style.available:
    plot_style = 'seaborn-v0_8-whitegrid'
plt.style.use(plot_style)

# Colour palette used by the plots below, as RGB tuples with components in 0..1.
Pantone540=(0,.2,0.34901)
Pantone301=(0,0.32156,0.57647)
TUMred=(0.898,0.2039,0.0941)
TUMgreen=(0.5686,0.6745,0.4196)
TUMgray=(0.61176,0.61568,0.62352)
TUMlightgray=(0.85098,0.85490,0.85882)

In [ ]:
# Get the path to the unit statistics log.
print(datetime.datetime.now())

# The environment variable 'unitstats' takes precedence. Previously it was read
# but then unconditionally overwritten by the hard-coded path, so it had no
# effect. The hard-coded location is now only the fallback when it is not set.
DEFAULT_UNITSTATS_FILE = "/home/becker/async/StratoX.git.rcs/obj/unitstats.log"
unitstats_file = os.environ.get('unitstats', DEFAULT_UNITSTATS_FILE)

if not os.path.exists(unitstats_file):
    # Raise instead of calling exit(): an exception reliably stops "Run All",
    # whereas exit() inside a notebook kernel does not behave like in a script.
    raise IOError("ERROR: File {} does not exist.".format(unitstats_file))

# All generated PDF figures are written next to the input file.
FOLDER=os.path.split(unitstats_file)[0]
print("file=" + str(unitstats_file))

In [ ]:
def read_json(fname):
    """
    Read the statistics file and return (totals, units), a tuple of dicts.

    The file is scanned line by line. Every line that parses as JSON before
    the first line starting with 'TOTALS' is taken as the units record (a list
    of single-key dicts {unit name: stats}). Every JSON line from the 'TOTALS'
    line onwards is taken as the totals record. Lines that are not valid JSON
    (headers, blank lines, the 'TOTALS' marker itself) are ignored.

    Only the first record of each kind is kept; a second JSON record of the
    same kind is reported on stdout and dropped.

    Returns:
        (totals, units): `totals` is the parsed totals record or None if the
        file has none; `units` maps unit name -> stats and is empty if the
        file has no units record.
    """
    unit_records = None
    totals = None
    with open(fname, 'r') as f:
        endofunits = False
        for line in f:
            if line.startswith('TOTALS'):
                endofunits = True
            try:
                parsed = json.loads(line)
            except ValueError:
                # Not JSON. Skip silently, but let every other error surface
                # (json raises ValueError, or a subclass of it, on bad input).
                continue
            # Parse first, then check for duplicates: this way only real
            # duplicate records are reported, not trailing blank lines.
            if not endofunits:
                if unit_records is None:
                    unit_records = parsed
                else:
                    print("error: units appearing multiple times")
            else:
                if totals is None:
                    totals = parsed
                else:
                    print("error: totals appearing multiple times")

    # unpack units (list of single-key dicts) => one dict
    units = {}
    for record in (unit_records or []):
        # first key of the record; works on Python 2 and 3 alike
        name = next(iter(record))
        units[name] = record[name]
    return (totals, units)
            
#######
# Load the statistics once; `totals` and `units` are used by all cells below.
(totals, units) = read_json(unitstats_file)
# Single-argument print(...) works on both Python 2 and Python 3.
print("TOTALS: ")
pprint.pprint(totals)

Property Occurrence


In [ ]:
#### plot success per VC type; rows are VCs, columns are "success", "fail"
def pretty_vcname(VCname):
    """
    Turn a rule identifier into a display label.

    The name is split at underscores, a leading "VC" component is dropped,
    and the remaining components are joined with spaces and title-cased,
    e.g. "VC_RANGE_CHECK" -> "Range Check".
    """
    words = VCname.split("_")
    kept = words[1:] if words[0] == "VC" else words
    label = " ".join(kept)
    return label.title()

# Number of checks per VC type, keyed by the pretty-printed rule name.
# .items() works on Python 2 and 3; .iteritems() exists on Python 2 only.
counters = {pretty_vcname(rule): stats['cnt'] for rule, stats in totals["rules"].items()}
df_cnt = pd.DataFrame(counters, index=['count']).T
#print df2.head()
# sort by occurrence
df_cnt = df_cnt.sort_values(by='count', ascending=False)
ax = df_cnt.plot.bar(color=[Pantone301], legend=False)
ax.set_ylabel('count')
#ax.grid()
plt.savefig(FOLDER + os.sep + 'props.pdf', bbox_inches='tight')
plt.show()

# 'Uninitialized' is left out of the percentage base and of the listing.
num_vc = sum(cnt for name, cnt in counters.items() if name != 'Uninitialized')
print("########################")
print("# Percentage of each VC")
print("########################")
for name, cnt in counters.items():
    if name != 'Uninitialized':
        # guard against an all-zero statistics file
        share = 100.0 * cnt / num_vc if num_vc else 0.0
        print(name + ": {0:0.1f}".format(share))

Success Ratio of Properties (Normalized)


In [ ]:
# Plot success per VC type; rows are VCs, columns are "proven" and "unsuccessful" (in percent).
# Rules that never occurred (cnt == 0) have no defined ratio and are skipped,
# which also avoids a ZeroDivisionError.
VCs = {
    pretty_vcname(rule): {
        'proven': 100.0 * stats['proven'] / stats['cnt'],
        'unsuccessful': 100.0 * (stats['cnt'] - stats['proven']) / stats['cnt'],
    }
    for rule, stats in totals["rules"].items()
    if stats['cnt'] > 0
}
df2 = pd.DataFrame(VCs).T
#print df2.head()
df2 = df2.sort_values(by='proven', ascending=False)
exclude_columns = ['unsuccessful']
# .loc replaces .ix, which was deprecated and later removed from pandas;
# for this label-based column selection the two are equivalent.
ax = df2.loc[:, df2.columns.difference(exclude_columns)].plot.bar(stacked=True, color=[Pantone301], legend=False, figsize=(5,2.5))
ax.set_ylim(50,100)
ax.set_ylabel('success [%]')
ax.set_title('Verification success per VC type')

plt.savefig(FOLDER + os.sep + 'vc_success.pdf', bbox_inches='tight')
plt.show()

Success Ratio by unit


In [ ]:
# Per unit: the reported 'success' value and the number of properties ('props').
unit_success = {name: {'success': stats['success'], 'cnt': stats['props']} for name, stats in units.items()}
df_usucc = pd.DataFrame(unit_success).T
df_usucc = df_usucc.sort_values(by='success', ascending=False)
exclude_columns = ['cnt']
# Only units that have at least one property are plotted.
# .loc replaces the removed pandas .ix indexer (boolean row mask + column labels).
ax = df_usucc.loc[df_usucc.cnt > 0, df_usucc.columns.difference(exclude_columns)].plot.bar(color=[Pantone301], legend=False)
ax.set_ylabel('success')
ax.grid()
plt.show()

In [ ]:
# Per unit with at least one property: proven / unsuccessful counts, the total
# ('cnt') and the success ratio in percent. Units with props == 0 are filtered
# out here, so the ratio never divides by zero.
unit_cnt = {
    name: {
        'proven': stats['proven'],
        'unsuccessful': stats['props'] - stats['proven'],
        'cnt': stats['props'],
        'success': (100.0 * stats['proven']) / stats['props'],
    }
    for name, stats in units.items()
    if stats['props'] > 0
}
df_ucnt = pd.DataFrame(unit_cnt).T

print(df_ucnt.head())
df_ucnt = df_ucnt.sort_values(by=['cnt','proven'], ascending=False)
exclude_columns = ['cnt','success']

# Remaining columns are 'proven' and 'unsuccessful' (in that order), drawn
# green and red. .loc replaces the removed pandas .ix indexer.
ax = df_ucnt.loc[df_ucnt.cnt > 0, df_ucnt.columns.difference(exclude_columns)].plot.bar(stacked=True, color=[TUMgreen,TUMred], figsize=(7,4))
ax.set_ylabel('cnt')
ax.set_xlabel('unit')
#ax.grid()
plt.savefig(FOLDER + os.sep + 'units_props.pdf', bbox_inches='tight')
plt.show()

Failure Ratio does not depend on #props:


In [ ]:
# Scatter plot of the success ratio ('success') against the number of
# properties ('cnt') of each unit.
df_ucnt.plot(kind='scatter', x='cnt', y='success');
plt.show()

Coverage per Unit

Show that HAL/BSP are just spec and the app is full SPARK. TODO: Units which have SPARK_Mode off in their spec do not appear here, which is unfair.

SPARK_Mode

Tri-state:

  • On (Must be in SPARK)
  • Off (forbid GNATprove to analyze this)
  • Auto (implicit: take whatever is compliant to SPARK as SPARK; ignore rest)

What does "spec" mean?

Does it mean that there is a contract, or only that the spec is in a scope with SPARK mode on?

Mixed Unit

Does it count non-SPARK subs in the body?

Non-SPARK Unit

Does GNATprove count the specs even?

  • yes, but only SPARK-compliant specs. E.g., it skips functions with side effects
  • FAT_Filesystem.Directories: lot of functions. Ents=0

We need to count entities ourselves, somehow.


In [ ]:
# Units whose name contains one of these substrings are left out of the plot.
# (Renamed from `filter` so that the Python builtin is no longer shadowed.)
excluded_units = ['stm32','cortex','generic']#,'hil.','driver','hal','register']

# Per unit with at least one entity: body coverage as reported, and the spec
# and skip shares in percent of the entity count.
unit_cov = {
    name: {
        'body': stats["coverage"],
        'spec': 100.0 * stats['spec'] / stats['ents'],
        'ents': stats['ents'],
        'skip': 100.0 * stats['skip'] / stats['ents'],
    }
    for name, stats in units.items()
    if stats['ents'] > 0 and not any(pattern in name for pattern in excluded_units)
}
df_ucov = pd.DataFrame(unit_cov).T
df_ucov = df_ucov.sort_values(by=['body','spec','skip'], ascending=False)
exclude_columns = ['ents']

# Remaining columns are 'body', 'skip', 'spec' (sorted), matching the colour
# list. .loc replaces the removed pandas .ix indexer.
ax = df_ucov.loc[:, df_ucov.columns.difference(exclude_columns)].plot.bar(stacked=True, figsize=(13,3), color=[TUMgreen, "black", TUMlightgray])
ax.set_ylabel('coverage')
plt.savefig(FOLDER + os.sep + 'units_cov.pdf', bbox_inches='tight')
plt.show()
# Debug output for one specific unit; .get() prints None instead of raising
# KeyError when the analysed project has no such unit.
print(units.get("fat_filesystem.directories"))

Flows by unit


In [ ]:
# Units whose name contains one of these substrings are left out of the plot.
# (Renamed from `filter` so that the Python builtin is no longer shadowed.)
excluded_flow_units = ['stm32','cortex','generic']

# Per unit with at least one flow: 'flows' and 'flows_success' as reported.
unit_flow = {
    name: {'cnt': stats['flows'], 'success': stats['flows_success']}
    for name, stats in units.items()
    if stats['flows'] > 0 and not any(pattern in name for pattern in excluded_flow_units)
}
df_flows = pd.DataFrame(unit_flow).T
df_flows = df_flows.sort_values(by=['cnt'], ascending=False)
exclude_columns = []
# .loc replaces the removed pandas .ix indexer.
ax = df_flows.loc[:, df_flows.columns.difference(exclude_columns)].plot.bar()
ax.set_ylabel('success')
plt.show()

by task? not yet possible

specific check by package


In [ ]:
def plot_vc (vcname):
    """
    Plot success for a specific VC in all units.

    For every unit that has at least one check of rule `vcname`, a stacked bar
    shows the number of failed and proven checks. The figure is saved as
    'units<vcname>.pdf' in FOLDER and then shown. Nothing is plotted when no
    unit contains the rule.
    """
    res = {}
    for u, v in units.items():
        # direct lookup instead of scanning all rules of the unit
        rv = v['rules'].get(vcname)
        if rv and 'cnt' in rv and rv['cnt'] > 0:
            res[u] = { 'cnt' : rv['cnt'], 'proven' : rv['proven'], 'fail' : rv['cnt'] - rv['proven'] }

    if not res:
        # An empty frame has no 'cnt' column, so sorting/plotting would raise.
        print("no data for " + vcname)
        return

    df_vc = pd.DataFrame(res).T
    print(df_vc.head())
    df_vc = df_vc.sort_values(by=['cnt','proven'], ascending=False)
    exclude_columns=['cnt']

    # Remaining columns are 'fail', 'proven' (sorted), matching red/green.
    # .loc replaces the removed pandas .ix indexer.
    ax = df_vc.loc[:, df_vc.columns.difference(exclude_columns)].plot.bar(stacked=True, figsize=(13,7), color=[TUMred, TUMgreen])
    ax.set_ylabel('num')
    ax.set_title('Results for ' + pretty_vcname(vcname))
    #ax.grid()
    plt.savefig(FOLDER + os.sep + 'units' + vcname + '.pdf', bbox_inches='tight')
    plt.show()

# list(...) is needed on Python 3, where keys() is a view and not indexable.
print(list(totals["rules"].keys())[0])
for vc in totals["rules"].keys():
    plot_vc(vc)

Analysis Time per Module and VC Type (only SPARK 17+)

This section requires GNATprove 2017 or later, because the entry "check_tree" is only available in those versions.


In [ ]:
#print str(units['mission']['details_proofs'])
# 'check_tree' : [{'proof_attempts':{ <solver> : {'steps:..,'result':..,'time':..}}, 'transformations': {}}]

def parse_checktree(checktree):
    """
    Descend into check_tree and return a summary dict:
     - 'time': time sum of all proof attempts
     - 'solvers': {name: {'cnt': n}}, how often each solver was used; entries
       with how_proved 'interval' or 'flow' are counted in the same way
     - 'maxsteps': most steps needed by a single proof attempt
     - 'result': one consolidated result class for the whole check; stays
       'undefined' when the tree contains no proof attempt at all

    Returns None as soon as an entry with non-empty 'transformations' is
    found, because those are not handled.
    """
    ret = {'time': 0.0, 'solvers': {}, 'maxsteps': 0, 'result' : 'undefined'}
    howproved_counted = ['interval','flow']
    if not checktree:
        return ret

    def count_solver(name):
        """
        Increment the usage counter of `name`. All counters use the same
        {'cnt': n} shape so that consumers can always read entry['cnt'].
        """
        entry = ret['solvers'].setdefault(name, {})
        entry['cnt'] = entry.get('cnt', 0) + 1

    def strip_result(r):
        """
        Remove additional information from result and return a result class in {Valid, ...}
        """
        # somehow step limits are reported not as "Step limit exceeded"; but often as "Failure (steps:<n>)"
        # we remap them to the expected strings
        match = re.search(r'\w+ \(([^\)]+)\)', r)
        if not match:
            return r
        detail = match.group(1)
        if 'steps' in detail:
            return 'Step limit exceeded'
        elif 'time' in detail: # never seen this
            return 'Timeout'
        elif 'resource limit' in detail: # this must be why3-cpulimit. Either timeout or memout.
            return 'Resource limit exceeded'
        return detail

    result = set()
    for chk in checktree:
        if 'transformations' in chk and len (chk['transformations']) > 0:
            print("trafo; not handled: " + str(chk))
            return None
        if 'proof_attempts' in chk:
            for s, sd in chk['proof_attempts'].items():
                count_solver(s)
                ret['time'] += sd['time']
                if sd['steps'] > ret['maxsteps']: ret['maxsteps'] = sd['steps']
                result.add(strip_result(sd['result']))
        else:
            print("no proof attempts")
        if 'how_proved' in chk and chk['how_proved'] in howproved_counted:
            # previously stored as a bare int, which clashed with the
            # {'cnt': n} entries written for solvers
            count_solver(chk['how_proved'])

    # decide overall result of all provers
    if not result:
        # no prover reported anything: keep 'undefined' so that the caller can
        # classify the check by other means (instead of getting '')
        return ret
    if len(result) == 1:
        ret["result"] = next(iter(result))
    elif "Valid" in result:
        # at least one prover succeeded
        ret["result"] = 'Valid'
    elif any (limit in result for limit in ["Timeout", "Step limit exceeded", "Resource limit exceeded", "Out Of Memory"]):
        # multiple different reasons: if there is a timeout/stepout, then give this as reason
        reason = []
        if "Step limit exceeded" in result: reason.append("steplimit")
        if "Out Of Memory" in result: reason.append("memlimit")  # was misspelled "Of Of Memory"
        if "Resource limit exceeded" in result: reason.append("resourcelimit")
        if "Timeout" in result: reason.append("timeout")
        ret["result"] = '|'.join(reason)
    else:
        # otherwise mention what is there; sorted so that the same combination
        # always yields the same label
        ret["result"] = '|'.join(sorted(result))
    return ret

# Names of all rules (VC types) listed in the totals.
# NOTE: this rebinds `VCs`, which an earlier cell used for the dict of
# per-VC success percentages.
VCs = totals["rules"].keys()

In [ ]:
####################
# summarize per VC
####################
def _new_vc_stats():
    """Return a fresh, zeroed statistics record for one VC type of one unit."""
    return {'cnt':0, 'solvers': {}, 'time_total': 0.0, 'time_total_undecided' : 0.0, 'time_samples' :[], 'time_samples_undecided' : [], 'result': {}}

# res[unit] = {'VC': {rule: stats}, 'time': .., 'time_undecided': .., 'result': {}, 'maxsteps': ..}
res = {}
have_checktree = False
for u,ud in units.items():
    res[u] =  { 'VC': { v : _new_vc_stats() for v in VCs }}
    t_total = 0.0
    t_total_undecided = 0.0
    maxsteps = 0
    if "details_proofs" in ud:
        #print "num proofs in " + u + "=" + str(len(ud['details_proofs']))
        for p in ud['details_proofs']:
            vc = p['rule']
            # setdefault: a rule that is missing from the totals must not
            # abort the whole summary with a KeyError
            vcstats = res[u]['VC'].setdefault(vc, _new_vc_stats())
            vcstats['cnt'] += 1
            ct = None
            if 'check_tree' in p:
                ct = parse_checktree(p['check_tree'])
            if ct:
                have_checktree = True
                t_total += ct['time']
                vcstats['time_total'] += ct['time']
                vcstats['time_samples'].append (ct['time'])
                if ct['maxsteps'] > maxsteps: maxsteps = ct['maxsteps']
                # merge dicts counting solver invocations; accept both the
                # {'cnt': n} form and a bare integer counter
                for k,v in ct['solvers'].items():
                    invocations = v['cnt'] if isinstance(v, dict) else v
                    vcstats['solvers'][k] = vcstats['solvers'].get(k,0) + invocations
                # we get no result class when there is no check tree. this happens for interval
                if ct['result'] == 'undefined':
                    if p.get('how_proved') == 'interval':
                        ct['result'] = 'Valid' if p.get('severity') == 'info' else 'interval failed'
                    else:
                        print("ERROR: unhandled value of how_proved. Not a solver, not interval check. what is it?")
                # we get back exactly one result class per check. Count how often this happens in the unit
                vcstats['result'][ct['result']] = vcstats['result'].get(ct['result'],0) + 1
                # now also collect samples and sum time for proofs that did not finish
                if not ct['result'] in ["Valid", "HighFailure", "Failure"]:
                    t_total_undecided += ct['time']
                    vcstats['time_samples_undecided'].append (ct['time'])
                    vcstats['time_total_undecided'] += ct['time']
    res[u]['time'] = t_total
    res[u]['time_undecided'] = t_total_undecided
    res[u]['result'] = {} # TODO: summarize
    res[u]['maxsteps'] = maxsteps

t_analysis_sec = sum(unit_res['time'] for unit_res in res.values())
print("################################")
print("# TOTAL CPU TIME = {0:0.1f} min".format(t_analysis_sec/60))
print("################################")
print("Note: divide by number of cores to get approximate wall-clock time")


if not have_checktree:
    print("No Check tree (needs newer GNATprove). Stopping report here.")
    #exit(0) #  does not kill the kernel

In [ ]:
###########################
# plot total time per unit
###########################
# Total analysis time of every unit that spent any time in proof attempts.
unit_times = {k: v['time'] for k,v in res.items() if v['time'] > 0.0}
if unit_times:
    df_unittime = pd.DataFrame(unit_times, index=['time']).T
    df_unittime = df_unittime.sort_values(by=['time'], ascending=False)
    ax = df_unittime.plot.bar(figsize=(15,5), color=[Pantone301], legend=False)
    ax.set_ylabel('analysis time [s]')
    ax.set_title('Analysis time vs. Unit')
    #ax.grid()
    plt.savefig(FOLDER + os.sep + 'units_time.pdf', bbox_inches='tight')
    plt.show()
else:
    # Plotting an empty frame raises; this happens when no timing was recorded.
    print("No analysis times available; skipping plot.")

In [ ]:
###########################
# plot maxsteps per unit
###########################
# Largest step count of a single proof attempt, for every unit that has one.
unit_steps = {k: v['maxsteps'] for k,v in res.items() if v['maxsteps'] > 0}
if unit_steps:
    df_unitsteps = pd.DataFrame(unit_steps, index=['steps']).T
    df_unitsteps = df_unitsteps.sort_values(by=['steps'], ascending=False)
    ax = df_unitsteps.plot.bar(figsize=(15,5), color=[Pantone301], legend=False)
    ax.set_ylabel('steps')
    ax.set_title('Analysis steps vs. Unit')
    #ax.grid()
    plt.savefig(FOLDER + os.sep + 'steps.pdf', bbox_inches='tight')
    plt.show()
else:
    # Plotting an empty frame raises; this happens when no steps were recorded.
    print("No step counts available; skipping plot.")

In [ ]:
##########################
# plot total time per VC (summarizing all units)
##########################
def plot_vc_overview():
    """
    Summarize analysis time per VC type over all units.

    Produces two figures in FOLDER:
     - 'total_time_per_VC.pdf': total time per VC type (log scale)
     - 'statistical_time_per_VC.pdf': box plot of the time of single checks
    and prints the share of the total analysis time spent in each VC type.
    """
    pdata = {} # {'Uninitialized': {'time': 0.0, 'time_undecided': 0.0, 'samples': [], 'samples_undecided': []}, ... }

    def accumulate(vc,vcd):
        """Add the per-unit statistics `vcd` to the running totals of `vc`."""
        entry = pdata.setdefault(vc, {'time': 0.0, 'time_undecided': 0.0, 'samples': [], 'samples_undecided': []})
        entry['time'] += vcd['time_total']
        entry['time_undecided'] += vcd['time_total_undecided']
        entry['samples'].extend (vcd['time_samples'])
        entry['samples_undecided'].extend (vcd['time_samples_undecided'])

    for u,ud in res.items():
        for vc,vcd in ud['VC'].items():
            accumulate(pretty_vcname(vc),vcd)

    if not pdata:
        # nothing to sort or plot
        print("no VC data available")
        return

    # sanity check: sum of all must be approximately analysis time
    #print "############################"
    #print "# time total for all VCs: {0:0.1f} min".format(sum([v['time'] for k,v in pdata.iteritems()])/60)
    #print "############################"

    df_vc = pd.DataFrame(pdata).T
    df_vc = df_vc.sort_values(by=['time'], ascending=False)
    print(df_vc.head())

    # Only the total time is drawn. Select it explicitly: the former exclusion
    # list named 'time_samples*' columns that do not exist, and relied on the
    # plotting code silently dropping the list-valued 'samples*' columns.
    ax = df_vc[['time']].astype(float).plot.bar(logy=True, figsize=(5,2.5), color=[Pantone301], legend=False)
    ax.set_ylabel('Analysis time [s]')
    ax.set_title('Total CPU time per VC type')
    #ax.grid()
    plt.savefig(FOLDER + os.sep + 'total_time_per_VC.pdf', bbox_inches='tight')
    plt.show()

    print("##############################")
    print("# Percentage of time spent in:")
    print("##############################")
    for k,v in pdata.items():
        # t_analysis_sec is 0 when no timing information was recorded
        share = 100.0*v['time']/t_analysis_sec if t_analysis_sec else 0.0
        print(k + ": {0:0.1f}".format(share))

    tmp = { k: v['samples'] for k,v in pdata.items()}
    df_stats = pd.DataFrame.from_dict(tmp, orient='index').T #  we need this orient and .T because we want NaNs to fill unequal lengths
    if df_stats.empty:
        print("no time samples available; skipping box plot")
        return
    exclude_columns = ['Fp Overflow Check','Assert'] # because Float is extremely slow
    # .loc replaces the removed pandas .ix indexer
    ax = df_stats.loc[:, df_stats.columns.difference(exclude_columns)].plot.box(vert=False, figsize=(20,6), showfliers=False)
    #plt.setp( ax.xaxis.get_majorticklabels(), rotation=90 )
    # boxes are horizontal (vert=False), so the time values are on the x axis
    ax.set_xlabel('Analysis time [s]')
    ax.set_title('Statistical analysis time of a single VC')
    #ax.grid()
    plt.savefig(FOLDER + os.sep + 'statistical_time_per_VC.pdf', bbox_inches='tight')
    plt.show()


plot_vc_overview()

In [ ]:
def plot_vc_time_per_unit(vcname):
    """
    Make a plot showing time spent in a specific VC for all modules.

    Only units with a total time > 0 for rule `vcname` are shown, with one bar
    for the total time and one for the time of undecided proofs. The figure is
    saved as 'time_in_<vcname>.pdf' in FOLDER. Nothing is plotted when no unit
    spent time in this rule.
    """
    pdata = {}
    # .items() works on Python 2 and 3; .iteritems() exists on Python 2 only
    for u,ud in res.items():
        if vcname in ud['VC']:
            t = ud['VC'][vcname]['time_total']
            t2 = ud['VC'][vcname]['time_total_undecided']
            if t > 0.0:
                pdata[u] = { 'time total': t, 'time undecided':t2 }

    if not pdata: return

    df_vc = pd.DataFrame(pdata).T
    df_vc = df_vc.sort_values(by=['time total'], ascending=False)
    ax = df_vc.plot.bar(figsize=(13,7), color=[Pantone301, TUMred], legend=True)
    ax.set_ylabel('Analysis time [s]')
    ax.set_title('Analysis time for ' + pretty_vcname(vcname))
    #ax.grid()
    plt.savefig(FOLDER + os.sep + 'time_in_' + vcname + '.pdf', bbox_inches='tight')
    plt.show()



#########################################
# plot VC time of a specific VC per unit
#########################################

for vc in totals["rules"].keys():
    plot_vc_time_per_unit(vc)

How often do we fail because of timeout/stepout?


In [ ]:
#print " "
#print "Mission:"
#print res['mission']
#  u'VC_RANGE_CHECK': {'cnt': 61, 'time': 68.78999999999995, 'result': {'steplimit': 3, u'Valid': 48, 'undefined': 10}, 'solvers': {u'CVC4': 51, u'altergo': 3, u'CVC4_CE': 3, u'Z3': 3}}

def plot_failure_reason(unit):
    """
    Make a plot showing why checks in unit fail.

    For every VC type that occurs in `unit`, the bars show how many checks
    ended in each result class. The figure is saved as
    'unit_fails_<unit>.pdf' in FOLDER. Nothing is plotted when the unit has no
    checks, or when every check of every VC type is 'Valid'.
    """
    # Copy each result dict: 'cnt' is added below, and writing it into the
    # dicts stored in `res` would corrupt them for every later run of this cell.
    pdata = {v : dict(vd['result']) for v,vd in res[unit]['VC'].items() if vd and 'cnt' in vd and vd['cnt'] > 0}

    if not pdata:
        print("no data for " + unit)
        return

    # a VC type counts as failing when it has any result class besides 'Valid'
    num_fails = sum(1 for vd in pdata.values() if 'Valid' not in vd or len(vd) > 1)
    if num_fails == 0:
        print(" ")
        print("no fails for " + unit)
        return

    # add 'cnt' to each VC (total)
    for v,vd in pdata.items():
        vd['cnt'] = res[unit]['VC'][v]['cnt']

    df_vc = pd.DataFrame(pdata).T
    df_vc = df_vc.sort_values(by=['cnt'], ascending=False)

    exclude_columns=['cnt']
    # .loc replaces the removed pandas .ix indexer
    ax = df_vc.loc[:, df_vc.columns.difference(exclude_columns)].plot.bar(figsize=(10,5))
    ax.set_ylabel('Number of')
    ax.set_title('Why checks fail in ' + unit)
    #ax.grid()
    plt.savefig(FOLDER + os.sep + 'unit_fails_' + unit + '.pdf', bbox_inches='tight')
    plt.show()

for k in units.keys():
    if units[k]["success"] < 100.0: plot_failure_reason (k)