This notebook shows:
- how to check the SystemML installation and create an MLContext
- how to run DML scripts from Python and pass inputs and outputs
- linear regression on the scikit-learn diabetes dataset via a direct (normal-equations) solve, gradient descent, and conjugate gradient
- how to invoke a prepackaged SystemML algorithm script (LinearRegDS.dml)
- how to train and predict with the mllearn API
In [ ]:
!pip show systemml
In [ ]:
from systemml import MLContext, dml, dmlFromResource
ml = MLContext(sc)
print ("Spark Version:" + sc.version)
print ("SystemML Version:" + ml.version())
print ("SystemML Built-Time:"+ ml.buildTime())
In [ ]:
ml.execute(dml("""s = 'Hello World!'""").output("s")).get("s")
In [ ]:
import sys, os, glob, subprocess
import matplotlib.pyplot as plt
import numpy as np
from sklearn import datasets
plt.switch_backend('agg')
def printLastLogLines(n):
    # find the most recent pyspark kernel log and print its last n lines
    fname = max(glob.iglob(os.path.join(os.environ["HOME"], 'logs/notebook/kernel-pyspark-*.log')), key=os.path.getctime)
    print(subprocess.check_output(['tail', '-' + str(n), fname]))
In [ ]:
script = """
X = rand(rows=$nr, cols=1000, sparsity=0.5)
A = t(X) %*% X
s = sum(A)
"""
In [ ]:
prog = dml(script).input('$nr', 1e5).output('s')
s = ml.execute(prog).get('s')
print(s)
In [ ]:
ml = MLContext(sc)
ml = ml.setStatistics(True)
# re-execute the ML program (next cell) to collect runtime statistics
# printLastLogLines(22)
In [ ]:
prog = dml(script).input('$nr', 1e6).output('s')
out = ml.execute(prog).get('s')
print(out)
In [ ]:
ml = MLContext(sc)
ml = ml.setStatistics(False)
In [ ]:
%matplotlib inline
In [ ]:
diabetes = datasets.load_diabetes()
diabetes_X = diabetes.data[:, np.newaxis, 2]  # use a single feature (BMI)
diabetes_X_train = diabetes_X[:-20]
diabetes_X_test = diabetes_X[-20:]
diabetes_y_train = diabetes.target[:-20].reshape(-1,1)
diabetes_y_test = diabetes.target[-20:].reshape(-1,1)
plt.scatter(diabetes_X_train, diabetes_y_train, color='black')
plt.scatter(diabetes_X_test, diabetes_y_test, color='red')
In [ ]:
diabetes.data.shape
In [ ]:
script = """
# add constant feature to X to model intercept
X = cbind(X, matrix(1, rows=nrow(X), cols=1))
A = t(X) %*% X
b = t(X) %*% y
w = solve(A, b)
bias = as.scalar(w[nrow(w),1])
w = w[1:nrow(w)-1,]
"""
In [ ]:
prog = dml(script).input(X=diabetes_X_train, y=diabetes_y_train).output('w', 'bias')
w, bias = ml.execute(prog).get('w','bias')
w = w.toNumPy()
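As a sanity check, the same closed-form solution can be computed in NumPy (a sketch; it rebuilds the ones column exactly as the DML script does):
In [ ]:
# Sanity check (sketch): solve the same normal equations (X'X)w = X'y in NumPy.
X1 = np.hstack([diabetes_X_train, np.ones((diabetes_X_train.shape[0], 1))])
w_np = np.linalg.solve(X1.T.dot(X1), X1.T.dot(diabetes_y_train))
print(w_np[:-1], w_np[-1])  # should closely match w and bias from SystemML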
In [ ]:
plt.scatter(diabetes_X_train, diabetes_y_train, color='black')
plt.scatter(diabetes_X_test, diabetes_y_test, color='red')
plt.plot(diabetes_X_test, (w*diabetes_X_test)+bias, color='blue', linestyle='dotted')
In [ ]:
script = """
# add constant feature to X to model intercepts
X = cbind(X, matrix(1, rows=nrow(X), cols=1))
max_iter = 100
w = matrix(0, rows=ncol(X), cols=1)
for(i in 1:max_iter){
XtX = t(X) %*% X
dw = XtX %*%w - t(X) %*% y
alpha = -(t(dw) %*% dw) / (t(dw) %*% XtX %*% dw)
w = w + dw*alpha
}
bias = as.scalar(w[nrow(w),1])
w = w[1:nrow(w)-1,]
"""
In [ ]:
prog = dml(script).input(X=diabetes_X_train, y=diabetes_y_train).output('w').output('bias')
w, bias = ml.execute(prog).get('w', 'bias')
w = w.toNumPy()
In [ ]:
plt.scatter(diabetes_X_train, diabetes_y_train, color='black')
plt.scatter(diabetes_X_test, diabetes_y_test, color='red')
plt.plot(diabetes_X_test, (w*diabetes_X_test)+bias, color='red', linestyle='dashed')
Problem with gradient descent: it takes very similar directions many times.
Solution: enforce conjugacy between successive directions.
Step 1: Start with an initial point
while(not converged) {
    Step 2: Compute the gradient dw.
    Step 3: Compute the step size alpha.
    Step 4: Compute the next direction p by enforcing conjugacy with the previous direction.
    Step 5: Update: w_new = w_old + alpha*p
}
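Before the DML version, here is a minimal NumPy sketch of the same conjugate-gradient iteration (an illustration only; it assumes the X1 matrix built in the sanity check above):
In [ ]:
# Minimal NumPy sketch of CG for least squares (illustration, not the DML script below).
def cg_lstsq(X, y, max_iter=20):
    w = np.zeros((X.shape[1], 1))
    dw = -X.T.dot(y)                  # gradient of 0.5*||Xw - y||^2 at w = 0
    p = -dw                           # first direction: steepest descent
    norm_r2 = np.sum(dw ** 2)
    for _ in range(max_iter):
        q = X.T.dot(X.dot(p))                  # q = (X'X) p
        alpha = norm_r2 / np.sum(p * q)        # exact line search along p
        w = w + alpha * p
        dw = dw + alpha * q                    # update the gradient incrementally
        old_norm_r2, norm_r2 = norm_r2, np.sum(dw ** 2)
        p = -dw + (norm_r2 / old_norm_r2) * p  # enforce conjugacy with the previous direction
    return w

print(cg_lstsq(X1, diabetes_y_train).T)  # weights and intercept, as a row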
In [ ]:
script = """
# add constant feature to X to model intercepts
X = cbind(X, matrix(1, rows=nrow(X), cols=1))
m = ncol(X); i = 1;
max_iter = 20;
w = matrix (0, rows = m, cols = 1); # initialize weights to 0
dw = - t(X) %*% y; p = - dw; # dw = (X'X)w - (X'y)
norm_r2 = sum (dw ^ 2);
for(i in 1:max_iter) {
q = t(X) %*% (X %*% p)
alpha = norm_r2 / sum (p * q); # Minimizes f(w - alpha*r)
w = w + alpha * p; # update weights
dw = dw + alpha * q;
old_norm_r2 = norm_r2; norm_r2 = sum (dw ^ 2);
p = -dw + (norm_r2 / old_norm_r2) * p; # next direction - conjugacy to previous direction
i = i + 1;
}
bias = as.scalar(w[nrow(w),1])
w = w[1:nrow(w)-1,]
"""
In [ ]:
prog = dml(script).input(X=diabetes_X_train, y=diabetes_y_train).output('w').output('bias')
w, bias = ml.execute(prog).get('w','bias')
w = w.toNumPy()
In [ ]:
plt.scatter(diabetes_X_train, diabetes_y_train, color='black')
plt.scatter(diabetes_X_test, diabetes_y_test, color='red')
plt.plot(diabetes_X_test, (w*diabetes_X_test)+bias, color='red', linestyle='dashed')
In [ ]:
import os
from subprocess import call
dirName = os.path.join(os.path.expanduser("~"), "scripts")
call(["mkdir", "-p", dirName])
call(["wget", "-N", "-q", "-P", dirName, "https://raw.githubusercontent.com/apache/systemml/master/scripts/algorithms/LinearRegDS.dml"])
scriptName = dirName + "/LinearRegDS.dml"
dml_script = dmlFromResource(scriptName)
prog = dml_script.input(X=diabetes_X_train, y=diabetes_y_train).input('$icpt',1.0).output('beta_out')
w = ml.execute(prog).get('beta_out')
w = w.toNumPy()
bias = w[1]  # with $icpt=1, the intercept is the last element of beta_out
print(bias)
In [ ]:
plt.scatter(diabetes_X_train, diabetes_y_train, color='black')
plt.scatter(diabetes_X_test, diabetes_y_test, color='red')
plt.plot(diabetes_X_test, (w[0]*diabetes_X_test)+bias, color='red', linestyle='dashed')
The mllearn API lets a Python programmer invoke SystemML's algorithms through a scikit-learn-like interface as well as Spark's ML pipeline API (a pipeline sketch appears at the end of this notebook).
In [ ]:
from pyspark.sql import SQLContext
from systemml.mllearn import LinearRegression
sqlCtx = SQLContext(sc)
In [ ]:
regr = LinearRegression(sqlCtx)
# Train the model using the training sets
regr.fit(diabetes_X_train, diabetes_y_train)
In [ ]:
# use the trained model to predict on the test set
predictions = regr.predict(diabetes_X_test)
In [ ]:
# plot the predictions against the training and test data
%matplotlib inline
plt.scatter(diabetes_X_train, diabetes_y_train, color='black')
plt.scatter(diabetes_X_test, diabetes_y_test, color='red')
plt.plot(diabetes_X_test, predictions, color='black')
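The same mllearn estimator can also be used as a stage in a Spark ML pipeline. The sketch below is illustrative only: df is a hypothetical DataFrame with numeric columns 'x1', 'x2' and a 'label' column, assembled into a features vector in the usual pyspark.ml way.
In [ ]:
# Sketch: mllearn's LinearRegression as a Spark ML pipeline stage.
# `df` is a hypothetical DataFrame with columns 'x1', 'x2', and 'label'.
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['x1', 'x2'], outputCol='features')
lr = LinearRegression(sqlCtx)
pipeline = Pipeline(stages=[assembler, lr])
model = pipeline.fit(df)           # trains SystemML's linear regression on the assembled features
predictions = model.transform(df)  # appends a prediction column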