In [1]:
import dask
import dask.dataframe as dd
from dask.diagnostics import ProgressBar, Profiler, ResourceProfiler, visualize
import bokeh.plotting as bp

In [2]:
# Register a global progress bar so dask computations report their progress.
progress_bar = ProgressBar()
progress_bar.register()

# Send Bokeh figures (used by the profiler visualisation) to the notebook output.
bp.output_notebook()


Loading BokehJS ...

In [3]:
# Column names applied to the tab-separated input files, in file order.
cols = ['square_id', 'timestamp', 'country_code',
        'sms_in', 'sms_out', 'call_in', 'call_out', 'internet']

# Explicit dtype per column. The keys must match the names in `cols` exactly:
# a key that matches no column is not applied, and that column's dtype is then
# left to inference. The activity columns are floats because they contain NaN.
dtypes = {'square_id': int, 'timestamp': int, 'country_code': int,
          'sms_in': float, 'sms_out': float,
          'call_in': float, 'call_out': float, 'internet': float}

In [4]:
# The raw .txt files total about 20.8 GB, so they are read lazily from an
# external SSD rather than loaded into memory.
# NOTE(review): header=0 discards the first line of every file. The head()
# preview has no country_code 0 row for the first timestamp although every
# later timestamp has one, so that line may be real data. TODO confirm whether
# the files actually start with a header line.
cells_glob = '/Volumes/Samsung_T5/data/cells/*.txt'
df = dd.read_csv(
    cells_glob,
    sep="\t",
    header=0,
    names=cols,
    dtype=dtypes,
)

In [5]:
# Preview the first 10 rows to sanity-check parsing and column names.
df.head(n=10)


[########################################] | 100% Completed |  1.3s
Out[5]:
square_id timestamp country_code sms_in sms_out call_in call_out internet
0 1 1383260400000 39 0.141864 0.156787 0.160938 0.052275 11.028366
1 1 1383261000000 0 0.136588 NaN NaN 0.027300 NaN
2 1 1383261000000 33 NaN NaN NaN NaN 0.026137
3 1 1383261000000 39 0.278452 0.119926 0.188777 0.133637 11.100963
4 1 1383261600000 0 0.053438 NaN NaN NaN NaN
5 1 1383261600000 39 0.330641 0.170952 0.134176 0.054601 10.892771
6 1 1383262200000 0 0.026137 NaN NaN NaN NaN
7 1 1383262200000 39 0.681434 0.220815 0.027300 0.053438 8.622425
8 1 1383262800000 0 0.027300 NaN NaN NaN NaN
9 1 1383262800000 39 0.243378 0.192891 0.053438 0.080738 8.009927

In [6]:
# Preview the last 2 rows to check the end of the data.
df.tail(n=2)


[########################################] | 100% Completed |  0.3s
Out[6]:
square_id timestamp country_code sms_in sms_out call_in call_out internet
137425 9999 1388616600000 0 0.085995 NaN NaN NaN NaN
137426 9999 1388616600000 39 0.328587 0.50038 NaN 0.085995 13.823244

In [7]:
# Count the non-null values in each column of the dask dataframe, recording
# task timings (prof) and resource usage (rprof) while the work runs.
lazy_counts = df.count()
with Profiler() as prof:
    with ResourceProfiler() as rprof:
        records = lazy_counts.compute()

print(records)
visualize([prof, rprof])


[########################################] | 100% Completed |  3min  2.7s
square_id       319896227
timestamp       319896227
country_code    319896227
sms_in          173720327
sms_out          99966079
call_in          98596516
call_out        138075046
internet        160681340
dtype: int64
Out[7]:
Column(
id = 'd5457b2a-6ad6-4a3e-aa3c-d9fc488553cd', …)

In [8]:
# A dask DataFrame is split into partitions (chunks) along its index.
# The npartitions attribute reports how many partitions this one has.
df.npartitions


Out[8]:
353

In [9]:
# Find the five (square_id, country_code) pairs with the highest mean internet
# traffic, profiling the computation as it runs.
with Profiler() as prof, ResourceProfiler() as rprof:
    group_keys = [df.square_id, df.country_code]
    mean_internet = df.internet.groupby(group_keys).mean()
    top_internet_cells = mean_internet.nlargest(5).compute()

print(top_internet_cells)
visualize([prof, rprof])


[########################################] | 100% Completed |  4min 23.2s
square_id  country_code
5161       39              1405.944113
5059       39              1238.826801
5259       39              1166.016916
5061       39              1053.595438
5258       39               968.056671
Name: internet, dtype: float64
Out[9]:
Column(
id = '92f8869e-1dd4-43df-8827-425f85c41573', …)

In [ ]: