In [3]:
from __future__ import print_function
import sys
import findspark
findspark.init('/home/imran/spark')


from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import regexp_replace, ltrim, rtrim

filepath = "/home/imran/py/PepoleEDA/data/ConfLongDemo_JSI.txt"

spark = SparkSession.builder.appName("Parse DF Using UDF").getOrCreate()

In [4]:
df = spark.read.csv(filepath, inferSchema=True,
                    ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True,
                    maxColumns=100, timestampFormat='dd.MM.yyyy HH:mm:ss:SSS')

In [5]:
print(df.schema)
df.select('_c3').distinct().take(1)


StructType(List(StructField(_c0,StringType,true),StructField(_c1,StringType,true),StructField(_c2,LongType,true),StructField(_c3,TimestampType,true),StructField(_c4,DoubleType,true),StructField(_c5,DoubleType,true),StructField(_c6,DoubleType,true),StructField(_c7,StringType,true)))
Out[5]:
[Row(_c3=datetime.datetime(2009, 5, 27, 14, 3, 25, 453000))]
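
Relying on inferSchema plus timestampFormat works, but an explicit schema documents the expected layout and skips the extra inference pass over the file. A minimal sketch of that alternative, reusing the column names and types printed above:

from pyspark.sql.types import (StructType, StructField, StringType,
                               LongType, DoubleType, TimestampType)

# Explicit schema mirroring the inferred one printed above
schema = StructType([
    StructField("_c0", StringType(), True),
    StructField("_c1", StringType(), True),
    StructField("_c2", LongType(), True),
    StructField("_c3", TimestampType(), True),
    StructField("_c4", DoubleType(), True),
    StructField("_c5", DoubleType(), True),
    StructField("_c6", DoubleType(), True),
    StructField("_c7", StringType(), True),
])

df = spark.read.csv(filepath, schema=schema,
                    ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True,
                    timestampFormat='dd.MM.yyyy HH:mm:ss:SSS')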

In [73]:
df = spark.read.csv(filepath, inferSchema=True, ignoreLeadingWhiteSpace=True,
                    ignoreTrailingWhiteSpace=True, maxColumns=100)
df.select("_c3").show(2, False)
print(df.schema)


+-----------------------+
|_c3                    |
+-----------------------+
|27.05.2009 14:03:25:127|
|27.05.2009 14:03:25:183|
+-----------------------+
only showing top 2 rows

StructType(List(StructField(_c0,StringType,true),StructField(_c1,StringType,true),StructField(_c2,LongType,true),StructField(_c3,StringType,true),StructField(_c4,DoubleType,true),StructField(_c5,DoubleType,true),StructField(_c6,DoubleType,true),StructField(_c7,StringType,true)))
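
Before reaching for a Python UDF, note that Spark's built-in converters can usually handle this without the Python serialization round-trip that makes UDFs slow. A minimal sketch, assuming Spark 2.2 or later where to_date/to_timestamp accept a format string:

from pyspark.sql.functions import to_date, col

# Built-in parsing; the format string mirrors the one used for
# schema inference above. Unparseable strings become null instead
# of raising an exception.
df_builtin = df.withColumn("ConvertedDate",
                           to_date(col("_c3"), "dd.MM.yyyy HH:mm:ss:SSS"))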

In [75]:
from datetime import datetime

from pyspark.sql.functions import udf, col
from pyspark.sql.types import DateType


strformat = '%d.%m.%Y %H:%M:%S:%f'

print("Method one, exception handling can be customized")
def udf_formatdate(x): 
    dvalue = None;
    try:
         dvalue = datetime.strptime(x if x!= None else '27.05.2009 14:03:25:127', strformat);
    except ValueError as e:
        dvalue = datetime.now();
    return dvalue;

udf_str_date = udf(udf_formatdate, DateType());
df1 = df.withColumn("ConvertedDate",udf_str_date(col("_c3")) )
df1.select("ConvertedDate").show(3, False) 
df1.printSchema();

print("Method two, no exception handling")
print("Although we are handling null values using ternary operator, but in case of buff chars in date string ~ then it breaks")
func =  udf (lambda x: datetime.strptime(x if x!= None else '27.05.2009 14:03:25:127', strformat), DateType())
df2 = df.withColumn('dtColumn', func(col('_c3')))
df2.select("dtColumn").show(5, False)
df2.printSchema();


Method one: exception handling can be customized
+-------------+
|ConvertedDate|
+-------------+
|2009-05-27   |
|2009-05-27   |
|2009-05-27   |
+-------------+
only showing top 3 rows

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: long (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: string (nullable = true)
 |-- ConvertedDate: date (nullable = true)

Method two: no exception handling
Nulls are handled with a conditional expression, but the lambda still raises if the date string contains malformed characters
+----------+
|dtColumn  |
+----------+
|2009-05-27|
|2009-05-27|
|2009-05-27|
|2009-05-27|
|2009-05-27|
+----------+
only showing top 5 rows

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: long (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: string (nullable = true)
 |-- dtColumn: date (nullable = true)
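
Note that both UDFs declare DateType() as the return type, so the time-of-day component visible in the raw strings is truncated. A minimal sketch of the same conversion keeping the full timestamp, reusing udf_formatdate from above:

from pyspark.sql.types import TimestampType

# Same parsing logic, but declared as TimestampType so the
# hours/minutes/seconds survive the conversion.
udf_str_ts = udf(udf_formatdate, TimestampType())
df3 = df.withColumn("ConvertedTs", udf_str_ts(col("_c3")))
df3.select("ConvertedTs").show(3, False)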

