In [1]:
from __future__ import print_function
import os
import sys
In [2]:
# Someday this has to go in a config or kernel
spark_home = '/opt/spark/latest'
os.environ['SPARK_HOME'] = spark_home
sys.path.insert(0, spark_home + "/python")
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.1-src.zip'))
from pyspark.context import SparkContext
from pyspark.context import SparkConf
from pyspark.sql import SQLContext
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
It looks like we need to help out the mapper, es.field.read.as.array.include. How and where to set this?
https://spark.apache.org/docs/1.5.0/api/python/_modules/pyspark/conf.html http://stackoverflow.com/questions/32362783/how-to-change-sparkcontext-properties-in-interactive-pyspark-session
Looks like making a conf object before creating the context. Can we really not change this at runtime? " once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user. Spark does not support modifying the configu$ "
In [3]:
# The conf is imutable so the config must be created first and then passed in. This
# has to be re-run whenever the config is changed.
# Doing the conf in advance isn't needed, can pass options at the time of reading.
conf = SparkConf()
#conf.set("driver-class-path",
# "/home/mcollins/elasticsearch-hadoop-2.4.0/dist/elasticsearch-spark_2.11-2.4.0.jar")
#conf.set("es.nodes",
# "c18node14.acis.ufl.edu,c18node2.acis.ufl.edu,c18node6.acis.ufl.edu,c18node10.acis.ufl.edu,c18node12.acis.ufl.edu")
#conf.set("es.read.field.as.array.include",
# "indexData")
# No combination of the options "es.read.field.as.array.include", "ES_READ_FIELD_AS_ARRAY_INCLUDE",
# "gbif:vernacularname" and "indexData.gbif:vernacularname" seems to make a difference in the
# schema that is read, it is always struct (actually I was using an older es.field.read... [note reversed words]
# flag which was changed last year.)
#
# Now using "indexData.gbif:vernacularname.dcterms:source" and "es.read.field.as.array.include" yeilds an
# exception about not being able to read things as arrays so that is the right syntax at least
# This doesn't work, stop and start the kernel instead.
try:
sc
except:
pass
else:
sc.stop()
sc = None
sqlContext = None
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
In [5]:
# Let's try loading some simple columns only
es_df = es_df = sqlContext.read.format("org.elasticsearch.spark.sql") \
.option("es.query", '{"query": {"bool": {"must": [{"term":{"genus":"acer"}}]}}}') \
.option("es.read.field.include", "uuid,genus") \
.option("es.nodes",
"c18node14.acis.ufl.edu,c18node2.acis.ufl.edu,c18node6.acis.ufl.edu,c18node10.acis.ufl.edu,c18node12.acis.ufl.edu") \
.load("idigbio-2.10.2/records")
# .option("es.read.field.as.array.include", "indexData") \
es_df.printSchema()
root
|-- genus: string (nullable = true)
|-- uuid: string (nullable = true)
In [6]:
es_df.head(10)
Out[6]:
[Row(genus='acer', uuid='0e64f89b-dfd8-43d0-bc05-112b63a79c0e'),
Row(genus='acer', uuid='0ec892d1-640a-4ea3-a8df-52cc376f9e03'),
Row(genus='acer', uuid='0ed80ce4-964c-4cbb-9aa6-6a299f295418'),
Row(genus='acer', uuid='0eeb178f-2756-4d74-ab54-0f0414573342'),
Row(genus='acer', uuid='0ef76673-adff-41d0-9e9d-f4c5896a68a5'),
Row(genus='acer', uuid='0f3a5453-e064-42a7-aff0-cda62761133d'),
Row(genus='acer', uuid='0f547150-a674-4c5f-b00f-f79687c6f605'),
Row(genus='acer', uuid='05fd5b82-ba78-4f9a-85fa-e7678b92aa48'),
Row(genus='acer', uuid='061df2e6-49d6-4e46-a868-6776c36e8d10'),
Row(genus='acer', uuid='0621c415-24aa-4344-bf1e-badf4e1e17f2')]
In [8]:
es_df.count()
Out[8]:
34658
In [7]:
es_df.write.parquet("test_acer")
In [15]:
acer_df = es_df.select(es_df.genus == "Acer")
acer_df.count()
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
<ipython-input-15-e22d2d8dfbfe> in <module>()
1 acer_df = es_df.select(es_df.genus == "Acer")
----> 2 acer_df.count()
/opt/spark/latest/python/pyspark/sql/dataframe.py in count(self)
297 2
298 """
--> 299 return int(self._jdf.count())
300
301 @ignore_unicode_prefix
/opt/spark/latest/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
929 proto.END_COMMAND_PART
930
--> 931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
933 answer, self.gateway_client, self.target_id, self.name)
/opt/spark/latest/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in send_command(self, command, retry, binary)
693 connection = self._get_connection()
694 try:
--> 695 response = connection.send_command(command)
696 if binary:
697 return response, self._create_connection_guard(connection)
/opt/spark/latest/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in send_command(self, command)
826 self.socket.sendall(command.encode("utf-8"))
827
--> 828 answer = smart_decode(self.stream.readline()[:-1])
829 logger.debug("Answer received: {0}".format(answer))
830 if answer.startswith(proto.RETURN_MESSAGE):
/usr/lib/python3.5/socket.py in readinto(self, b)
573 while True:
574 try:
--> 575 return self._sock.recv_into(b)
576 except timeout:
577 self._timeout_occurred = True
KeyboardInterrupt:
About the ES connector: version 2.1 of the elasticsearch-hadoop connector (not 2.1 of spark!) has native support for Spark now, released around June 2016. https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
Video (fake registration works, real content @ about 9:00) https://www.elastic.co/webinars/adding-spark-and-security-to-elasticsearch-for-hadoop/?baymax=rtp&elektra=docs&iesrc=ctr
In [5]:
# This loads the schema but head(1) will throw an exception on various fields
# (depends on what gets read first) because the column type isn't an array and
# it encounters multi-valued data.
es_df = es_df = sqlContext.read.format("org.elasticsearch.spark.sql") \
.option("es.read.field.as.array.include", "indexData") \
.option("es.nodes",
"c18node14.acis.ufl.edu,c18node2.acis.ufl.edu,c18node6.acis.ufl.edu,c18node10.acis.ufl.edu,c18node12.acis.ufl.edu") \
.load("idigbio-2.10.2/records")
# Adding ,indexData.dwc:multimedia yeilds exception
es_df.printSchema()
root
|-- barcodevalue: string (nullable = true)
|-- basisofrecord: string (nullable = true)
|-- bed: string (nullable = true)
|-- canonicalname: string (nullable = true)
|-- catalognumber: string (nullable = true)
|-- class: string (nullable = true)
|-- collectioncode: string (nullable = true)
|-- collectionid: string (nullable = true)
|-- collectionname: string (nullable = true)
|-- collector: string (nullable = true)
|-- commonname: string (nullable = true)
|-- commonnames: string (nullable = true)
|-- continent: string (nullable = true)
|-- coordinateuncertainty: float (nullable = true)
|-- country: string (nullable = true)
|-- countrycode: string (nullable = true)
|-- county: string (nullable = true)
|-- data: struct (nullable = true)
| |-- aec:associatedTaxa: struct (nullable = true)
| | |-- aec:associatedAuthor: string (nullable = true)
| | |-- aec:associatedCommonName: string (nullable = true)
| | |-- aec:associatedCondition: string (nullable = true)
| | |-- aec:associatedDeterminedBy: string (nullable = true)
| | |-- aec:associatedEmergenceVerbatimDate: string (nullable = true)
| | |-- aec:associatedFamily: string (nullable = true)
| | |-- aec:associatedGenus: string (nullable = true)
| | |-- aec:associatedLocationOnHost: string (nullable = true)
| | |-- aec:associatedNotes: string (nullable = true)
| | |-- aec:associatedRelationshipTerm: string (nullable = true)
| | |-- aec:associatedRelationshipURI: string (nullable = true)
| | |-- aec:associatedScientificName: string (nullable = true)
| | |-- aec:associatedSpecificEpithet: string (nullable = true)
| | |-- aec:isCultivar: string (nullable = true)
| | |-- coreid: string (nullable = true)
| | |-- dwc:basisOfRecord: string (nullable = true)
| |-- coreid: string (nullable = true)
| |-- dc:rights: string (nullable = true)
| |-- dcterms:accessRights: string (nullable = true)
| |-- dcterms:bibliographicCitation: string (nullable = true)
| |-- dcterms:language: string (nullable = true)
| |-- dcterms:license: string (nullable = true)
| |-- dcterms:modified: string (nullable = true)
| |-- dcterms:references: string (nullable = true)
| |-- dcterms:rights: string (nullable = true)
| |-- dcterms:rightsHolder: string (nullable = true)
| |-- dcterms:source: string (nullable = true)
| |-- dcterms:type: string (nullable = true)
| |-- dwc:Identification: struct (nullable = true)
| | |-- coreid: string (nullable = true)
| | |-- dwc:class: string (nullable = true)
| | |-- dwc:dateIdentified: string (nullable = true)
| | |-- dwc:family: string (nullable = true)
| | |-- dwc:genus: string (nullable = true)
| | |-- dwc:higherClassification: string (nullable = true)
| | |-- dwc:identificationID: string (nullable = true)
| | |-- dwc:identificationQualifier: string (nullable = true)
| | |-- dwc:identificationReferences: string (nullable = true)
| | |-- dwc:identificationRemarks: string (nullable = true)
| | |-- dwc:identificationVerificationStatus: string (nullable = true)
| | |-- dwc:identifiedBy: string (nullable = true)
| | |-- dwc:infraspecificEpithet: string (nullable = true)
| | |-- dwc:kingdom: string (nullable = true)
| | |-- dwc:namePublishedInYear: string (nullable = true)
| | |-- dwc:nomenclaturalCode: string (nullable = true)
| | |-- dwc:nomenclaturalStatus: string (nullable = true)
| | |-- dwc:order: string (nullable = true)
| | |-- dwc:phylum: string (nullable = true)
| | |-- dwc:scientificName: string (nullable = true)
| | |-- dwc:scientificNameAuthorship: string (nullable = true)
| | |-- dwc:scientificNameID: string (nullable = true)
| | |-- dwc:specificEpithet: string (nullable = true)
| | |-- dwc:subgenus: string (nullable = true)
| | |-- dwc:taxonID: string (nullable = true)
| | |-- dwc:taxonRank: string (nullable = true)
| | |-- dwc:taxonRemarks: string (nullable = true)
| | |-- dwc:typeStatus: string (nullable = true)
| | |-- dwc:verbatimTaxonRank: string (nullable = true)
| | |-- dwc:vernacularName: string (nullable = true)
| | |-- idigbio:recordId: string (nullable = true)
| | |-- symbiota:tidInterpreted: string (nullable = true)
| |-- dwc:MeasurementOrFact: struct (nullable = true)
| | |-- coreid: string (nullable = true)
| | |-- dwc:measurementRemarks: string (nullable = true)
| | |-- dwc:measurementType: string (nullable = true)
| | |-- dwc:measurementUnit: string (nullable = true)
| | |-- dwc:measurementValue: string (nullable = true)
| |-- dwc:ResourceRelationship: struct (nullable = true)
| | |-- coreid: string (nullable = true)
| | |-- dwc:relatedResourceID: string (nullable = true)
| | |-- dwc:relationshipOfResource: string (nullable = true)
| | |-- dwc:scientificName: string (nullable = true)
| |-- dwc:VerbatimEventDate: string (nullable = true)
| |-- dwc:accessRights: string (nullable = true)
| |-- dwc:associatedMedia: string (nullable = true)
| |-- dwc:associatedOccurrences: string (nullable = true)
| |-- dwc:associatedReferences: string (nullable = true)
| |-- dwc:associatedSequences: string (nullable = true)
| |-- dwc:associatedTaxa: string (nullable = true)
| |-- dwc:basisOfRecord: string (nullable = true)
| |-- dwc:bed: string (nullable = true)
| |-- dwc:behavior: string (nullable = true)
| |-- dwc:catalogNumber: string (nullable = true)
| |-- dwc:class: string (nullable = true)
| |-- dwc:collectionCode: string (nullable = true)
| |-- dwc:collectionID: string (nullable = true)
| |-- dwc:continent: string (nullable = true)
| |-- dwc:coordinatePrecision: string (nullable = true)
| |-- dwc:coordinateUncertaintyInMeters: string (nullable = true)
| |-- dwc:country: string (nullable = true)
| |-- dwc:countryCode: string (nullable = true)
| |-- dwc:county: string (nullable = true)
| |-- dwc:dataGeneralizations: string (nullable = true)
| |-- dwc:datasetID: string (nullable = true)
| |-- dwc:datasetName: string (nullable = true)
| |-- dwc:dateIdentified: string (nullable = true)
| |-- dwc:day: string (nullable = true)
| |-- dwc:decimalLatitude: string (nullable = true)
| |-- dwc:decimalLongitude: string (nullable = true)
| |-- dwc:disposition: string (nullable = true)
| |-- dwc:dynamicProperties: string (nullable = true)
| |-- dwc:earliestAgeOrLowestStage: string (nullable = true)
| |-- dwc:earliestEonOrLowestEonothem: string (nullable = true)
| |-- dwc:earliestEpochOrLowestSeries: string (nullable = true)
| |-- dwc:earliestEraOrLowestErathem: string (nullable = true)
| |-- dwc:earliestPeriodOrLowestSystem: string (nullable = true)
| |-- dwc:endDayOfYear: string (nullable = true)
| |-- dwc:establishmentMeans: string (nullable = true)
| |-- dwc:eventDate: string (nullable = true)
| |-- dwc:eventID: string (nullable = true)
| |-- dwc:eventRemarks: string (nullable = true)
| |-- dwc:eventTime: string (nullable = true)
| |-- dwc:family: string (nullable = true)
| |-- dwc:fieldNotes: string (nullable = true)
| |-- dwc:fieldNumber: string (nullable = true)
| |-- dwc:footprintWKT: string (nullable = true)
| |-- dwc:formation: string (nullable = true)
| |-- dwc:genus: string (nullable = true)
| |-- dwc:geodeticDatum: string (nullable = true)
| |-- dwc:geologicalContextID: string (nullable = true)
| |-- dwc:georeferenceProtocol: string (nullable = true)
| |-- dwc:georeferenceRemarks: string (nullable = true)
| |-- dwc:georeferenceSources: string (nullable = true)
| |-- dwc:georeferenceVerificationStatus: string (nullable = true)
| |-- dwc:georeferencedBy: string (nullable = true)
| |-- dwc:georeferencedDate: string (nullable = true)
| |-- dwc:group: string (nullable = true)
| |-- dwc:habitat: string (nullable = true)
| |-- dwc:higherClassification: string (nullable = true)
| |-- dwc:higherGeography: string (nullable = true)
| |-- dwc:higherGeographyID: string (nullable = true)
| |-- dwc:highestBiostratigraphicZone: string (nullable = true)
| |-- dwc:identificationID: string (nullable = true)
| |-- dwc:identificationQualifier: string (nullable = true)
| |-- dwc:identificationReferences: string (nullable = true)
| |-- dwc:identificationRemarks: string (nullable = true)
| |-- dwc:identificationVerificationStatus: string (nullable = true)
| |-- dwc:identifiedBy: string (nullable = true)
| |-- dwc:individualCount: string (nullable = true)
| |-- dwc:individualID: string (nullable = true)
| |-- dwc:informationWithheld: string (nullable = true)
| |-- dwc:infraspecificEpithet: string (nullable = true)
| |-- dwc:institutionCode: string (nullable = true)
| |-- dwc:institutionID: string (nullable = true)
| |-- dwc:island: string (nullable = true)
| |-- dwc:islandGroup: string (nullable = true)
| |-- dwc:kingdom: string (nullable = true)
| |-- dwc:latestAgeOrHighestStage: string (nullable = true)
| |-- dwc:latestEonOrHighestEonothem: string (nullable = true)
| |-- dwc:latestEpochOrHighestSeries: string (nullable = true)
| |-- dwc:latestEraOrHighestErathem: string (nullable = true)
| |-- dwc:latestPeriodOrHighestSystem: string (nullable = true)
| |-- dwc:lifeStage: string (nullable = true)
| |-- dwc:lithostratigraphicTerms: string (nullable = true)
| |-- dwc:locality: string (nullable = true)
| |-- dwc:locationAccordingTo: string (nullable = true)
| |-- dwc:locationID: string (nullable = true)
| |-- dwc:locationRemarks: string (nullable = true)
| |-- dwc:lowestBiostratigraphicZone: string (nullable = true)
| |-- dwc:maximumDepthInMeters: string (nullable = true)
| |-- dwc:maximumElevationInMeters: string (nullable = true)
| |-- dwc:member: string (nullable = true)
| |-- dwc:minimumDepthInMeters: string (nullable = true)
| |-- dwc:minimumElevationInMeters: string (nullable = true)
| |-- dwc:month: string (nullable = true)
| |-- dwc:municipality: string (nullable = true)
| |-- dwc:nameAccordingTo: string (nullable = true)
| |-- dwc:nameAccordingToID: string (nullable = true)
| |-- dwc:namePublishedIn: string (nullable = true)
| |-- dwc:namePublishedInID: string (nullable = true)
| |-- dwc:namePublishedInYear: string (nullable = true)
| |-- dwc:nomenclaturalCode: string (nullable = true)
| |-- dwc:nomenclaturalStatus: string (nullable = true)
| |-- dwc:occurrenceDetails: string (nullable = true)
| |-- dwc:occurrenceID: string (nullable = true)
| |-- dwc:occurrenceRemarks: string (nullable = true)
| |-- dwc:occurrenceStatus: string (nullable = true)
| |-- dwc:order: string (nullable = true)
| |-- dwc:organismID: string (nullable = true)
| |-- dwc:originalNameUsage: string (nullable = true)
| |-- dwc:otherCatalogNumbers: string (nullable = true)
| |-- dwc:ownerInstitutionCode: string (nullable = true)
| |-- dwc:parentNameUsage: string (nullable = true)
| |-- dwc:phylum: string (nullable = true)
| |-- dwc:preparations: string (nullable = true)
| |-- dwc:previousIdentifications: string (nullable = true)
| |-- dwc:recordNumber: string (nullable = true)
| |-- dwc:recordedBy: string (nullable = true)
| |-- dwc:reproductiveCondition: string (nullable = true)
| |-- dwc:rights: string (nullable = true)
| |-- dwc:rightsHolder: string (nullable = true)
| |-- dwc:samplingEffort: string (nullable = true)
| |-- dwc:samplingProtocol: string (nullable = true)
| |-- dwc:scientificName: string (nullable = true)
| |-- dwc:scientificNameAuthorship: string (nullable = true)
| |-- dwc:scientificNameID: string (nullable = true)
| |-- dwc:sex: string (nullable = true)
| |-- dwc:specificEpithet: string (nullable = true)
| |-- dwc:startDayOfYear: string (nullable = true)
| |-- dwc:stateProvince: string (nullable = true)
| |-- dwc:subgenus: string (nullable = true)
| |-- dwc:taxonID: string (nullable = true)
| |-- dwc:taxonRank: string (nullable = true)
| |-- dwc:taxonRemarks: string (nullable = true)
| |-- dwc:taxonomicStatus: string (nullable = true)
| |-- dwc:typeStatus: string (nullable = true)
| |-- dwc:verbatimCoordinateSystem: string (nullable = true)
| |-- dwc:verbatimCoordinates: string (nullable = true)
| |-- dwc:verbatimDepth: string (nullable = true)
| |-- dwc:verbatimElevation: string (nullable = true)
| |-- dwc:verbatimEventDate: string (nullable = true)
| |-- dwc:verbatimLatitude: string (nullable = true)
| |-- dwc:verbatimLocality: string (nullable = true)
| |-- dwc:verbatimLongitude: string (nullable = true)
| |-- dwc:verbatimSRS: string (nullable = true)
| |-- dwc:verbatimTaxonRank: string (nullable = true)
| |-- dwc:vernacularName: string (nullable = true)
| |-- dwc:waterBody: string (nullable = true)
| |-- dwc:year: string (nullable = true)
| |-- fcc:datePicked: string (nullable = true)
| |-- fcc:pickedBy: string (nullable = true)
| |-- gbif:Identifier: struct (nullable = true)
| | |-- coreid: string (nullable = true)
| | |-- dcterms:identifier: string (nullable = true)
| | |-- dcterms:subject: string (nullable = true)
| |-- gbif:Reference: struct (nullable = true)
| | |-- coreid: string (nullable = true)
| | |-- dcterms:bibliographicCitation: string (nullable = true)
| | |-- dcterms:creator: string (nullable = true)
| | |-- dcterms:date: string (nullable = true)
| | |-- dcterms:description: string (nullable = true)
| | |-- dcterms:identifier: string (nullable = true)
| | |-- dcterms:source: string (nullable = true)
| | |-- dcterms:subject: string (nullable = true)
| | |-- dcterms:title: string (nullable = true)
| | |-- dcterms:type: string (nullable = true)
| | |-- dwc:taxonRemarks: string (nullable = true)
| |-- id: string (nullable = true)
| |-- idigbio:preservative: string (nullable = true)
| |-- idigbio:recordId: string (nullable = true)
| |-- idigbio:subfamily: string (nullable = true)
| |-- idigbio:substrate: string (nullable = true)
| |-- idigbio:superfamily: string (nullable = true)
| |-- symbiota:recordEnteredBy: string (nullable = true)
| |-- symbiota:verbatimScientificName: string (nullable = true)
|-- datecollected: timestamp (nullable = true)
|-- datemodified: timestamp (nullable = true)
|-- dqs: float (nullable = true)
|-- earliestageorloweststage: string (nullable = true)
|-- earliesteonorlowesteonothem: string (nullable = true)
|-- earliestepochorlowestseries: string (nullable = true)
|-- earliesteraorlowesterathem: string (nullable = true)
|-- earliestperiodorlowestsystem: string (nullable = true)
|-- etag: string (nullable = true)
|-- eventdate: string (nullable = true)
|-- family: string (nullable = true)
|-- fieldnumber: string (nullable = true)
|-- flags: string (nullable = true)
|-- formation: string (nullable = true)
|-- genus: string (nullable = true)
|-- geologicalcontextid: string (nullable = true)
|-- geopoint: struct (nullable = true)
| |-- lat: double (nullable = true)
| |-- lon: double (nullable = true)
|-- group: string (nullable = true)
|-- hasImage: boolean (nullable = true)
|-- hasMedia: boolean (nullable = true)
|-- highertaxon: string (nullable = true)
|-- highestbiostratigraphiczone: string (nullable = true)
|-- indexData: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- aec:associatedTaxa: struct (nullable = true)
| | | |-- aec:associatedAuthor: string (nullable = true)
| | | |-- aec:associatedCommonName: string (nullable = true)
| | | |-- aec:associatedCondition: string (nullable = true)
| | | |-- aec:associatedDeterminedBy: string (nullable = true)
| | | |-- aec:associatedEmergenceVerbatimDate: string (nullable = true)
| | | |-- aec:associatedFamily: string (nullable = true)
| | | |-- aec:associatedGenus: string (nullable = true)
| | | |-- aec:associatedLocationOnHost: string (nullable = true)
| | | |-- aec:associatedNotes: string (nullable = true)
| | | |-- aec:associatedRelationshipTerm: string (nullable = true)
| | | |-- aec:associatedRelationshipURI: string (nullable = true)
| | | |-- aec:associatedScientificName: string (nullable = true)
| | | |-- aec:associatedSpecificEpithet: string (nullable = true)
| | | |-- aec:isCultivar: string (nullable = true)
| | | |-- coreid: string (nullable = true)
| | | |-- dwc:basisOfRecord: string (nullable = true)
| | |-- coreid: string (nullable = true)
| | |-- dc:rights: string (nullable = true)
| | |-- dcterms:accessRights: string (nullable = true)
| | |-- dcterms:bibliographicCitation: string (nullable = true)
| | |-- dcterms:language: string (nullable = true)
| | |-- dcterms:license: string (nullable = true)
| | |-- dcterms:modified: string (nullable = true)
| | |-- dcterms:references: string (nullable = true)
| | |-- dcterms:rights: string (nullable = true)
| | |-- dcterms:rightsHolder: string (nullable = true)
| | |-- dcterms:source: string (nullable = true)
| | |-- dcterms:type: string (nullable = true)
| | |-- dwc:Identification: struct (nullable = true)
| | | |-- coreid: string (nullable = true)
| | | |-- dwc:class: string (nullable = true)
| | | |-- dwc:dateIdentified: string (nullable = true)
| | | |-- dwc:family: string (nullable = true)
| | | |-- dwc:genus: string (nullable = true)
| | | |-- dwc:higherClassification: string (nullable = true)
| | | |-- dwc:identificationID: string (nullable = true)
| | | |-- dwc:identificationQualifier: string (nullable = true)
| | | |-- dwc:identificationReferences: string (nullable = true)
| | | |-- dwc:identificationRemarks: string (nullable = true)
| | | |-- dwc:identificationVerificationStatus: string (nullable = true)
| | | |-- dwc:identifiedBy: string (nullable = true)
| | | |-- dwc:infraspecificEpithet: string (nullable = true)
| | | |-- dwc:kingdom: string (nullable = true)
| | | |-- dwc:namePublishedInYear: string (nullable = true)
| | | |-- dwc:nomenclaturalCode: string (nullable = true)
| | | |-- dwc:nomenclaturalStatus: string (nullable = true)
| | | |-- dwc:order: string (nullable = true)
| | | |-- dwc:phylum: string (nullable = true)
| | | |-- dwc:scientificName: string (nullable = true)
| | | |-- dwc:scientificNameAuthorship: string (nullable = true)
| | | |-- dwc:scientificNameID: string (nullable = true)
| | | |-- dwc:specificEpithet: string (nullable = true)
| | | |-- dwc:subgenus: string (nullable = true)
| | | |-- dwc:taxonID: string (nullable = true)
| | | |-- dwc:taxonRank: string (nullable = true)
| | | |-- dwc:taxonRemarks: string (nullable = true)
| | | |-- dwc:typeStatus: string (nullable = true)
| | | |-- dwc:verbatimTaxonRank: string (nullable = true)
| | | |-- dwc:vernacularName: string (nullable = true)
| | | |-- idigbio:recordId: string (nullable = true)
| | | |-- symbiota:tidInterpreted: string (nullable = true)
| | |-- dwc:MeasurementOrFact: struct (nullable = true)
| | | |-- coreid: string (nullable = true)
| | | |-- dwc:measurementRemarks: string (nullable = true)
| | | |-- dwc:measurementType: string (nullable = true)
| | | |-- dwc:measurementUnit: string (nullable = true)
| | | |-- dwc:measurementValue: string (nullable = true)
| | |-- dwc:ResourceRelationship: struct (nullable = true)
| | | |-- coreid: string (nullable = true)
| | | |-- dwc:relatedResourceID: string (nullable = true)
| | | |-- dwc:relationshipOfResource: string (nullable = true)
| | | |-- dwc:scientificName: string (nullable = true)
| | |-- dwc:VerbatimEventDate: string (nullable = true)
| | |-- dwc:accessRights: string (nullable = true)
| | |-- dwc:associatedMedia: string (nullable = true)
| | |-- dwc:associatedOccurrences: string (nullable = true)
| | |-- dwc:associatedReferences: string (nullable = true)
| | |-- dwc:associatedSequences: string (nullable = true)
| | |-- dwc:associatedTaxa: string (nullable = true)
| | |-- dwc:basisOfRecord: string (nullable = true)
| | |-- dwc:bed: string (nullable = true)
| | |-- dwc:behavior: string (nullable = true)
| | |-- dwc:catalogNumber: string (nullable = true)
| | |-- dwc:class: string (nullable = true)
| | |-- dwc:collectionCode: string (nullable = true)
| | |-- dwc:collectionID: string (nullable = true)
| | |-- dwc:continent: string (nullable = true)
| | |-- dwc:coordinatePrecision: string (nullable = true)
| | |-- dwc:coordinateUncertaintyInMeters: string (nullable = true)
| | |-- dwc:country: string (nullable = true)
| | |-- dwc:countryCode: string (nullable = true)
| | |-- dwc:county: string (nullable = true)
| | |-- dwc:dataGeneralizations: string (nullable = true)
| | |-- dwc:datasetID: string (nullable = true)
| | |-- dwc:datasetName: string (nullable = true)
| | |-- dwc:datasetid: string (nullable = true)
| | |-- dwc:dateIdentified: string (nullable = true)
| | |-- dwc:day: string (nullable = true)
| | |-- dwc:decimalLatitude: string (nullable = true)
| | |-- dwc:decimalLongitude: string (nullable = true)
| | |-- dwc:disposition: string (nullable = true)
| | |-- dwc:dynamicProperties: string (nullable = true)
| | |-- dwc:earliestAgeOrLowestStage: string (nullable = true)
| | |-- dwc:earliestEonOrLowestEonothem: string (nullable = true)
| | |-- dwc:earliestEpochOrLowestSeries: string (nullable = true)
| | |-- dwc:earliestEraOrLowestErathem: string (nullable = true)
| | |-- dwc:earliestPeriodOrLowestSystem: string (nullable = true)
| | |-- dwc:endDayOfYear: string (nullable = true)
| | |-- dwc:establishmentMeans: string (nullable = true)
| | |-- dwc:eventDate: string (nullable = true)
| | |-- dwc:eventID: string (nullable = true)
| | |-- dwc:eventRemarks: string (nullable = true)
| | |-- dwc:eventTime: string (nullable = true)
| | |-- dwc:family: string (nullable = true)
| | |-- dwc:fieldNotes: string (nullable = true)
| | |-- dwc:fieldNumber: string (nullable = true)
| | |-- dwc:footprintWKT: string (nullable = true)
| | |-- dwc:formation: string (nullable = true)
| | |-- dwc:genus: string (nullable = true)
| | |-- dwc:geodeticDatum: string (nullable = true)
| | |-- dwc:geologicalContextID: string (nullable = true)
| | |-- dwc:georeferenceProtocol: string (nullable = true)
| | |-- dwc:georeferenceRemarks: string (nullable = true)
| | |-- dwc:georeferenceSources: string (nullable = true)
| | |-- dwc:georeferenceVerificationStatus: string (nullable = true)
| | |-- dwc:georeferencedBy: string (nullable = true)
| | |-- dwc:georeferencedDate: string (nullable = true)
| | |-- dwc:group: string (nullable = true)
| | |-- dwc:habitat: string (nullable = true)
| | |-- dwc:higherClassification: string (nullable = true)
| | |-- dwc:higherGeography: string (nullable = true)
| | |-- dwc:higherGeographyID: string (nullable = true)
| | |-- dwc:highestBiostratigraphicZone: string (nullable = true)
| | |-- dwc:identificationID: string (nullable = true)
| | |-- dwc:identificationQualifier: string (nullable = true)
| | |-- dwc:identificationReferences: string (nullable = true)
| | |-- dwc:identificationRemarks: string (nullable = true)
| | |-- dwc:identificationVerificationStatus: string (nullable = true)
| | |-- dwc:identifiedBy: string (nullable = true)
| | |-- dwc:individualCount: string (nullable = true)
| | |-- dwc:individualID: string (nullable = true)
| | |-- dwc:informationWithheld: string (nullable = true)
| | |-- dwc:infraspecificEpithet: string (nullable = true)
| | |-- dwc:institutionCode: string (nullable = true)
| | |-- dwc:institutionID: string (nullable = true)
| | |-- dwc:island: string (nullable = true)
| | |-- dwc:islandGroup: string (nullable = true)
| | |-- dwc:kingdom: string (nullable = true)
| | |-- dwc:latestAgeOrHighestStage: string (nullable = true)
| | |-- dwc:latestEonOrHighestEonothem: string (nullable = true)
| | |-- dwc:latestEpochOrHighestSeries: string (nullable = true)
| | |-- dwc:latestEraOrHighestErathem: string (nullable = true)
| | |-- dwc:latestPeriodOrHighestSystem: string (nullable = true)
| | |-- dwc:lifeStage: string (nullable = true)
| | |-- dwc:lithostratigraphicTerms: string (nullable = true)
| | |-- dwc:locality: string (nullable = true)
| | |-- dwc:locationAccordingTo: string (nullable = true)
| | |-- dwc:locationID: string (nullable = true)
| | |-- dwc:locationRemarks: string (nullable = true)
| | |-- dwc:lowestBiostratigraphicZone: string (nullable = true)
| | |-- dwc:maximumDepthInMeters: string (nullable = true)
| | |-- dwc:maximumElevationInMeters: string (nullable = true)
| | |-- dwc:member: string (nullable = true)
| | |-- dwc:minimumDepthInMeters: string (nullable = true)
| | |-- dwc:minimumElevationInMeters: string (nullable = true)
| | |-- dwc:month: string (nullable = true)
| | |-- dwc:multimedia: struct (nullable = true)
| | | |-- coreid: string (nullable = true)
| | | |-- dcterms:created: string (nullable = true)
| | | |-- dcterms:creator: string (nullable = true)
| | | |-- dcterms:description: string (nullable = true)
| | | |-- dcterms:identifier: string (nullable = true)
| | | |-- dcterms:license: string (nullable = true)
| | | |-- dcterms:publisher: string (nullable = true)
| | | |-- dcterms:references: string (nullable = true)
| | | |-- dcterms:rightsholder: string (nullable = true)
| | | |-- dcterms:source: string (nullable = true)
| | | |-- dcterms:title: string (nullable = true)
| | |-- dwc:municipality: string (nullable = true)
| | |-- dwc:nameAccordingTo: string (nullable = true)
| | |-- dwc:nameAccordingToID: string (nullable = true)
| | |-- dwc:namePublishedIn: string (nullable = true)
| | |-- dwc:namePublishedInID: string (nullable = true)
| | |-- dwc:namePublishedInYear: string (nullable = true)
| | |-- dwc:nomenclaturalCode: string (nullable = true)
| | |-- dwc:nomenclaturalStatus: string (nullable = true)
| | |-- dwc:occurrenceDetails: string (nullable = true)
| | |-- dwc:occurrenceID: string (nullable = true)
| | |-- dwc:occurrenceRemarks: string (nullable = true)
| | |-- dwc:occurrenceStatus: string (nullable = true)
| | |-- dwc:order: string (nullable = true)
| | |-- dwc:organismID: string (nullable = true)
| | |-- dwc:originalNameUsage: string (nullable = true)
| | |-- dwc:originalnameusageid: string (nullable = true)
| | |-- dwc:otherCatalogNumbers: string (nullable = true)
| | |-- dwc:ownerInstitutionCode: string (nullable = true)
| | |-- dwc:parentNameUsage: string (nullable = true)
| | |-- dwc:parentnameusageid: string (nullable = true)
| | |-- dwc:phylum: string (nullable = true)
| | |-- dwc:preparations: string (nullable = true)
| | |-- dwc:previousIdentifications: string (nullable = true)
| | |-- dwc:recordNumber: string (nullable = true)
| | |-- dwc:recordedBy: string (nullable = true)
| | |-- dwc:reproductiveCondition: string (nullable = true)
| | |-- dwc:rights: string (nullable = true)
| | |-- dwc:rightsHolder: string (nullable = true)
| | |-- dwc:samplingEffort: string (nullable = true)
| | |-- dwc:samplingProtocol: string (nullable = true)
| | |-- dwc:scientificName: string (nullable = true)
| | |-- dwc:scientificNameAuthorship: string (nullable = true)
| | |-- dwc:scientificNameID: string (nullable = true)
| | |-- dwc:sex: string (nullable = true)
| | |-- dwc:specificEpithet: string (nullable = true)
| | |-- dwc:startDayOfYear: string (nullable = true)
| | |-- dwc:stateProvince: string (nullable = true)
| | |-- dwc:subgenus: string (nullable = true)
| | |-- dwc:taxonID: string (nullable = true)
| | |-- dwc:taxonRank: string (nullable = true)
| | |-- dwc:taxonRemarks: string (nullable = true)
| | |-- dwc:taxonid: string (nullable = true)
| | |-- dwc:taxonomicStatus: string (nullable = true)
| | |-- dwc:taxonomicstatus: string (nullable = true)
| | |-- dwc:taxonrank: string (nullable = true)
| | |-- dwc:taxonremarks: string (nullable = true)
| | |-- dwc:typeStatus: string (nullable = true)
| | |-- dwc:verbatimCoordinateSystem: string (nullable = true)
| | |-- dwc:verbatimCoordinates: string (nullable = true)
| | |-- dwc:verbatimDepth: string (nullable = true)
| | |-- dwc:verbatimElevation: string (nullable = true)
| | |-- dwc:verbatimEventDate: string (nullable = true)
| | |-- dwc:verbatimLatitude: string (nullable = true)
| | |-- dwc:verbatimLocality: string (nullable = true)
| | |-- dwc:verbatimLongitude: string (nullable = true)
| | |-- dwc:verbatimSRS: string (nullable = true)
| | |-- dwc:verbatimTaxonRank: string (nullable = true)
| | |-- dwc:vernacularName: string (nullable = true)
| | |-- dwc:waterBody: string (nullable = true)
| | |-- dwc:year: string (nullable = true)
| | |-- fcc:datePicked: string (nullable = true)
| | |-- fcc:pickedBy: string (nullable = true)
| | |-- flag_dwc_class_added: boolean (nullable = true)
| | |-- flag_dwc_class_replaced: boolean (nullable = true)
| | |-- flag_dwc_continent_added: boolean (nullable = true)
| | |-- flag_dwc_continent_replaced: boolean (nullable = true)
| | |-- flag_dwc_country_added: boolean (nullable = true)
| | |-- flag_dwc_country_replaced: boolean (nullable = true)
| | |-- flag_dwc_datasetid_added: boolean (nullable = true)
| | |-- flag_dwc_datasetid_replaced: boolean (nullable = true)
| | |-- flag_dwc_family_added: boolean (nullable = true)
| | |-- flag_dwc_family_replaced: boolean (nullable = true)
| | |-- flag_dwc_genus_added: boolean (nullable = true)
| | |-- flag_dwc_genus_replaced: boolean (nullable = true)
| | |-- flag_dwc_kingdom_added: boolean (nullable = true)
| | |-- flag_dwc_kingdom_replaced: boolean (nullable = true)
| | |-- flag_dwc_multimedia_added: boolean (nullable = true)
| | |-- flag_dwc_order_added: boolean (nullable = true)
| | |-- flag_dwc_order_replaced: boolean (nullable = true)
| | |-- flag_dwc_originalnameusageid_added: boolean (nullable = true)
| | |-- flag_dwc_parentnameusageid_added: boolean (nullable = true)
| | |-- flag_dwc_phylum_added: boolean (nullable = true)
| | |-- flag_dwc_phylum_replaced: boolean (nullable = true)
| | |-- flag_dwc_stateprovince_replaced: boolean (nullable = true)
| | |-- flag_dwc_taxonid_added: boolean (nullable = true)
| | |-- flag_dwc_taxonid_replaced: boolean (nullable = true)
| | |-- flag_dwc_taxonomicstatus_added: boolean (nullable = true)
| | |-- flag_dwc_taxonomicstatus_replaced: boolean (nullable = true)
| | |-- flag_dwc_taxonrank_added: boolean (nullable = true)
| | |-- flag_dwc_taxonrank_replaced: boolean (nullable = true)
| | |-- flag_dwc_taxonremarks_added: boolean (nullable = true)
| | |-- flag_dwc_taxonremarks_replaced: boolean (nullable = true)
| | |-- flag_gbif_canonicalname_added: boolean (nullable = true)
| | |-- flag_gbif_reference_added: boolean (nullable = true)
| | |-- flag_gbif_taxon_corrected: boolean (nullable = true)
| | |-- flag_gbif_vernacularname_added: boolean (nullable = true)
| | |-- flag_idigbio_isocountrycode_added: boolean (nullable = true)
| | |-- flag_taxon_match_failed: boolean (nullable = true)
| | |-- gbif:Identifier: struct (nullable = true)
| | | |-- coreid: string (nullable = true)
| | | |-- dcterms:identifier: string (nullable = true)
| | | |-- dcterms:subject: string (nullable = true)
| | |-- gbif:Reference: struct (nullable = true)
| | | |-- coreid: string (nullable = true)
| | | |-- dcterms:bibliographicCitation: string (nullable = true)
| | | |-- dcterms:bibliographiccitation: string (nullable = true)
| | | |-- dcterms:creator: string (nullable = true)
| | | |-- dcterms:date: string (nullable = true)
| | | |-- dcterms:description: string (nullable = true)
| | | |-- dcterms:identifier: string (nullable = true)
| | | |-- dcterms:source: string (nullable = true)
| | | |-- dcterms:subject: string (nullable = true)
| | | |-- dcterms:title: string (nullable = true)
| | | |-- dcterms:type: string (nullable = true)
| | | |-- dwc:taxonRemarks: string (nullable = true)
| | |-- gbif:canonicalname: string (nullable = true)
| | |-- gbif:reference: struct (nullable = true)
| | | |-- coreid: string (nullable = true)
| | | |-- dcterms:bibliographiccitation: string (nullable = true)
| | | |-- dcterms:identifier: string (nullable = true)
| | | |-- dcterms:source: string (nullable = true)
| | |-- gbif:vernacularname: struct (nullable = true)
| | | |-- coreid: string (nullable = true)
| | | |-- dcterms:language: string (nullable = true)
| | | |-- dcterms:source: string (nullable = true)
| | | |-- dwc:country: string (nullable = true)
| | | |-- dwc:countrycode: string (nullable = true)
| | | |-- dwc:vernacularname: string (nullable = true)
| | |-- id: string (nullable = true)
| | |-- idigbio:dateModified: string (nullable = true)
| | |-- idigbio:etag: string (nullable = true)
| | |-- idigbio:isocountrycode: string (nullable = true)
| | |-- idigbio:parent: string (nullable = true)
| | |-- idigbio:preservative: string (nullable = true)
| | |-- idigbio:recordId: string (nullable = true)
| | |-- idigbio:recordIds: string (nullable = true)
| | |-- idigbio:siblings: struct (nullable = true)
| | | |-- mediarecord: string (nullable = true)
| | |-- idigbio:subfamily: string (nullable = true)
| | |-- idigbio:substrate: string (nullable = true)
| | |-- idigbio:superfamily: string (nullable = true)
| | |-- idigbio:uuid: string (nullable = true)
| | |-- symbiota:recordEnteredBy: string (nullable = true)
| | |-- symbiota:verbatimScientificName: string (nullable = true)
|-- individualcount: float (nullable = true)
|-- infraspecificepithet: string (nullable = true)
|-- institutioncode: string (nullable = true)
|-- institutionid: string (nullable = true)
|-- institutionname: string (nullable = true)
|-- kingdom: string (nullable = true)
|-- latestageorhigheststage: string (nullable = true)
|-- latesteonorhighesteonothem: string (nullable = true)
|-- latestepochorhighestseries: string (nullable = true)
|-- latesteraorhighesterathem: string (nullable = true)
|-- latestperiodorhighestsystem: string (nullable = true)
|-- lithostratigraphicterms: string (nullable = true)
|-- locality: string (nullable = true)
|-- lowestbiostratigraphiczone: string (nullable = true)
|-- maxdepth: float (nullable = true)
|-- maxelevation: float (nullable = true)
|-- mediarecords: string (nullable = true)
|-- member: string (nullable = true)
|-- mindepth: float (nullable = true)
|-- minelevation: float (nullable = true)
|-- municipality: string (nullable = true)
|-- occurrenceid: string (nullable = true)
|-- order: string (nullable = true)
|-- phylum: string (nullable = true)
|-- recordids: string (nullable = true)
|-- recordnumber: string (nullable = true)
|-- recordset: string (nullable = true)
|-- scientificname: string (nullable = true)
|-- specificepithet: string (nullable = true)
|-- startdayofyear: integer (nullable = true)
|-- stateprovince: string (nullable = true)
|-- taxonid: string (nullable = true)
|-- taxonomicstatus: string (nullable = true)
|-- taxonrank: string (nullable = true)
|-- typestatus: string (nullable = true)
|-- uuid: string (nullable = true)
|-- verbatimeventdate: string (nullable = true)
|-- verbatimlocality: string (nullable = true)
|-- version: integer (nullable = true)
|-- waterbody: string (nullable = true)
In [6]:
es_df.head(1)
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-6-2cf597babc90> in <module>()
----> 1 es_df.head(1)
/opt/spark/latest/python/pyspark/sql/dataframe.py in head(self, n)
794 rs = self.head(1)
795 return rs[0] if rs else None
--> 796 return self.take(n)
797
798 @ignore_unicode_prefix
/opt/spark/latest/python/pyspark/sql/dataframe.py in take(self, num)
348 with SCCallSiteSync(self._sc) as css:
349 port = self._sc._jvm.org.apache.spark.sql.execution.python.EvaluatePython.takeAndServe(
--> 350 self._jdf, num)
351 return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
352
/opt/spark/latest/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
931 answer = self.gateway_client.send_command(command)
932 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934
935 for temp_arg in temp_args:
/opt/spark/latest/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/opt/spark/latest/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
310 raise Py4JJavaError(
311 "An error occurred while calling {0}{1}{2}.\n".
--> 312 format(target_id, ".", name), value)
313 else:
314 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.execution.python.EvaluatePython.takeAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'indexData.dwc:multimedia.dcterms:source' not found; typically this occurs with arrays which are not mapped as single value
at org.elasticsearch.spark.sql.RowValueReader$class.rowColumns(RowValueReader.scala:34)
at org.elasticsearch.spark.sql.ScalaRowValueReader.rowColumns(ScalaEsRowValueReader.scala:14)
at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:51)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:778)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:704)
at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:746)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:699)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:806)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:704)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:806)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:704)
at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:458)
at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:383)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:278)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:251)
at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:456)
at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:86)
at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$takeAndServe$1.apply$mcI$sp(EvaluatePython.scala:41)
at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$takeAndServe$1.apply(EvaluatePython.scala:39)
at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$takeAndServe$1.apply(EvaluatePython.scala:39)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2532)
at org.apache.spark.sql.execution.python.EvaluatePython$.takeAndServe(EvaluatePython.scala:39)
at org.apache.spark.sql.execution.python.EvaluatePython.takeAndServe(EvaluatePython.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'indexData.dwc:multimedia.dcterms:source' not found; typically this occurs with arrays which are not mapped as single value
at org.elasticsearch.spark.sql.RowValueReader$class.rowColumns(RowValueReader.scala:34)
at org.elasticsearch.spark.sql.ScalaRowValueReader.rowColumns(ScalaEsRowValueReader.scala:14)
at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:51)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:778)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:704)
at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:746)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:699)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:806)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:704)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:806)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:704)
at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:458)
at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:383)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:278)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:251)
at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:456)
at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:86)
at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:43)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
The zip file here has all the jars in it: https://www.elastic.co/downloads/hadoop
And example code:
https://spark.apache.org/docs/latest/programming-guide.html#external-datasets
Full docs, only Scala and Java samples except the last lines which are Python:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
ls /opt/spark/latest/jars/ela* /opt/spark/latest/jars/elasticsearch-hadoop-2.4.0.jar
>>> es_df = sqlContext.read.format("org.elasticsearch.spark.sql").load("idigbio-2.10.1/records")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/spark/latest/python/pyspark/sql/readwriter.py", line 147, in load
return self._df(self._jreader.load(path))
File "/opt/spark/latest/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", line 933, in __call__
File "/opt/spark/latest/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/spark/latest/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.load.
: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Unknown GeoShape [null]
at org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils.doParseGeoShapeInfo(MappingUtils.java:254)
at org.elasticsearch.hadoop.serialization.dto.mapping.MappingUtils.parseGeoInfo(MappingUtils.java:219)
at org.elasticsearch.hadoop.rest.RestRepository.sampleGeoFields(RestRepository.java:447)
at org.elasticsearch.spark.sql.SchemaUtils$.discoverMappingAsField(SchemaUtils.scala:86)
at org.elasticsearch.spark.sql.SchemaUtils$.discoverMapping(SchemaUtils.scala:69)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:112)
at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:112)
at org.elasticsearch.spark.sql.ElasticsearchRelation$$anonfun$schema$1.apply(DefaultSource.scala:116)
at org.elasticsearch.spark.sql.ElasticsearchRelation$$anonfun$schema$1.apply(DefaultSource.scala:116)
at scala.Option.getOrElse(Option.scala:121)
at org.elasticsearch.spark.sql.ElasticsearchRelation.schema(DefaultSource.scala:116)
at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:40)
at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:382)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:143)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)
https://github.com/elastic/elasticsearch-hadoop/issues/607
Alex took out the GeoShape field in the 2.10.2 index because it was all nulls and unused. The documented behavior when converting this field type from ES to a DF is to sample one row and use that to determine the data strucuture. It's not happy when all rows are null apparently.
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapping.html
Content source: bio-guoda/guoda-datasets
Similar notebooks: