In [1]:
%load_ext autoreload
%autoreload 2
import sys
sys.path.append("..")
from optimus import Optimus
# Create optimus
op = Optimus(master="local", app_name="optimus", verbose=True)
INFO:optimus:Operative System:Windows
INFO:optimus:Just check that Spark and all necessary environments vars are present...
INFO:optimus:-----
INFO:optimus:SPARK_HOME=C:\opt\spark\spark-2.3.1-bin-hadoop2.7
INFO:optimus:HADOOP_HOME=C:\opt\hadoop-2.7.7
INFO:optimus:PYSPARK_PYTHON=C:\Users\argenisleon\Anaconda3\python.exe
INFO:optimus:PYSPARK_DRIVER_PYTHON=jupyter
INFO:optimus:PYSPARK_SUBMIT_ARGS=--jars "file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/spark-redis-2.4.1-SNAPSHOT-jar-with-dependencies.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/RedshiftJDBC42-1.2.16.1027.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/mysql-connector-java-8.0.16.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/ojdbc8.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/postgresql-42.2.5.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/presto-jdbc-0.224.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/spark-cassandra-connector_2.11-2.4.1.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/sqlite-jdbc-3.27.2.1.jar,file:///C:/Users/argenisleon/Documents/Optimus/optimus/jars/mssql-jdbc-7.4.1.jre8.jar" --driver-class-path "C:/Users/argenisleon/Documents/Optimus/optimus/jars/spark-redis-2.4.1-SNAPSHOT-jar-with-dependencies.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/RedshiftJDBC42-1.2.16.1027.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/mysql-connector-java-8.0.16.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/ojdbc8.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/postgresql-42.2.5.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/presto-jdbc-0.224.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/spark-cassandra-connector_2.11-2.4.1.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/sqlite-jdbc-3.27.2.1.jar;C:/Users/argenisleon/Documents/Optimus/optimus/jars/mssql-jdbc-7.4.1.jre8.jar" --conf "spark.sql.catalogImplementation=hive" pyspark-shell
INFO:optimus:JAVA_HOME=C:\java
INFO:optimus:Pyarrow Installed
INFO:optimus:-----
INFO:optimus:Starting or getting SparkSession and SparkContext...
INFO:optimus:Spark Version:2.3.1
INFO:optimus:
   ____        __  _
  / __ \____  / /_(_)___ ___  __  _______
 / / / / __ \/ __/ / __ `__ \/ / / / ___/
/ /_/ / /_/ / /_/ / / / / / / /_/ (__  )
\____/ .___/\__/_/_/ /_/ /_/\__,_/____/
     /_/
INFO:optimus:Transform and Roll out...
INFO:optimus:Optimus successfully imported. Have fun :).
INFO:optimus:Config.ini not found
In [2]:
# Put your db credentials here
db = op.connect(
    driver="mysql",
    host="165.227.196.70",
    database="optimus",
    user="test",
    password="test")
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
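For reference, op.connect wraps Spark's JDBC data source (the Redshift traceback later in this notebook shows the call reaching JdbcRelationProvider). A roughly equivalent read with plain PySpark would look like the sketch below; the MySQL connector jar is already on the classpath via PYSPARK_SUBMIT_ARGS, and getting the session via getOrCreate is an assumption (any existing SparkSession works):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Same connection parameters as the op.connect call above
raw_df = (spark.read.format("jdbc")
          .option("url", "jdbc:mysql://165.227.196.70:3306/optimus")
          .option("dbtable", "test_data")
          .option("user", "test")
          .option("password", "test")
          .load())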
In [3]:
db.tables()
INFO:optimus:(SELECT table_name, table_rows FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'optimus') AS t
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
Viewing 1 of 1 rows / 2 columns
1 partition(s)

TABLE_NAME (string, nullable)    TABLE_ROWS (decimal(20,0), nullable)
test_data                        100
In [4]:
db.table_to_df("test_data").table()
INFO:optimus:SELECT * FROM test_data
INFO:optimus:(SELECT * FROM test_data) AS t
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
Viewing 10 of 10 rows / 6 columns
1 partition(s)

(id: int; first_name, last_name, email, gender, ip_address: string; all nullable)

id  first_name  last_name   email                           gender  ip_address
 1  Ikey        Crudginton  icrudginton0@freewebs.com       Male    72.210.21.255
 2  Erwin       Edden       eedden1@nytimes.com             Male    16.205.155.142
 3  Rudyard     Dullaghan   rdullaghan2@techcrunch.com      Male    84.170.67.167
 4  Eugen       Staining    estaining3@merriam-webster.com  Male    211.36.45.228
 5  Carleton    Hammond     chammond4@example.com           Male    177.7.250.134
 6  Ermengarde  Knightly    eknightly5@google.co.jp         Female  231.176.117.190
 7  Myles       Rattray     mrattray6@about.com             Male    4.193.247.67
 8  Banky       Shires      bshires7@so-net.ne.jp           Male    16.18.210.158
 9  Chastity    Birtwell    cbirtwell8@seesaa.net           Female  167.15.222.219
10  Harv        Fotherby    hfotherby9@godaddy.com          Male    143.117.248.106
In [64]:
df = db.table_to_df("test_data", limit=None)
INFO:optimus:SELECT * FROM test_data
INFO:optimus:(SELECT * FROM test_data) AS t
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
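The result of table_to_df is an Optimus-extended Spark DataFrame, so the standard Spark API applies directly. A minimal sketch using the columns shown above:

# Regular Spark transformations work on the returned DataFrame
df.filter(df.gender == "Female").select("first_name", "email").show()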
In [65]:
db.tables_names_to_json()
INFO:optimus:(SELECT TABLE_NAME AS table_name FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'optimus' GROUP BY TABLE_NAME) AS t
INFO:optimus:jdbc:mysql://165.227.196.70:3306/optimus?currentSchema=public
Out[65]:
['test_data']
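Since tables_names_to_json() returns a plain Python list, it combines naturally with table_to_df to pull every table in one pass (a sketch):

# One DataFrame per table in the current database
frames = {name: db.table_to_df(name) for name in db.tables_names_to_json()}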
In [66]:
# Put your db credentials here
db = op.connect(
    driver="postgres",
    host="165.227.196.70",
    database="optimus",
    user="testuser",
    password="test")
INFO:optimus:jdbc:postgresql://165.227.196.70:5432/optimus?currentSchema=public
In [67]:
db.tables()
INFO:optimus:(
SELECT relname as table_name,cast (reltuples as integer) AS count
FROM pg_class C LEFT JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE nspname IN ('public') AND relkind='r' ORDER BY reltuples DESC) AS t
INFO:optimus:jdbc:postgresql://165.227.196.70:5432/optimus?currentSchema=public
Viewing 1 of 1 rows / 2 columns
1 partition(s)

table_name (string, nullable)    count (int, nullable)
test_data                        1100
In [68]:
db.table_to_df("test_data").table()
INFO:optimus:SELECT * FROM test_data
INFO:optimus:(SELECT * FROM test_data) AS t
INFO:optimus:jdbc:postgresql://165.227.196.70:5432/optimus?currentSchema=public
Viewing 10 of 10 rows / 6 columns
1 partition(s)

(id: int; first_name, last_name, email, gender, ip_address: string; all nullable)

id  first_name  last_name   email                           gender  ip_address
 1  Ikey        Crudginton  icrudginton0@freewebs.com       Male    72.210.21.255
 2  Erwin       Edden       eedden1@nytimes.com             Male    16.205.155.142
 3  Rudyard     Dullaghan   rdullaghan2@techcrunch.com      Male    84.170.67.167
 4  Eugen       Staining    estaining3@merriam-webster.com  Male    211.36.45.228
 5  Carleton    Hammond     chammond4@example.com           Male    177.7.250.134
 6  Ermengarde  Knightly    eknightly5@google.co.jp         Female  231.176.117.190
 7  Myles       Rattray     mrattray6@about.com             Male    4.193.247.67
 8  Banky       Shires      bshires7@so-net.ne.jp           Male    16.18.210.158
 9  Chastity    Birtwell    cbirtwell8@seesaa.net           Female  167.15.222.219
10  Harv        Fotherby    hfotherby9@godaddy.com          Male    143.117.248.106
In [69]:
db.tables_names_to_json()
INFO:optimus:(
SELECT relname as table_name
FROM pg_class C LEFT JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE nspname IN ('public') AND relkind='r' ORDER BY reltuples DESC) AS t
INFO:optimus:jdbc:postgresql://165.227.196.70:5432/optimus?currentSchema=public
Out[69]:
['test_data']
In [70]:
# Put your db credentials here
db = op.connect(
    driver="sqlserver",
    host="165.227.196.70",
    database="optimus",
    user="test",
    password="test*0261")
INFO:optimus:jdbc:sqlserver://165.227.196.70:1433;databaseName=optimus
In [71]:
db.tables()
INFO:optimus:(SELECT * FROM INFORMATION_SCHEMA.TABLES) AS t
INFO:optimus:jdbc:sqlserver://165.227.196.70:1433;databaseName=optimus
Viewing 1 of 1 rows / 4 columns
1 partition(s)

(all columns string, nullable)

TABLE_CATALOG  TABLE_SCHEMA  TABLE_NAME  TABLE_TYPE
optimus        dbo           test_data   BASE TABLE
In [72]:
db.table_to_df("test_data").table()
INFO:optimus:SELECT * FROM test_data
INFO:optimus:(SELECT * FROM test_data) AS t
INFO:optimus:jdbc:sqlserver://165.227.196.70:1433;databaseName=optimus
Viewing 10 of 10 rows / 6 columns
1 partition(s)

(id: int; first_name, last_name, email, gender, ip_address: string; all nullable)

id  first_name  last_name  email                 gender  ip_address
 1  Keenan      McAirt     kmcairt0@spotify.com  Male    68.97.227.147
 2  Fredelia    Lemarie    flemarie1@furl.net    Female  16.145.123.46
 1  Keenan      McAirt     kmcairt0@spotify.com  Male    68.97.227.147
 2  Fredelia    Lemarie    flemarie1@furl.net    Female  16.145.123.46
 1  Keenan      McAirt     kmcairt0@spotify.com  Male    68.97.227.147
 2  Fredelia    Lemarie    flemarie1@furl.net    Female  16.145.123.46
 1  Keenan      McAirt     kmcairt0@spotify.com  Male    68.97.227.147
 2  Fredelia    Lemarie    flemarie1@furl.net    Female  16.145.123.46
 2  Fredelia    Lemarie    flemarie1@furl.net    Female  16.145.123.46
 1  Evyn        Abbey      eabbey0@mlb.com       Male    202.99.246.227
In [73]:
db.tables_names_to_json()
INFO:optimus:(SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES) AS t
INFO:optimus:jdbc:sqlserver://165.227.196.70:1433;databaseName=optimus
Out[73]:
['test_data']
In [2]:
# Put your db credentials here
db = op.connect(
    driver="redshift",
    host="165.227.196.70",
    database="optimus",
    user="testuser",
    password="test")
INFO:optimus:jdbc:redshift://redshift-cluster-1.chuvgsqx7epn.us-east-1.redshift.amazonaws.com:5439/dev?currentSchema=public
In [3]:
db.tables()
INFO:optimus:(
SELECT relname as table_name,cast (reltuples as integer) AS count
FROM pg_class C LEFT JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE nspname IN ('public') AND relkind='r' ORDER BY reltuples DESC) AS t
INFO:optimus:jdbc:redshift://redshift-cluster-1.chuvgsqx7epn.us-east-1.redshift.amazonaws.com:5439/dev?currentSchema=public
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-3-cdef22199e9a> in <module>
----> 1 db.tables()
~\Documents\Optimus\optimus\io\jdbc.py in tables(self, schema, database, limit)
179 FROM user_tables ORDER BY table_name"""
180
--> 181 df = self.execute(query, limit)
182 return df.table(limit)
183
~\Documents\Optimus\optimus\io\jdbc.py in execute(self, query, limit)
309 conf.options(table=self.cassandra_table, keyspace=self.cassandra_keyspace)
310
--> 311 return self._limit(conf.load(), limit)
312
313 def df_to_table(self, df, table, mode="overwrite"):
~\Anaconda3\lib\site-packages\pyspark\sql\readwriter.py in load(self, path, format, schema, **options)
170 return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
171 else:
--> 172 return self._df(self._jreader.load())
173
174 @since(1.4)
~\Anaconda3\lib\site-packages\py4j\java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
~\Anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
~\Anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o41.load.
: java.sql.SQLException: [Amazon](500150) Error setting/closing connection: Connection timed out: connect.
at com.amazon.redshift.client.PGClient.connect(Unknown Source)
at com.amazon.redshift.client.PGClient.<init>(Unknown Source)
at com.amazon.redshift.core.PGJDBCConnection.connect(Unknown Source)
at com.amazon.jdbc.common.BaseConnectionFactory.doConnect(Unknown Source)
at com.amazon.jdbc.common.AbstractDriver.connect(Unknown Source)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:115)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
Caused by: com.amazon.support.exceptions.GeneralException: [Amazon](500150) Error setting/closing connection: Connection timed out: connect.
... 24 more
Caused by: java.net.ConnectException: Connection timed out: connect
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:454)
at sun.nio.ch.Net.connect(Net.java:446)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:96)
at com.amazon.redshift.client.PGClient.connect(Unknown Source)
at com.amazon.redshift.client.PGClient.<init>(Unknown Source)
at com.amazon.redshift.core.PGJDBCConnection.connect(Unknown Source)
at com.amazon.jdbc.common.BaseConnectionFactory.doConnect(Unknown Source)
at com.amazon.jdbc.common.AbstractDriver.connect(Unknown Source)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:115)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
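The "Connection timed out" above means the Redshift endpoint on port 5439 was never reachable from this machine, which usually points to a security-group, VPC, or firewall issue rather than bad credentials. A quick standard-library probe of the endpoint from the log, before retrying:

import socket

# Endpoint and port taken from the logged JDBC URL above
host = "redshift-cluster-1.chuvgsqx7epn.us-east-1.redshift.amazonaws.com"
with socket.socket() as sock:
    sock.settimeout(5)
    try:
        sock.connect((host, 5439))
        print("port reachable")
    except OSError as err:
        print("unreachable:", err)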
In [ ]:
db.table_to_df("test_data").table()
In [ ]:
# Put your db credentials here
db = op.connect(
    driver="oracle",
    host="165.227.196.70",
    database="optimus",
    user="testuser",
    password="test")
In [53]:
# Put your db credentials here
db = op.connect(
    driver="sqlite",
    host="chinook.db",
    database="employes",
    user="testuser",
    password="test")
INFO:optimus:jdbc:sqlite:chinook.db
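Note that for the SQLite driver, host is simply the path to the database file, as the logged URL jdbc:sqlite:chinook.db confirms; SQLite itself needs no user or password. A minimal sketch (the absolute path and database name below are hypothetical):

# SQLite: `host` is the .db file path; credentials are not required
db = op.connect(driver="sqlite", host="C:/data/chinook.db", database="main")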
In [54]:
db.tables()
INFO:optimus:(SELECT name FROM sqlite_master WHERE type='table') AS t
INFO:optimus:jdbc:sqlite:chinook.db
Viewing 10 of 10 rows / 1 columns
1 partition(s)

name (string, nullable)
albums
sqlite_sequence
artists
customers
employees
genres
invoices
invoice_items
media_types
playlists
In [55]:
db.table_to_df("albums",limit="all").table()
INFO:optimus:(SELECT COUNT(*) as COUNT FROM albums) AS t
INFO:optimus:jdbc:sqlite:chinook.db
347 rows
INFO:optimus:SELECT * FROM albums
INFO:optimus:(SELECT * FROM albums) AS t
INFO:optimus:jdbc:sqlite:chinook.db
Viewing 10 of 347 rows / 3 columns
1 partition(s)

(AlbumId, ArtistId: int; Title: string; all nullable)

AlbumId  Title                                  ArtistId
1        For Those About To Rock We Salute You  1
2        Balls to the Wall                      2
3        Restless and Wild                      2
4        Let There Be Rock                      1
5        Big Ones                               3
6        Jagged Little Pill                     4
7        Facelift                               5
8        Warner 25 Anos                         6
9        Plays Metallica By Four Cellos         7
10       Audioslave                             8
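Per the log above, limit="all" first issues a SELECT COUNT(*) to size the read before fetching rows. table_to_df also accepts a columns argument (its signature, table_to_df(table_name, columns, limit), appears in a traceback later in this notebook); the list form in this sketch is an assumption:

# Hypothetical column subset; the columns parameter format is assumed
df = db.table_to_df("albums", columns=["AlbumId", "Title"], limit="all")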
In [60]:
db.tables_names_to_json()
INFO:optimus:(SELECT name FROM sqlite_master WHERE type='table') AS t
INFO:optimus:jdbc:sqlite:chinook.db
Out[60]:
['albums',
'sqlite_sequence',
'artists',
'customers',
'employees',
'genres',
'invoices',
'invoice_items',
'media_types',
'playlists',
'playlist_track',
'tracks',
'sqlite_stat1']
In [2]:
df = op.load.csv("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/foo.csv",
                 sep=",", header='true', infer_schema='true', charset="UTF-8", null_value="None")
INFO:optimus:Downloading foo.csv from https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/foo.csv
INFO:optimus:Downloaded 967 bytes
INFO:optimus:Creating DataFrame for foo.csv. Please wait...
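The same loader also works against a local file; a sketch with a downloaded copy (the path is hypothetical):

# Local path instead of a URL; same parsing options apply
df = op.load.csv("data/foo.csv", sep=",", header='true', infer_schema='true')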
In [3]:
df.table()
Viewing 10 of 19 rows / 8 columns
1 partition(s)

(id, billingId, price: int; the rest string; all nullable. ⋅ marks literal space characters in the raw data.)

id  firstName  lastName              billingId  product   price  birth       dummyCol
 1  Luis       Alvarez$$%!           123        Cake      10     1980/07/07  never
 2  André      Ampère                423        piza      8      1950/07/08  gonna
 3  NiELS      Böhr//((%%            551        pizza     8      1990/07/09  give
 4  PAUL       dirac$                521        pizza     8      1954/07/10  you
 5  Albert     Einstein              634        pizza     8      1990/07/11  up
 6  Galileo    ⋅⋅⋅⋅⋅⋅⋅⋅⋅⋅⋅⋅⋅GALiLEI  672        arepa     5      1930/08/12  never
 7  CaRL       Ga%%%uss              323        taco      3      1970/07/13  gonna
 8  David      H$$$ilbert            624        taaaccoo  3      1950/07/14  let
 9  Johannes   KEPLER                735        taco      3      1920/04/22  you
10  JaMES      M$$ax%%well           875        taco      3      1923/03/12  down
In [30]:
# Put your db credentials here
db = op.connect(
    driver="redis",
    host="165.227.196.70",
    port=6379,
    database=1,
    password="")
In [32]:
db.df_to_table(df, "hola1", redis_primary_key="id")
INFO:optimus:`id`,`firstName`,`lastName`,`billingId`,`product`,`price`,`birth`,`dummyCol` column(s) was not processed because is/are not array,vector
INFO:optimus:Outputting 0 columns after filtering. Is this expected?
INFO:optimus:Using 'column_exp' to process column 'id' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'firstName' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'lastName' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'billingId' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'product' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'price' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'birth' with function _cast_to
INFO:optimus:Using 'column_exp' to process column 'dummyCol' with function _cast_to
hola1
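To verify the write from outside Spark, the plain redis-py client can list the keys; spark-redis stores each row as a hash keyed by table name and primary key, so names like hola1:1 are expected (the key pattern is an assumption):

import redis

# Same host, port, and database as the op.connect call above
r = redis.Redis(host="165.227.196.70", port=6379, db=1)
print(r.keys("hola1:*"))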
In [33]:
# https://stackoverflow.com/questions/56707978/how-to-write-from-a-pyspark-dstream-to-redis
db.table_to_df(0)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-33-b3e61395c772> in <module>
1 # https://stackoverflow.com/questions/56707978/how-to-write-from-a-pyspark-dstream-to-redis
2
----> 3 db.table_to_df(0)
~\Documents\Optimus\optimus\io\jdbc.py in table_to_df(self, table_name, columns, limit)
122
123 db_table = table_name
--> 124 query = self.driver_context.count_query(db_table=db_table)
125 if limit == "all":
126 count = self.execute(query, "all").first()[0]
~\Documents\Optimus\optimus\io\driver_context.py in count_query(self, *args, **kwargs)
31
32 def count_query(self, *args, **kwargs) -> str:
---> 33 return self._driver.count_query(*args, **kwargs)
~\Documents\Optimus\optimus\io\sqlserver.py in count_query(self, *args, **kwargs)
24
25 def count_query(self, *args, **kwargs) -> str:
---> 26 return "SELECT COUNT(*) as COUNT FROM " + kwargs["db_table"]
TypeError: can only concatenate str (not "int") to str
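The root cause is visible in the last frame: count_query builds its SQL by plain string concatenation, so table_to_df requires the table name as a string; the integer 0 never reaches the database. Whether the Redis reader then works end to end is a separate question (see the linked Stack Overflow thread), but the immediate fix is simply to pass the key written above as a string:

# Table name must be a string; "hola1" is the table written above
db.table_to_df("hola1")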