In [1]:
import numpy as np
from scipy.integrate import simps
import dask.array as da
from dask import delayed
from dask.multiprocessing import get

def doForwardEquation(rays,K_ne,mTCI):
    N1,N2, _ , Ns = rays.shape
    nevec = np.zeros(Ns,dtype=np.double)
    g = np.zeros([N1,N2],dtype=np.double)
    j = 0
    while j < N1:
        k = 0
        while k < N2:
            x = rays[j,k,0,:]
            y = rays[j,k,1,:]
            z = rays[j,k,2,:]
            s = rays[j,k,3,:]
            nevec *= 0
            idx = 0
            while idx < Ns:
                nevec[idx] += mTCI.interp(x[idx],y[idx],z[idx])
                idx += 1
            
            np.exp(nevec,out=nevec)
            nevec *= K_ne/1e13
            g[j,k] += simps(nevec,s)
            k += 1
        j += 1
    return g
                

def forwardEquation(rays,K_ne,mTCI,i0):
    '''For each ray do the forward equation using ref antenna i0'''
    Na,Nt,Nd, _, Ns = rays.shape
    if Na < Nd:
        #do over antennas
        tec = np.stack([doForwardEquation(rays[i,:,:,:,:],K_ne,mTCI) for i in range(Na)],axis=0)
    else:
        #do over directions
        tec = np.stack([doForwardEquation(rays[:,:,k,:,:],K_ne,mTCI) for k in range(Nd)],axis=2)
    dtec = tec - tec[i0,:,:]
    return dtec

def forwardEquation_dask(rays,K_ne,mTCI,i0):
    '''For each ray do the forward equation using ref antenna i0'''
    Na,Nt,Nd, _, Ns = rays.shape
    if Na < Nd:
        #do over antennas
        tec = da.stack([da.from_delayed(delayed(doForwardEquation)(rays[i,:,:,:,:],K_ne,mTCI),(Nt,Nd),dtype=np.double) for i in range(Na)],axis=0)
    else:
        #do over directions
        tec = da.stack([da.from_delayed(delayed(doForwardEquation)(rays[:,:,k,:,:],K_ne,mTCI),(Na,Nt),dtype=np.double) for k in range(Nd)],axis=2)
    dtec = tec - tec[i0,:,:]
    return dtec.compute(get=get)

def test_forwardEquation():
    from TricubicInterpolation import TriCubic
    from RealData import DataPack
    from AntennaFacetSelection import selectAntennaFacets
    from CalcRays import calcRays
    import pylab as plt
    datapack = DataPack(filename="output/test/datapackObs.hdf5")
    datapackSel = selectAntennaFacets(15, datapack, antIdx=-1, dirIdx=-1, timeIdx = [0])
    antennas,antennaLabels = datapackSel.get_antennas(antIdx = -1)
    patches, patchNames = datapackSel.get_directions(dirIdx = -1)
    times,timestamps = datapackSel.get_times(timeIdx=[0])
    Na = len(antennas)
    Nt = len(times)
    Nd = len(patches)  
    fixtime = times[Nt>>1]
    phase = datapack.getCenterDirection()
    arrayCenter = datapack.radioArray.getCenter()
    neTCI = TriCubic(filename="output/test/simulate/simulate_0/neModel.hdf5")
    rays = calcRays(antennas,patches,times, arrayCenter, fixtime, phase, neTCI, datapack.radioArray.frequency, True, 1000, 250)
    K_ne = np.mean(neTCI.m)
    neTCI.m = np.log(neTCI.m/K_ne)
    mTCI = neTCI
    i0 = 0
    
    g = forwardEquation_dask(rays,K_ne,mTCI,i0)
    %time forwardEquation_dask(rays,K_ne,mTCI,i0)
    %timen 3 forwardEquation(rays,K_ne,mTCI,i0)
    
    m = mTCI.m.copy()
    i0 = 0
    rmse = []
    rmsef = []
    for error in np.linspace(0,0.25,40):
        mTCI.m = m + error*np.random.normal(size=m.size)
        g_ = forwardEquation_dask(rays,K_ne,mTCI,i0)
        rmse.append(np.sqrt(np.mean((g_ - g)**2)))
        rmsef.append(np.sqrt(np.mean(((g_ - g)/(g+1e-15))**2)))
        #plt.hist(g.flatten())
        #plt.show()
    plt.plot(np.linspace(0,0.25,40),rmse)
    plt.xlabel('Gaussian Noise Level')
    plt.ylabel('RMS Error of forward equation (TECU)')
    plt.yscale('log')
    plt.savefig('forwardEquationRMSE-noise.pdf',format='pdf')
    plt.show()
    plt.plot(np.linspace(0,0.25,40),rmsef)
    plt.xlabel('Gaussian Noise Level')
    plt.ylabel('RMS Error of forward equation (fractional)')
    plt.yscale('log')
    plt.savefig('forwardEquationRMSEf-noise.pdf',format='pdf')
    plt.show()
    #datapackSel.set_dtec(g,antIdx=-1,timeIdx=[0], dirIdx=-1,refAnt=antennaLabels[i0])
    #datapackSel.save("output/test/datapackSim.hdf5")
    
    
    
    

if __name__ == '__main__':
    test_forwardEquation()


Setting refAnt: CS201HBA0
Loaded 58 antennas, 3595 times, 34 directions
Setting refAnt: CS201HBA0
flagged ['CS201HBA0', 'CS021HBA1', 'CS021HBA0', 'CS011HBA0', 'CS011HBA1', 'CS030HBA1', 'CS030HBA0', 'CS004HBA0', 'CS004HBA1', 'CS026HBA0', 'CS026HBA1', 'CS301HBA0', 'CS301HBA1', 'CS101HBA0', 'CS101HBA1', 'CS024HBA0', 'CS024HBA1', 'CS401HBA0', 'CS032HBA1', 'CS032HBA0', 'CS028HBA0', 'CS028HBA1', 'CS001HBA1', 'CS001HBA0', 'RS503HBA', 'CS003HBA1', 'CS003HBA0', 'RS306HBA', 'CS002HBA0', 'CS002HBA1', 'CS501HBA0', 'CS501HBA1', 'RS305HBA', 'CS005HBA1', 'CS005HBA0', 'CS006HBA0', 'CS006HBA1', 'CS302HBA1', 'CS302HBA0', 'CS017HBA0', 'CS017HBA1', 'CS031HBA0', 'CS031HBA1']
Setting refAnt: CS401HBA1
Loaded 15 antennas, 3595 times, 34 directions
Setting refAnt: CS401HBA1
flagged ['s2', 's251', 's252', 's254', 's8', 's23', 's3', 's10', 's5', 's7', 's9', 's6', 's22', 's15', 's17', 's24', 's11', 's16', 's30']
Casting rays: 225
splitting over directions
ERROR:root:Line magic function `%timen` not found.
Wall time: 1.42 s
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-1-d131f23c34d3> in <module>()
    114 
    115 if __name__ == '__main__':
--> 116     test_forwardEquation()

<ipython-input-1-d131f23c34d3> in test_forwardEquation()
     89     for error in np.linspace(0,0.25,40):
     90         mTCI.m = m + error*np.random.normal(size=m.size)
---> 91         g_ = forwardEquation_dask(rays,K_ne,mTCI,i0)
     92         rmse.append(np.sqrt(np.mean((g_ - g)**2)))
     93         rmsef.append(np.sqrt(np.mean(((g_ - g)/(g+1e-15))**2)))

<ipython-input-1-d131f23c34d3> in forwardEquation_dask(rays, K_ne, mTCI, i0)
     53         tec = da.stack([da.from_delayed(delayed(doForwardEquation)(rays[:,:,k,:,:],K_ne,mTCI),(Na,Nt),dtype=np.double) for k in range(Nd)],axis=2)
     54     dtec = tec - tec[i0,:,:]
---> 55     return dtec.compute(get=get)
     56 
     57 def test_forwardEquation():

C:\Users\josh\Anaconda3\envs\compute\lib\site-packages\dask-0.14.3+28.gb0cf627-py3.5.egg\dask\base.py in compute(self, **kwargs)
     94             Extra keywords to forward to the scheduler ``get`` function.
     95         """
---> 96         (result,) = compute(self, traverse=False, **kwargs)
     97         return result
     98 

C:\Users\josh\Anaconda3\envs\compute\lib\site-packages\dask-0.14.3+28.gb0cf627-py3.5.egg\dask\base.py in compute(*args, **kwargs)
    201     dsk = collections_to_dsk(variables, optimize_graph, **kwargs)
    202     keys = [var._keys() for var in variables]
--> 203     results = get(dsk, keys, **kwargs)
    204 
    205     results_iter = iter(results)

C:\Users\josh\Anaconda3\envs\compute\lib\site-packages\dask-0.14.3+28.gb0cf627-py3.5.egg\dask\multiprocessing.py in get(dsk, keys, num_workers, func_loads, func_dumps, optimize_graph, **kwargs)
    175                            get_id=_process_get_id, dumps=dumps, loads=loads,
    176                            pack_exception=pack_exception,
--> 177                            raise_exception=reraise, **kwargs)
    178     finally:
    179         if cleanup:

C:\Users\josh\Anaconda3\envs\compute\lib\site-packages\dask-0.14.3+28.gb0cf627-py3.5.egg\dask\local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    511         # Main loop, wait on tasks to finish, insert new ones
    512         while state['waiting'] or state['ready'] or state['running']:
--> 513             key, res_info, failed = queue_get(queue)
    514             if failed:
    515                 exc, tb = loads(res_info)

C:\Users\josh\Anaconda3\envs\compute\lib\site-packages\dask-0.14.3+28.gb0cf627-py3.5.egg\dask\local.py in queue_get(q)
    144         while True:
    145             try:
--> 146                 return q.get(block=True, timeout=0.1)
    147             except Empty:
    148                 pass

C:\Users\josh\Anaconda3\envs\compute\lib\queue.py in get(self, block, timeout)
    171                     if remaining <= 0.0:
    172                         raise Empty
--> 173                     self.not_empty.wait(remaining)
    174             item = self._get()
    175             self.not_full.notify()

C:\Users\josh\Anaconda3\envs\compute\lib\threading.py in wait(self, timeout)
    295             else:
    296                 if timeout > 0:
--> 297                     gotit = waiter.acquire(True, timeout)
    298                 else:
    299                     gotit = waiter.acquire(False)

KeyboardInterrupt: 

In [ ]: