In [3]:
from __future__ import print_function
import sys
import findspark
findspark.init('/home/imran/spark')
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import regexp_replace, ltrim, rtrim
filepath = "/home/imran/py/PepoleEDA/data/ConfLongDemo_JSI.txt"
spark = SparkSession.builder.appName("Parse DF Using UDF").getOrCreate()
In [4]:
df = spark.read.csv(filepath, inferSchema=True, ignoreLeadingWhiteSpace=True,
                    ignoreTrailingWhiteSpace=True, maxColumns=100,
                    timestampFormat='dd.MM.yyyy HH:mm:ss:SSS')
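The schema types imported in the first cell (StructType, StructField, StringType, IntegerType) are never used, but they point at an alternative to inferSchema: declaring the schema explicitly so the timestamp column is typed up front. A minimal sketch, assuming _c3 is the date-string column; the other column names and types here are illustrative placeholders, not the real layout of ConfLongDemo_JSI.txt:

# Sketch: explicit schema instead of inferSchema. Only _c3 as the
# timestamp column is taken from the code below; the other fields
# are placeholders.
from pyspark.sql.types import TimestampType
schema = StructType([
    StructField("_c0", StringType(), True),
    StructField("_c1", StringType(), True),
    StructField("_c2", IntegerType(), True),
    StructField("_c3", TimestampType(), True),
])
# df = spark.read.csv(filepath, schema=schema,
#                     timestampFormat='dd.MM.yyyy HH:mm:ss:SSS')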
In [5]:
print(df.schema)
df.select('_c3').distinct().take(1)
Out[5]:
In [73]:
df = spark.read.csv(filepath, inferSchema=True, ignoreLeadingWhiteSpace=True,
                    ignoreTrailingWhiteSpace=True, maxColumns=100)
df.select("_c3").show(2, False)
print(df.schema)
In [75]:
from datetime import datetime
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DateType

strformat = '%d.%m.%Y %H:%M:%S:%f'
print("Method one: exception handling can be customized")

def udf_formatdate(x):
    # Fall back to a default date string for nulls; substitute the
    # current timestamp if the value does not match the format.
    try:
        dvalue = datetime.strptime(x if x is not None else '27.05.2009 14:03:25:127', strformat)
    except ValueError:
        dvalue = datetime.now()
    return dvalue

udf_str_date = udf(udf_formatdate, DateType())
df1 = df.withColumn("ConvertedDate", udf_str_date(col("_c3")))
df1.select("ConvertedDate").show(3, False)
df1.printSchema()
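# Note (sketch): DateType keeps only the calendar date, so the
# time-of-day parsed by strptime is dropped. If it should be
# preserved, the same function can be registered with TimestampType.
from pyspark.sql.types import TimestampType
udf_str_ts = udf(udf_formatdate, TimestampType())
# df.withColumn("ConvertedTs", udf_str_ts(col("_c3")))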
print("Method two, no exception handling")
print("Although we are handling null values using ternary operator, but in case of buff chars in date string ~ then it breaks")
func = udf (lambda x: datetime.strptime(x if x!= None else '27.05.2009 14:03:25:127', strformat), DateType())
df2 = df.withColumn('dtColumn', func(col('_c3')))
df2.select("dtColumn").show(5, False)
df2.printSchema();
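As an aside, a Python UDF can often be avoided here entirely: Spark's built-in to_timestamp and to_date accept a pattern like the one passed to timestampFormat above, and simply return null for null or unparseable input instead of raising an exception. A minimal sketch of that alternative:

# Sketch: built-in parsing instead of a Python UDF; malformed or
# null strings yield null rather than an exception, and no Python
# serialization overhead is incurred.
from pyspark.sql.functions import to_timestamp, to_date
df3 = df.withColumn('tsColumn', to_timestamp(col('_c3'), 'dd.MM.yyyy HH:mm:ss:SSS'))
df3 = df3.withColumn('dtColumn', to_date(col('_c3'), 'dd.MM.yyyy HH:mm:ss:SSS'))
df3.select('tsColumn', 'dtColumn').show(3, False)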