In [1]:
import math
import string
import datetime
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pylab

%matplotlib inline

In [ ]:
def deltaCalc(time):
    """Convert a timestamp string to fractional years since the J2000.0 epoch."""
    dt = datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
    # The J2000.0 epoch starts on January 1, 2000, 12:00 GMT = 4:00 PST.
    delta = dt - datetime.datetime.strptime("2000-01-01 04:00:00", "%Y-%m-%d %H:%M:%S")
    return float(delta.total_seconds() / (365.25 * 24 * 60 * 60))

# exo_file is assumed to have been loaded in an earlier cell. Note that Row
# sorts keyword fields alphabetically, so the field order of exo_rdd is
# J2000_time, TgtId, drift, matching the schema defined below.
exo_rdd = exo_file.rdd.map(lambda p: Row(TgtId=p.TgtId, J2000_time=deltaCalc(p.Time), drift=float(p.DriftHzs)))
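
In [ ]:
# Quick sanity check (illustrative, not from the original notebook): one year
# after the epoch reference should map to roughly 1.0. Since 2000 was a leap
# year, the exact value is 366/365.25.
print deltaCalc("2001-01-01 04:00:00")   # ~1.002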

In [ ]:
doppler_fields = [StructField("J2000_time", FloatType(), True),
                  StructField("TgtId", StringType(), True),
                  StructField("drift", FloatType(), True)]

doppler_schema = StructType(doppler_fields)
print doppler_schema

In [ ]:
# Register the Doppler-corrected DataFrame as a temporary table.
calcdf = sqlContext.createDataFrame(exo_rdd, doppler_schema)
calcdf.registerTempTable("calcdb")
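
In [ ]:
# Optional sanity check (illustrative, not from the original notebook):
# confirm the temp table is queryable and the schema came through as expected.
sqlContext.sql("SELECT * FROM calcdb LIMIT 5").show()
calcdf.printSchema()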

In [ ]:
driftSpreadDF = sqlContext.sql("SELECT TgtId, MIN(drift) minimum_d, MAX(drift) maximum_d, "
                               "MIN(J2000_time) minimum_time, MAX(J2000_time) maximum_time "
                               "FROM calcdb GROUP BY TgtId")
driftSpreadDF.count()
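
In [ ]:
# Hypothetical follow-up (not in the original notebook): derive the actual
# per-target drift span from the min/max columns. The column name "spread_d"
# is an illustrative choice.
spreadDF = driftSpreadDF.withColumn("spread_d", driftSpreadDF.maximum_d - driftSpreadDF.minimum_d)
spreadDF.show(5)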

In [ ]:
# Show the results for export; show() prints up to 20000 rows as a text table.
driftSpreadDF.show(20000)
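
In [ ]:
# show() only prints a text table; writing a file is a more robust export.
# A minimal sketch, assuming pandas is installed and the grouped result fits
# in driver memory; the output path is an illustrative placeholder.
driftSpreadDF.toPandas().to_csv("drift_spread.csv", index=False)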
