In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import sys
sys.path.append("..")

In [3]:
from optimus.livy import Livy


C:\Users\argenisleon\Anaconda3\lib\site-packages\socks.py:58: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
  from collections import Callable

    You are using PySparkling of version 2.4.10, but your PySpark is of
    version 2.3.1. Please make sure Spark and PySparkling versions are compatible. 

In [25]:
HOST = 'http://46.101.172.155:8998'
livy = Livy(HOST)

In [26]:
# Create session
livy.start()


{'id': 52, 'name': None, 'appId': None, 'owner': None, 'proxyUser': None, 'state': 'starting', 'kind': 'pyspark', 'appInfo': {'driverLogUrl': None, 'sparkUiUrl': None}, 'log': ['stdout: ', '\nstderr: ']}

In [27]:
# Get session info
livy.session()


Out[27]:
{'id': 52,
 'name': None,
 'appId': None,
 'owner': None,
 'proxyUser': None,
 'state': 'starting',
 'kind': 'pyspark',
 'appInfo': {'driverLogUrl': None, 'sparkUiUrl': None},
 'log': ['stdout: ', '\nstderr: ']}

In [35]:
data = {'code': '1 + 1'}

response = livy.execute(data)
print(response)


{'id': 2, 'code': '1 + 1', 'state': 'available', 'output': {'status': 'ok', 'execution_count': 2, 'data': {'text/plain': '2'}}, 'progress': 1.0}

In [90]:
import textwrap, pprint
code  = """
    from optimus import Optimus    
    op = Optimus(spark)
    df = op.load.csv("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/foo.csv")
    """

response = livy.submit(code)
print(response)


{'id': 33, 'state': 'running', 'output': None, 'progress': 0.0}

In [91]:
import textwrap, pprint
code  = """   
    import json
    json.dumps(df.to_json())
    """

response = livy.submit(code)
print(response)


{'id': 34, 'state': 'waiting', 'output': None, 'progress': 0.0}

In [92]:
a = livy.result()

In [96]:
import json
print(a)
json.loads(a)


'[{"id": 1, "product": "Cake", "firstName": "Luis", "price": 10, "lastName": "Alvarez$$%!", "billingId": 123, "dummyCol": "never", "birth": "1980/07/07"}, {"id": 2, "product": "piza", "firstName": "Andr\\u00e9", "price": 8, "lastName": "Amp\\u00e8re", "billingId": 423, "dummyCol": "gonna", "birth": "1950/07/08"}, {"id": 3, "product": "pizza", "firstName": "NiELS", "price": 8, "lastName": "B\\u00f6hr//((%%", "billingId": 551, "dummyCol": "give", "birth": "1990/07/09"}, {"id": 4, "product": "pizza", "firstName": "PAUL", "price": 8, "lastName": "dirac$", "billingId": 521, "dummyCol": "you", "birth": "1954/07/10"}, {"id": 5, "product": "pizza", "firstName": "Albert", "price": 8, "lastName": "Einstein", "billingId": 634, "dummyCol": "up", "birth": "1990/07/11"}, {"id": 6, "product": "arepa", "firstName": "Galileo", "price": 5, "lastName": "             GALiLEI", "billingId": 672, "dummyCol": "never", "birth": "1930/08/12"}, {"id": 7, "product": "taco", "firstName": "CaRL", "price": 3, "lastName": "Ga%%%uss", "billingId": 323, "dummyCol": "gonna", "birth": "1970/07/13"}, {"id": 8, "product": "taaaccoo", "firstName": "David", "price": 3, "lastName": "H$$$ilbert", "billingId": 624, "dummyCol": "let", "birth": "1950/07/14"}, {"id": 9, "product": "taco", "firstName": "Johannes", "price": 3, "lastName": "KEPLER", "billingId": 735, "dummyCol": "you", "birth": "1920/04/22"}, {"id": 10, "product": "taco", "firstName": "JaMES", "price": 3, "lastName": "M$$ax%%well", "billingId": 875, "dummyCol": "down", "birth": "1923/03/12"}, {"id": 11, "product": "pasta", "firstName": "Isaac", "price": 9, "lastName": "Newton", "billingId": 992, "dummyCol": "never ", "birth": "1999/02/15"}, {"id": 12, "product": "pasta", "firstName": "Emmy%%", "price": 9, "lastName": "N\\u00f6ether$", "billingId": 234, "dummyCol": "gonna", "birth": "1993/12/08"}, {"id": 13, "product": "hamburguer", "firstName": "Max!!!", "price": 4, "lastName": "Planck!!!", "billingId": 111, "dummyCol": "run ", "birth": "1994/01/04"}, {"id": 14, "product": "pizzza", "firstName": "Fred", "price": 8, "lastName": "Hoy&&&le", "billingId": 553, "dummyCol": "around", "birth": "1997/06/27"}, {"id": 15, "product": "pizza", "firstName": "(((   Heinrich )))))", "price": 8, "lastName": "Hertz", "billingId": 116, "dummyCol": "and", "birth": "1956/11/30"}, {"id": 16, "product": "BEER", "firstName": "William", "price": 2, "lastName": "Gilbert###", "billingId": 886, "dummyCol": "desert", "birth": "1958/03/26"}, {"id": 17, "product": "Rice", "firstName": "Marie", "price": 1, "lastName": "CURIE", "billingId": 912, "dummyCol": "you", "birth": "2000/03/22"}, {"id": 18, "product": "110790", "firstName": "Arthur", "price": 5, "lastName": "COM%%%pton", "billingId": 812, "dummyCol": "#", "birth": "1899/01/01"}, {"id": 19, "product": "null", "firstName": "JAMES", "price": 10, "lastName": "Chadwick", "billingId": 467, "dummyCol": "#", "birth": "1921/05/03"}]'
---------------------------------------------------------------------------
JSONDecodeError                           Traceback (most recent call last)
<ipython-input-96-0785c2deabf7> in <module>
      1 import json
      2 print(a)
----> 3 json.loads(a)

~\Anaconda3\lib\json\__init__.py in loads(s, encoding, cls, object_hook, parse_float, parse_int, parse_constant, object_pairs_hook, **kw)
    346             parse_int is None and parse_float is None and
    347             parse_constant is None and object_pairs_hook is None and not kw):
--> 348         return _default_decoder.decode(s)
    349     if cls is None:
    350         cls = JSONDecoder

~\Anaconda3\lib\json\decoder.py in decode(self, s, _w)
    335 
    336         """
--> 337         obj, end = self.raw_decode(s, idx=_w(s, 0).end())
    338         end = _w(s, end).end()
    339         if end != len(s):

~\Anaconda3\lib\json\decoder.py in raw_decode(self, s, idx)
    353             obj, end = self.scan_once(s, idx)
    354         except StopIteration as err:
--> 355             raise JSONDecodeError("Expecting value", s, err.value) from None
    356         return obj, end

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [89]:
{}


Out[89]:
{}

In [24]:
livy.delete_session(46)


http://46.101.172.155:8998/sessions/46

In [42]:
livy.sessions()


Out[42]:
[{'id': 52,
  'name': None,
  'appId': None,
  'owner': None,
  'proxyUser': None,
  'state': 'idle',
  'kind': 'pyspark',
  'appInfo': {'driverLogUrl': None, 'sparkUiUrl': None},
  'log': ['19/06/04 03:52:01 INFO Executor: Fetching spark://10.19.0.5:42304/jars/livy-repl_2.11-0.6.0-incubating.jar with timestamp 1559619584198',
   '19/06/04 03:52:01 INFO Utils: Fetching spark://10.19.0.5:42304/jars/livy-repl_2.11-0.6.0-incubating.jar to /tmp/spark-e906068a-b29b-4594-aef1-58a2d37bde15/userFiles-d5869831-80e5-4da1-aa9c-37245d12c7f2/fetchFileTemp3930129784218452512.tmp',
   '19/06/04 03:52:01 INFO Executor: Adding file:/tmp/spark-e906068a-b29b-4594-aef1-58a2d37bde15/userFiles-d5869831-80e5-4da1-aa9c-37245d12c7f2/livy-repl_2.11-0.6.0-incubating.jar to class loader',
   '19/06/04 03:52:02 INFO PythonRunner: Times: total = 898, boot = 676, init = 55, finish = 167',
   '19/06/04 03:52:02 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1401 bytes result sent to driver',
   '19/06/04 03:52:02 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1515 ms on localhost (executor driver) (1/1)',
   '19/06/04 03:52:02 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool ',
   '19/06/04 03:52:02 INFO PythonAccumulatorV2: Connected to AccumulatorServer at host: 127.0.0.1 port: 49371',
   '19/06/04 03:52:02 INFO DAGScheduler: ResultStage 0 (reduce at <stdin>:8) finished in 1.819 s',
   '19/06/04 03:52:02 INFO DAGScheduler: Job 0 finished: reduce at <stdin>:8, took 1.931369 s']}]

In [ ]: