In [45]:
    
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PythonPi").getOrCreate()

#partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
partitions = 10
n = 100000 * partitions

def f(_):
    # Sample a point uniformly from the 2x2 square centered at the origin;
    # count it if it lands inside the unit circle.
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

# The fraction of sampled points inside the circle approximates pi/4
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))

spark.stop()
    
    
In [46]:
    
sourceBytes = '''
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PythonPi").getOrCreate()

partitions = 10
n = 100000 * partitions

def f(_):
    x = random() * 2 - 1
    y = random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0

count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))

spark.stop()
'''.encode('utf-8')
    
In [47]:
    
print(sourceBytes.decode("utf-8"))
    
    
In [54]:
    
# This can be removed once the Dockerfile for jupyterhub is fixed
!cd ~ && rm scripts && ln -s /root/pipeline/jupyterhub.ml/scripts
    
In [55]:
    
!ls ~/scripts/pi
    
    
In [56]:
    
!mkdir -p ~/scripts/pi/

# Write the embedded PySpark script to a file tracked by git
with open('/root/scripts/pi/pi.py', 'wb') as f:
    f.write(sourceBytes)

!cat ~/scripts/pi/pi.py
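
The generated script can optionally be smoke-tested before committing it. This is a minimal sanity check, assuming `spark-submit` is on the PATH inside this notebook container (an assumption, not something shown elsewhere in this notebook):
In [ ]:
    
# Optional: run the generated script locally before committing
# (assumes spark-submit is available in this container)
!spark-submit --master local[2] ~/scripts/pi/pi.py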
    
    
In [57]:
    
!git status
    
    
In [59]:
    
!git add --all ~/scripts
    
In [60]:
    
!git status
    
    
In [19]:
    
!git commit -m "updated pyspark scripts"
    
    
In [20]:
    
!git status
    
    
In [96]:
    
# If this fails with "Permission denied", use a terminal within Jupyter to run `git push` manually
!git push
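
If the push keeps failing, the container may simply lack credentials for the HTTPS remote. A hedged alternative to the terminal workaround above, assuming an SSH key is already registered with GitHub (the repository URL below is a placeholder, not the real one):
In [ ]:
    
# Inspect the configured remote; optionally switch it to SSH (placeholder URL)
!git remote -v
# !git remote set-url origin git@github.com:<your-org>/<your-repo>.git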
    
    
Airflow Workflow Deploys New PySpark Job through GitHub Post-Commit Webhook Trigger
In [21]:
    
from IPython.core.display import display, HTML

# Widen the notebook container and embed the Airflow admin UI
display(HTML("<style>.container { width:100% !important; }</style>"))

html = '<iframe width="100%" height="500px" src="http://demo.pipeline.io:8080/admin"></iframe>'
display(HTML(html))
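
For reference, a minimal sketch of what the triggered DAG might look like. The DAG id, file paths, and tasks below are assumptions for illustration, not the actual pipeline.io definition; the post-commit webhook is assumed to trigger the DAG externally, so it carries no schedule of its own (Airflow 1.x import paths shown).
In [ ]:
    
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 1, 1),
}

# Hypothetical DAG id; triggered by the GitHub post-commit webhook rather than a schedule
dag = DAG('deploy_pyspark_pi', default_args=default_args, schedule_interval=None)

# Pull the latest scripts committed above (repository path is an assumption)
pull_scripts = BashOperator(
    task_id='git_pull_scripts',
    bash_command='cd /root/pipeline && git pull',
    dag=dag)

# Submit the updated PySpark Pi job
run_pi = BashOperator(
    task_id='spark_submit_pi',
    bash_command='spark-submit /root/scripts/pi/pi.py',
    dag=dag)

pull_scripts >> run_pi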
    
    
    
In [ ]: