In [1]:
# Enable IPython's autoreload extension; mode 2 reloads imported modules
# before each cell runs, so edits to library code are picked up without a restart.
%load_ext autoreload
%autoreload 2

In [2]:
import sys
# Add the parent directory to the module search path — presumably so the local
# `optimus` package can be imported without being installed (TODO confirm).
sys.path.append("..")

In [5]:
from optimus import Optimus
# `op` is the Optimus entry point used by the cells below
# (op.create.df to build the DataFrame, op.enrich to build the enricher).
op = Optimus()

Create a Spark DataFrame


In [6]:
# Test data: one column holding the todo ids 1 through 9.
columns = ['todo_id']
vals = list(range(1, 10))

# Build the DataFrame, collapse it into a single partition, cache it,
# and render it as a table.
df = (
    op.create.df(columns, vals)
    .repartition(1)
    .cache()
)
df.table()


Viewing 9 of 9 rows / 1 columns
1 partition(s)
todo_id
1 (string)
nullable
1
2
3
4
5
6
7
8
9
Viewing 9 of 9 rows / 1 columns
1 partition(s)

In [39]:
import requests

def func_request(params, timeout=10):
    """Request a single todo item from the JSONPlaceholder API.

    Args:
        params: Object subscriptable by "todo_id"; the value is converted with
            str() and appended to the base URL.
        timeout: Seconds to wait for the server before giving up; forwarded to
            requests.get. Defaults to 10.

    Returns:
        Whatever requests.get returns for the built URL.

    Raises:
        KeyError: If a dict-like ``params`` has no "todo_id" entry.
        requests.exceptions.Timeout: If the server does not answer in time.
    """
    # You can add whatever header or auth info you need to send here.
    # For more information see the requests library documentation.
    url = "https://jsonplaceholder.typicode.com/todos/" + str(params["todo_id"])

    # requests applies no timeout by default, so a stalled connection would
    # block this call indefinitely; always bound the wait.
    return requests.get(url, timeout=timeout)

def func_response(response):
    """Pick the value to keep from one API response.

    Args:
        response: Object subscriptable by "title" — presumably the decoded
            JSON body of the todo request (TODO confirm against the enricher).

    Returns:
        The value stored under the "title" key.
    """
    # Parse the response here; only the title is kept.
    title = response["title"]
    return title

# Build the enricher. NOTE(review): 27017 is MongoDB's default port, so this
# presumably points at a local MongoDB instance — confirm.
e = op.enrich(
    host="localhost",
    port=27017,
    db_name="jazz",
)
# NOTE(review): presumably clears previously stored results — confirm against the Optimus docs.
e.flush()

# Run the enrichment over df with the request/response callbacks defined above.
# NOTE(review): calls/period look like a rate limit (60 calls per 60 s) and
# max_tries like a retry cap — confirm.
df_result = e.run(
    df,
    func_request,
    func_response,
    calls=60,
    period=60,
    max_tries=8,
)




In [1]:
# Render the enriched DataFrame. df_result is assigned by e.run(...) in the
# enrichment cell, so that cell must have been executed in the current kernel
# session first; otherwise this raises NameError.
df_result.table()


---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-1-0114f9bbdc31> in <module>
----> 1 df_result.table()

NameError: name 'df_result' is not defined

In [ ]: