To prevent this notebook from getting too cluttered, we use some Python utilities, which we load below.
In [ ]:
import sys
sys.path.append("./modules")
import iae_examples
Let's set this notebook to use the full width of the browser using these utilities.
In [ ]:
iae_examples.set_notebook_full_width()
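Under the hood, a helper like set_notebook_full_width typically just injects a small CSS override via IPython. A minimal sketch of that idea (an assumption about the helper, not its actual implementation):
from IPython.display import display, HTML

# Widen the notebook's main container to the full browser width.
display(HTML("<style>.container { width:100% !important; }</style>"))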
We can read some variables that were saved when we ran the notebook examples/CLI/CLI_Setup.ipynb to configure our chosen API endpoint, org and space.
In [ ]:
(CF_API, CF_ORG, CF_SPACE) = iae_examples.read_cf_target_endpoint_details('../secrets/cf_target_endpoint.json')
Create a file ../secrets/cos_s3_endpoint.json
with your COS credentials. The file format should be:
{
    "S3_ACCESS_KEY": "<AccessKey-changeme>",
    "S3_PRIVATE_ENDPOINT": "<Private-EndPoint-changeme>",
    "S3_PUBLIC_ENDPOINT": "<Public-EndPoint-changeme>",
    "S3_SECRET_KEY": "<SecretKey-changeme>"
}
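If you prefer to create the file from Python rather than a text editor, a minimal sketch is shown below (substitute your own values for the placeholders):
import json

# Hypothetical convenience snippet: write the COS credentials file described above.
cos_creds = {
    "S3_ACCESS_KEY": "<AccessKey-changeme>",
    "S3_PRIVATE_ENDPOINT": "<Private-EndPoint-changeme>",
    "S3_PUBLIC_ENDPOINT": "<Public-EndPoint-changeme>",
    "S3_SECRET_KEY": "<SecretKey-changeme>"
}
with open('../secrets/cos_s3_endpoint.json', 'w') as f:
    json.dump(cos_creds, f, indent=2)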
Now let's load the COS credentials file into some variables that we will use later.
In [ ]:
(S3_ACCESS_KEY, S3_PRIVATE_ENDPOINT, S3_PUBLIC_ENDPOINT, S3_SECRET_KEY) = \
iae_examples.read_cos_endpoint_details('../secrets/cos_s3_endpoint.json')
In [ ]:
url = 'https://raw.githubusercontent.com/snowch/IBM_Analytics_Engine_Examples/master/scripts/COS_S3.sh'
filename = 'COS_S3_bootstrap.sh'
bucket_name = 'temp-bucket'
In [ ]:
iae_examples.save_url_to_cos(url, bucket_name, filename, S3_ACCESS_KEY, S3_SECRET_KEY, S3_PUBLIC_ENDPOINT)
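For reference, iae_examples.save_url_to_cos is a small helper from this repo's modules directory. The sketch below shows one way such a helper could be written with boto3 (an assumed dependency); it is an illustration of the idea, not the actual implementation:
import boto3
import requests

def save_url_to_cos_sketch(url, bucket, key, access_key, secret_key, endpoint):
    # endpoint_url expects a full URL; prepend https:// if the saved endpoint is a bare hostname
    endpoint_url = endpoint if endpoint.startswith('http') else 'https://' + endpoint
    s3 = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        endpoint_url=endpoint_url,
    )
    body = requests.get(url).content                    # fetch the bootstrap script
    s3.put_object(Bucket=bucket, Key=key, Body=body)    # assumes the bucket already exists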
Before we can provision IAE, we need to log in to Bluemix using the Bluemix CLI.
In [ ]:
! bx login --apikey @../secrets/apiKeyPersonal.json -a {CF_API} -o {CF_ORG} -s {CF_SPACE}
There are a few ways to configure IAE to use IBM COS. Let's automate the process with a custom script.
NOTE: These examples prefer automation to manual approaches for configuration. One key benefit of automation is that it supports creating environments in a repeatable and testable way.
In [ ]:
import json

custom_script = {
    "num_compute_nodes": 1,
    "hardware_config": "default",
    "software_package": "ae-1.0-hadoop-spark",
    "customization": [{
        "name": "action1",
        "type": "bootstrap",
        "script": {
            "source_type": "CosS3",
            "source_props": {
                "auth_endpoint": S3_PRIVATE_ENDPOINT,
                "access_key_id": S3_ACCESS_KEY,
                "secret_access_key": S3_SECRET_KEY
            },
            "script_path": bucket_name + "/COS_S3_bootstrap.sh"
        },
        "script_params": [S3_ACCESS_KEY, S3_PRIVATE_ENDPOINT, S3_SECRET_KEY]
    }]
}

# write the configuration to a file so that we can pass it to the Bluemix CLI in the next step
with open('../secrets/custom_script.json', 'w') as f:
    f.write(json.dumps(custom_script))
We can then attempt to create an IBM Analytics Engine Instance using the custom script file that we created in the previous step.
In [ ]:
! bx cf create-service IBMAnalyticsEngine lite 'myiaeinstance' -c ../secrets/custom_script.json
Note the output from the command above. If all went OK, the CLI should suggest running cf service myiaeinstance to check the provisioning status. Let's do that now.
NOTE: If there is an error output by the above step, jump to the section below on debugging.
In [ ]:
! bx cf service myiaeinstance
When the status is create succeeded, move on to the next step.
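If you'd rather not re-run the previous cell by hand, a rough polling sketch is shown below. It assumes the bx CLI is on the PATH and that its output contains the phrases "create succeeded" or "failed" (wording may vary between CLI versions):
import subprocess
import time

# Poll the service status roughly once a minute until it settles.
while True:
    out = subprocess.check_output(['bx', 'cf', 'service', 'myiaeinstance']).decode('utf-8')
    if 'create succeeded' in out:
        print('provisioning finished')
        break
    if 'failed' in out:
        print(out)   # surface the error and stop polling
        break
    print('still provisioning ...')
    time.sleep(60)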
In [ ]:
! bx cf create-service-key myiaeinstance myiaeinstance_servicekey
In [ ]:
! bx cf service-keys myiaeinstance
In [ ]:
! bx cf service-key myiaeinstance myiaeinstance_servicekey > ../secrets/iae_service_key.json
# unfortunately, the output of the above command contains some lines of text before the JSON
# let's remove the first four lines of output and save the raw JSON
iae_examples.strip_premable_from_service_key('../secrets/iae_service_key.json')
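For reference, a helper like strip_premable_from_service_key only needs to drop everything before the opening brace and rewrite the file. A possible sketch (an assumption about the implementation, not the actual code):
# Hypothetical sketch: keep only the JSON portion of the service-key output.
def strip_preamble_sketch(path):
    with open(path) as f:
        text = f.read()
    with open(path, 'w') as f:
        f.write(text[text.index('{'):])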
In [ ]:
IAE_USER = iae_examples.iae_service_user('../secrets/iae_service_key.json')
IAE_PASSWORD = iae_examples.iae_service_password('../secrets/iae_service_key.json')
IAE_AMBARI_URL = iae_examples.iae_service_endpoint_ambari('../secrets/iae_service_key.json')
IAE_LIVY_URL = iae_examples.iae_service_endpoint_livy('../secrets/iae_service_key.json')
IAE_WEBHDFS_URL = iae_examples.iae_service_endpoint_webhdfs('../secrets/iae_service_key.json')
In [ ]:
iae_examples.read_iae_service_keys('../secrets/iae_service_key.json')['cluster']['service_endpoints']['hive_jdbc']
In [ ]:
# This is broken
# iae_examples.is_s3_access_key_set(IAE_AMBARI_URL, IAE_USER, IAE_PASSWORD, S3_ACCESS_KEY)
SSH onto the cluster (see ../secrets/iae_service_key.json for endpoints and credentials)
ssh clsadmin@chs-xxxxx-mn003.bi.services.us-south.bluemix.net
beeline -u 'jdbc:hive2://chs-xxxxx-mn001.bi.services.us-south.bluemix.net:8443/;ssl=true;transportMode=http;httpPath=gateway/default/hive' -n clsadmin -p **********
Next, from the beeline session, create the Hive table:
CREATE EXTERNAL TABLE avro_hive_table
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 's3a://temp-bucket/transactions/'
TBLPROPERTIES (
'avro.schema.literal'='{
"namespace": "transaction.avro",
"type": "record",
"name": "Transaction",
"fields": [
{"name": "InvoiceNo", "type": "int" },
{"name": "StockCode", "type": "string" },
{"name": "Description", "type": "string" },
{"name": "Quantity", "type": "int" },
{"name": "InvoiceDate", "type": "long" },
{"name": "UnitPrice", "type": "float" },
{"name": "CustomerID", "type": "int" },
{"name": "Country", "type": "string" },
{"name": "LineNo", "type": "int" },
{"name": "InvoiceTime", "type": "string" },
{"name": "StoreID", "type": "int" },
{"name": "TransactionID", "type": "string" }
]
}'
)
;
And query:
select count(*) from avro_hive_table;
First, let's create a PySpark script.
In [ ]:
file_contents = """
from __future__ import print_function
from datetime import datetime
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession.builder.appName("PythonPi").getOrCreate()
output = "Hello World at %s" % (str(datetime.now()))
print(output)
output_rdd = spark.sparkContext.parallelize([output])
output_rdd.coalesce(1, True).saveAsTextFile('s3a://{0}/provision_iae_with_cos_spark_job_output.txt')
spark.stop()
""".format(bucket_name)
bucket_name = 'temp-bucket'
filename = 'PiEx.py'
(S3_ACCESS_KEY, S3_PRIVATE_ENDPOINT, S3_PUBLIC_ENDPOINT, S3_SECRET_KEY) = \
iae_examples.read_cos_endpoint_details('../secrets/cos_s3_endpoint.json')
iae_examples.save_string_to_cos(
file_contents.encode('ascii'), bucket_name, filename, S3_ACCESS_KEY, S3_SECRET_KEY, S3_PUBLIC_ENDPOINT
)
Execute the Spark job.
In [ ]:
IAE_USER = iae_examples.iae_service_user('../secrets/iae_service_key.json')
IAE_PASSWORD = iae_examples.iae_service_password('../secrets/iae_service_key.json')
IAE_LIVY_URL = iae_examples.iae_service_endpoint_livy('../secrets/iae_service_key.json')
IAE_WEBHDFS_URL = iae_examples.iae_service_endpoint_webhdfs('../secrets/iae_service_key.json')
Ensure the bucket_name variable is still set.
In [ ]:
print(bucket_name)
In [ ]:
import requests, json
headers = {
'Content-Type': 'application/json',
'X-Requested-By': 'livy'
}
data = { "file":"s3a://{0}/PiEx.py".format(bucket_name) }
res = requests.post(IAE_LIVY_URL, auth=(IAE_USER, IAE_PASSWORD), headers=headers, data=json.dumps(data))
print(res.text)
id = res.json()['id']
Note that we saved the job id in the previous step. Let's use it to get the job state.
NOTE: keep re-running the cell below until the job has succeeded or failed.
In [ ]:
headers = {
'Content-Type': 'application/json',
'X-Requested-By': 'livy'
}
url = '{0}/{1}'.format(IAE_LIVY_URL, id)
response = requests.get(url, auth=(IAE_USER, IAE_PASSWORD), headers=headers)
print(response.json()['state'])
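Alternatively, you can poll in a loop rather than re-running the cell. Livy batch jobs typically pass through states such as starting and running before ending in success or dead; the exact set of states can vary by Livy version, so adjust the terminal states below if needed:
import time
import requests

# Poll the Livy batch until it reaches a terminal state.
while True:
    state = requests.get(
        '{0}/{1}'.format(IAE_LIVY_URL, id),
        auth=(IAE_USER, IAE_PASSWORD),
        headers=headers
    ).json()['state']
    print(state)
    if state in ('success', 'dead', 'error', 'killed'):
        break
    time.sleep(10)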
Take a look at the Spark job log using the job id:
In [ ]:
headers = {
'Content-Type': 'application/json',
'X-Requested-By': 'livy'
}
url = '{0}/{1}/log'.format(IAE_LIVY_URL, id)
response = requests.get(url, auth=(IAE_USER, IAE_PASSWORD), headers=headers)
print('\n'.join(response.json()['log']))
This is the process I followed to debug an issue with my submitted job.
If there is an error, we can debug it by looking for the YARN application id in the log output, e.g.:
17/09/23 06:21:51 INFO Client: Application report for application_1506108548102_0002 (state: ACCEPTED)
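If you want to pull the application id out of the Livy log programmatically, a small illustrative snippet (it reuses the response object from the /log request above):
import re

# Search the Livy log lines for a YARN application id such as application_1506108548102_0002.
match = re.search(r'application_\d+_\d+', '\n'.join(response.json()['log']))
print(match.group(0) if match else 'no application id found in the log yet')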
SSH onto the cluster and run the following command (substitute your own application_xxxx value):
$ yarn logs -applicationId application_1506108548102_0002 | less
Buried in the YARN output, I noticed the following:
py4j.protocol.Py4JJavaError: An error occurred while calling o76.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3a://temp-bucket/provision_iae_with_cos_spark_job_output.txt already exists
Now let's take a look at the file contents. In the Spark job, we coalesced the RDD, which causes Spark to save just one output file, part-00000:
output_rdd.coalesce(1, True).saveAsTextFile('s3a://{0}/provision_iae_with_cos_spark_job_output.txt')
In [ ]:
data = iae_examples.get_file_content_from_cos(
bucket_name,
'provision_iae_with_cos_spark_job_output.txt/part-00000',
S3_ACCESS_KEY, S3_SECRET_KEY, S3_PUBLIC_ENDPOINT
)
print(data)
Let's keep things tidy and remove our job output. Note that if you don't remove it, the next time you run the Spark job it will fail because the output directory already exists.
In [ ]:
iae_examples.recursively_delete_file_in_cos(
bucket_name,
'provision_iae_with_cos_spark_job_output.txt',
S3_ACCESS_KEY, S3_SECRET_KEY, S3_PUBLIC_ENDPOINT
)
In [ ]:
! bx cf space dev --guid
In [ ]:
! bx cf services
In [ ]:
! bx cf service-keys myiaeinstance
In [ ]:
! bx cf delete-service-key myiaeinstance myiaeinstance_servicekey -f
In [ ]:
! bx cf delete-service myiaeinstance -f