In [10]:
from ipywidgets import widgets
from IPython.display import display
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.types import StringType,StructField,StructType,ArrayType,DoubleType
from pyspark.ml.linalg import Vectors, VectorUDT,Matrix,MatrixUDT,DenseMatrix
from pyspark.ml.clustering import KMeans
from pyspark.sql import Row
from pyspark.sql import Window
from pyspark.ml import Pipeline
import pandas as pd
import re
import random
from prettytable import PrettyTable
import sys
from datetime import datetime
from operator import add
import numpy as np
import matplotlib.pyplot as plt
#from spark_sklearn import GridSearchCV,Converter
# NOTE(review): hardcoded absolute paths tie this notebook to a single machine;
# consider a configurable DATA_DIR instead.
PATH = "/home/svanhmic/workspace/Python/Erhvervs/data/cdata/"
# `sc` is the SparkContext injected by the notebook kernel (not defined in this file).
# addPyFile ships the helper module to the Spark executors so its functions can
# be used inside distributed tasks.
sc.addPyFile("/home/svanhmic/workspace/Python/Erhvervs/src/cvr/GridSearchLogRegAndKmeans.py")
In [2]:
Out[2]:
In [3]:
In [4]:
def getAllDistances(matrix1, matrix2):
    """Compute all pairwise Euclidean distances between two sets of vectors.

    Args:
        matrix1: iterable of array-likes (e.g. numpy vectors), all of the
            same dimension.
        matrix2: iterable of array-likes of that same dimension.

    Returns:
        Nested list where element ``[j][i]`` is the Euclidean (L2) distance
        between ``matrix2[j]`` and ``matrix1[i]``.
    """
    # Fixed: the return statement had lost its indentation in the exported
    # source, which made the function a SyntaxError as written.
    return [[np.linalg.norm(v - w) for v in matrix1] for w in matrix2]
In [5]:
# Load the CVR feature set and rename the badly named "rank_" column prefix
# to "vaerdiSlope_" across all columns.
df = (sqlContext
.read
.parquet(PATH+"featureDataCvr")
)
# Substitute the prefix in every column name; names without the "rank_"
# prefix pass through unchanged.
rankCols = [re.sub(pattern="rank_",repl="vaerdiSlope_",string=i) for i in df.columns ]
# Cast the "reklamebeskyttet" (Danish: "advertising protected") flag to double
# so it can be used as a numeric feature, then re-alias every column with the
# renamed list built above (rankCols is positionally aligned with df.columns).
renamedDf = (df
.withColumn(colName="reklamebeskyttet",col=F.col("reklamebeskyttet").cast("double"))
.select([F.col(val).alias(rankCols[idx]) for idx,val in enumerate(df.columns)])
)
# Look up a single (most recent) name per company.
# Window: rank each company's records newest validity period first.
windowSpecRank =(Window.partitionBy(F.col("cvrNummer"))).orderBy(F.col("periode_gyldigFra").desc())
groupCols = ["cvrNummer","vaerdi"]
companyNameDf = (sqlContext
.read
.parquet(PATH+"companyCvrData")
# rank==1 keeps the latest record per cvrNummer; sekvensnr==0 presumably
# selects the primary name entry — TODO confirm against the CVR source schema.
.withColumn(colName="rank",col=F.rank().over(windowSpecRank))
.filter((F.col("rank")==1) & (F.col("sekvensnr")==0))
.select([F.col(i) for i in groupCols])
.withColumnRenamed(existing="vaerdi",new="navn")
.orderBy(F.col("cvrNummer"))
.cache()
)
In [8]:
# Assemble the modelling frame: join company names onto the feature frame,
# cast integer columns to double, and fill missing feature values with 0.
labelCols = ["navn","cvrNummer","label","status"]
# Feature columns = every column that is not an id/label column.
featCols = [i for i in companyNameDf.columns+renamedDf.columns if i not in labelCols]
# Cast bigint columns to double so they are accepted by VectorAssembler.
selectCols = [F.col(i[0]).cast("double") if i[1] == "bigint" else F.col(i[0]) for i in renamedDf.dtypes]
# UDF converting an ml vector (e.g. sparse) to its dense representation.
toDenseUDf = F.udf(lambda x: Vectors.dense(x.toArray()),VectorUDT())
# Pipeline stages: assemble the raw feature columns into one vector,
# then standardize it (zero mean, unit variance).
vectorizer = VectorAssembler(inputCols=featCols,outputCol="features")
standardScale = StandardScaler(withMean=True,withStd=True,inputCol=vectorizer.getOutputCol(),outputCol="scaledFeatures")
renamedWithCompaniesDf = (renamedDf
.join(companyNameDf,(companyNameDf["cvrNummer"]==renamedDf["cvrNummer"]),"left")
.drop(companyNameDf["cvrNummer"])
# NOTE(review): selectCols was built from renamedDf.dtypes only, so this
# select silently discards the joined "navn" column — confirm this is intended.
.select(selectCols)
.withColumn(col=F.col("cvrNummer").cast("long"),colName="cvrNummer")
.na
.fill(0.0,featCols)
.distinct()
)
In [9]:
# Preview the assembled frame (show() displays 20 rows by default).
renamedWithCompaniesDf.show()
In [ ]: