In [21]:
lines = sc.textFile("municipios_do_Brasil.csv")
data = lines.map(lambda line: line.split(","))
print(data.take(5))
[['uf', 'cidade', 'lat', 'lng', 'cond'], ['AC', 'Acrelândia', '-9.825808', '-66.897166', 'false'], ['AC', 'Assis Brasil', '-10.9409203', '-69.5672108', 'false'], ['AC', 'Brasiléia', '-11.0012764', '-68.7487974', 'false'], ['AC', 'Bujari', '-9.8309656', '-67.9520886', 'false']]
In [22]:
df = spark.read.csv("municipios_do_Brasil.csv", header=True)
In [23]:
df.show()
+---+--------------------+------------+------------+-----+
| uf| cidade| lat| lng| cond|
+---+--------------------+------------+------------+-----+
| AC| Acrelândia| -9.825808| -66.897166|false|
| AC| Assis Brasil| -10.9409203| -69.5672108|false|
| AC| Brasiléia| -11.0012764| -68.7487974|false|
| AC| Bujari| -9.8309656| -67.9520886|false|
| AC| Capixaba| -10.5729683| -67.6760894|false|
| AC| Cruzeiro do Sul| -7.6307956| -72.6703869|false|
| AC| Epitaciolândia| -11.0289439| -68.7411519|false|
| AC| Feijó| -8.1639128| -70.3541781|false|
| AC| Jordão| -9.4338167| -71.8843997|false|
| AC| Mâncio Lima| -7.6137775| -72.8964167|false|
| AC| Manoel Urbano| -8.8389428| -69.2601292|false|
| AC|Marechal Thaumaturgo| -8.9407511| -72.79151|false|
| AC| Plácido de Castro| -10.2759758| -67.1500664|false|
| AC| Porto Acre| -9.5879722| -67.53307|false|
| AC| Porto Walter| -8.2687722| -72.7444458|false|
| AC| Rio Branco|-9.976536213|-67.82207776| true|
| AC| Rodrigues Alves| -7.7417936| -72.6473894|false|
| AC| Santa Rosa do Purus| -9.4338948| -70.4909019|false|
| AC| Sena Madureira| -9.0659556| -68.6571058|false|
| AC| Senador Guiomard| -10.1509675| -67.7360856|false|
+---+--------------------+------------+------------+-----+
only showing top 20 rows
In [24]:
df.printSchema()
root
|-- uf: string (nullable = true)
|-- cidade: string (nullable = true)
|-- lat: string (nullable = true)
|-- lng: string (nullable = true)
|-- cond: string (nullable = true)
In [25]:
# Select only the "name" column
df.select("cidade").show()
+--------------------+
| cidade|
+--------------------+
| Acrelândia|
| Assis Brasil|
| Brasiléia|
| Bujari|
| Capixaba|
| Cruzeiro do Sul|
| Epitaciolândia|
| Feijó|
| Jordão|
| Mâncio Lima|
| Manoel Urbano|
|Marechal Thaumaturgo|
| Plácido de Castro|
| Porto Acre|
| Porto Walter|
| Rio Branco|
| Rodrigues Alves|
| Santa Rosa do Purus|
| Sena Madureira|
| Senador Guiomard|
+--------------------+
only showing top 20 rows
In [29]:
df.count()
Out[29]:
5563
In [30]:
df.approx_count_distinct("uf")
Out[30]:
Name: org.apache.toree.interpreter.broker.BrokerException
Message: Traceback (most recent call last):
File "/var/folders/dv/zcjn7sq9799b_7vv7n926t0r0000gp/T/kernel-PySpark-044e9b24-b294-4f3f-a7c4-75af4a3df7df/pyspark_runner.py", line 194, in <module>
eval(compiled_code)
File "<string>", line 1, in <module>
File "/Applications/spark-2.2.0-bin-hadoop2.7/python/pyspark/sql/dataframe.py", line 1020, in __getattr__
"'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame' object has no attribute 'approx_count_distinct'
StackTrace: org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
scala.Option.foreach(Option.scala:257)
org.apache.toree.interpreter.broker.BrokerState.markFailure(BrokerState.scala:162)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:280)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:214)
java.lang.Thread.run(Thread.java:745)
In [26]:
df.filter(df['uf'] == "CE").show()
+---+-----------------+----------+-----------+-----+
| uf| cidade| lat| lng| cond|
+---+-----------------+----------+-----------+-----+
| CE| Abaiara| -7.345879| -39.04159|false|
| CE| Acarape| -4.22083| -38.705532|false|
| CE| Acaraú| -2.887689| -40.118306|false|
| CE| Acopiara| -6.089114| -39.448042|false|
| CE| Aiuaba|-6.5737801|-40.1269838|false|
| CE| Alcântaras| -3.585369| -40.547867|false|
| CE| Altaneira| -6.988047| -39.748055|false|
| CE| Alto Santo| -5.508941| -38.274318|false|
| CE| Amontada| -3.360166| -39.828816|false|
| CE|Antonina do Norte| -6.769193| -39.98697|false|
| CE| Apuiarés| -3.945057| -39.43593|false|
| CE| Aquiraz|-3.9063524|-38.3877096|false|
| CE| Aracati| -4.558259| -37.767894|false|
| CE| Aracoiaba| -4.36872| -38.812512|false|
| CE| Ararendá| -4.745667| -40.83099|false|
| CE| Araripe| -7.213195| -40.135894|false|
| CE| Aratuba| -4.412291| -39.047117|false|
| CE| Arneiroz| -6.316499| -40.165285|false|
| CE| Assaré| -6.866897| -39.868872|false|
| CE| Aurora| 0| 0|false|
+---+-----------------+----------+-----------+-----+
only showing top 20 rows
In [27]:
df.groupBy("uf").count().show()
+---+-----+
| uf|count|
+---+-----+
| SC| 293|
| RO| 67|
| PI| 224|
| AM| 62|
| GO| 246|
| TO| 137|
| MT| 141|
| SP| 645|
| ES| 78|
| PB| 223|
| RS| 496|
| MS| 78|
| AL| 102|
| MG| 853|
| PA| 143|
| BA| 417|
| SE| 75|
| PE| 185|
| CE| 184|
| RN| 167|
+---+-----+
only showing top 20 rows
In [ ]:
Content source: abevieiramota/data-science-cookbook
Similar notebooks: