Introduction

In this notebook, we use the Bluemix CLI tools to create a new IBM Analytics Engine instance that is configured to use IBM Cloud Object Storage (IBM COS).


Load utility library and set notebook width

To prevent this notebook from getting too cluttered, we use some Python utilities. We load them below.


In [ ]:
import sys
sys.path.append("./modules")
import iae_examples

Let's use those utilities to set this notebook to use the full width of the browser.


In [ ]:
iae_examples.set_notebook_full_width()

Read Cloud Foundry endpoint properties

We can read the variables saved when we ran the notebook examples/CLI/CLI_Setup.ipynb to configure our chosen API endpoint, org, and space.


In [ ]:
(CF_API, CF_ORG, CF_SPACE) = iae_examples.read_cf_target_endpoint_details('../secrets/cf_target_endpoint.json')

Save IBM Cloud Object Storage endpoint properties

Create a file ../secrets/cos_s3_endpoint.json with your COS credentials. The file format should be:

{
   "S3_ACCESS_KEY":       "<AccessKey-changeme>",
   "S3_PRIVATE_ENDPOINT": "<Private-EndPoint-changeme>",
   "S3_PUBLIC_ENDPOINT":  "<Public-EndPoint-changeme>",
   "S3_SECRET_KEY":       "<SecretKey-changeme>"
}
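
If you prefer to create that file from this notebook rather than by hand, a minimal sketch is shown below. It uses the same placeholder values as above; replace them with your own credentials before running it:

import json

cos_credentials = {
    "S3_ACCESS_KEY":       "<AccessKey-changeme>",
    "S3_PRIVATE_ENDPOINT": "<Private-EndPoint-changeme>",
    "S3_PUBLIC_ENDPOINT":  "<Public-EndPoint-changeme>",
    "S3_SECRET_KEY":       "<SecretKey-changeme>"
}

with open('../secrets/cos_s3_endpoint.json', 'w') as f:
    json.dump(cos_credentials, f, indent=4)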

Now let's load the COS credentials file into some variables that we will use later.


In [ ]:
(S3_ACCESS_KEY, S3_PRIVATE_ENDPOINT, S3_PUBLIC_ENDPOINT, S3_SECRET_KEY) = \
    iae_examples.read_cos_endpoint_details('../secrets/cos_s3_endpoint.json')

Upload IAE bootstrap file to COS


In [ ]:
url = 'https://raw.githubusercontent.com/snowch/IBM_Analytics_Engine_Examples/master/scripts/COS_S3.sh'
filename = 'COS_S3_bootstrap.sh'
bucket_name = 'temp-bucket'

In [ ]:
iae_examples.save_url_to_cos(url, bucket_name, filename, S3_ACCESS_KEY, S3_SECRET_KEY, S3_PUBLIC_ENDPOINT)
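
For reference, a utility like save_url_to_cos can be sketched with requests and boto3 roughly as follows. This is an assumption about its internals, not the actual implementation (which lives in modules/iae_examples.py):

import requests
import boto3

def save_url_to_cos_sketch(url, bucket, key, access_key, secret_key, endpoint):
    """Download a file over HTTP and upload it to a COS bucket via the S3 API."""
    body = requests.get(url).content
    s3 = boto3.client(
        's3',
        endpoint_url=endpoint if endpoint.startswith('http') else 'https://' + endpoint,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    try:
        s3.create_bucket(Bucket=bucket)            # create the bucket if it doesn't exist yet
    except s3.exceptions.BucketAlreadyOwnedByYou:  # ignore if we already own it
        pass
    s3.put_object(Bucket=bucket, Key=key, Body=body)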

Provision IAE instance

Before we can provision IAE, we need to log in to Bluemix using the Bluemix CLI.


In [ ]:
! bx login --apikey @../secrets/apiKeyPersonal.json -a {CF_API} -o {CF_ORG} -s {CF_SPACE}

There are a few ways to configure IAE to use IBM COS. Let's automate the process with a custom script.

NOTE: These examples prefer automation to manual approaches for configuration. One key benefit of automation is that it supports creating environments in a repeatable and testable way.


In [ ]:
import json

custom_script = { 
    "num_compute_nodes": 1, 
    "hardware_config": "default", 
    "software_package": "ae-1.0-hadoop-spark",
    "customization": [{
        "name": "action1",
        "type": "bootstrap",
        "script": {
            "source_type": "CosS3",
            "source_props": {
                "auth_endpoint": S3_PRIVATE_ENDPOINT,
                "access_key_id": S3_ACCESS_KEY,
                "secret_access_key": S3_SECRET_KEY
             },
         "script_path": bucket_name + "/COS_S3_bootstrap.sh"
        },
        "script_params": [S3_ACCESS_KEY, S3_PRIVATE_ENDPOINT, S3_SECRET_KEY]
    }]
}

# write the script to a file in the local directory where we can access it in the next step using the Bluemix CLI 

with open('../secrets/custom_script.json', 'w') as f:
    f.write(json.dumps(custom_script))

We can then attempt to create an IBM Analytics Engine Instance using the custom script file that we created in the previous step.


In [ ]:
! bx cf create-service IBMAnalyticsEngine lite 'myiaeinstance' -c ../secrets/custom_script.json

Note the output above. If all went OK, the CLI should suggest running cf service myiaeinstance to check the provisioning status. Let's do that now.

NOTE: If there is an error output by the above step, jump to the section below on debugging.


In [ ]:
! bx cf service myiaeinstance

When the status is 'create succeeded', move on to the next step.
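
If you would rather poll for that status from Python than re-run the cell above by hand, a minimal sketch is shown below. It assumes the bx CLI is on the PATH, that you are already logged in, and that the status text printed by the CLI contains 'create succeeded' or 'failed':

import subprocess
import time

# Poll the service status every 60 seconds until provisioning finishes or fails.
while True:
    result = subprocess.run(['bx', 'cf', 'service', 'myiaeinstance'],
                            capture_output=True, text=True)
    print(result.stdout)
    if 'create succeeded' in result.stdout or 'failed' in result.stdout:
        break
    time.sleep(60)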


Create service key

Here we create a service key which contains the cluster credentials. We export the service key information to a file. We can then read the service key details into python variables so we can use those variables later in this notebook.


In [ ]:
! bx cf create-service-key myiaeinstance myiaeinstance_servicekey

In [ ]:
! bx cf service-keys myiaeinstance

In [ ]:
! bx cf service-key myiaeinstance myiaeinstance_servicekey > ../secrets/iae_service_key.json

# unfortunately, the output of the above command contains some lines of text before the JSON,
# so let's remove the first four lines of output and save the raw JSON

iae_examples.strip_premable_from_service_key('../secrets/iae_service_key.json')
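
For reference, a preamble-stripping helper like the one above can be sketched as follows. The function name and the four-line assumption are illustrative; the real implementation is in modules/iae_examples.py:

def strip_preamble_sketch(path, preamble_lines=4):
    """Drop the leading non-JSON lines that the CLI prints before the service key."""
    with open(path) as f:
        lines = f.readlines()
    with open(path, 'w') as f:
        f.writelines(lines[preamble_lines:])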

In [ ]:
IAE_USER        = iae_examples.iae_service_user('../secrets/iae_service_key.json')
IAE_PASSWORD    = iae_examples.iae_service_password('../secrets/iae_service_key.json')
IAE_AMBARI_URL  = iae_examples.iae_service_endpoint_ambari('../secrets/iae_service_key.json')
IAE_LIVY_URL    = iae_examples.iae_service_endpoint_livy('../secrets/iae_service_key.json')
IAE_WEBHDFS_URL = iae_examples.iae_service_endpoint_webhdfs('../secrets/iae_service_key.json')

In [ ]:
iae_examples.read_iae_service_keys('../secrets/iae_service_key.json')['cluster']['service_endpoints']['hive_jdbc']

Verify COS was successfully configured


In [ ]:
# This is broken
# iae_examples.is_s3_access_key_set(IAE_AMBARI_URL, IAE_USER, IAE_PASSWORD, S3_ACCESS_KEY)

Create Hive table and query it

SSH onto the cluster (see ../secrets/iae_service_key.json for endpoints and credentials)

ssh clsadmin@chs-xxxxx-mn003.bi.services.us-south.bluemix.net
beeline -u 'jdbc:hive2://chs-xxxxx-mn001.bi.services.us-south.bluemix.net:8443/;ssl=true;transportMode=http;httpPath=gateway/default/hive' -n clsadmin -p **********

Next, from the beeline session, create the Hive table:

CREATE EXTERNAL TABLE avro_hive_table
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION 's3a://temp-bucket/transactions/'
TBLPROPERTIES (
    'avro.schema.literal'='{
                            "namespace": "transaction.avro",
                                     "type": "record",
                                     "name": "Transaction",
                                     "fields": [
                                         {"name": "InvoiceNo",     "type": "int"    },
                                         {"name": "StockCode",     "type": "string" },
                                         {"name": "Description",   "type": "string" },
                                         {"name": "Quantity",      "type": "int"    },
                                         {"name": "InvoiceDate",   "type": "long"   },
                                         {"name": "UnitPrice",     "type": "float"  },
                                         {"name": "CustomerID",    "type": "int"    },
                                         {"name": "Country",       "type": "string" },
                                         {"name": "LineNo",        "type": "int"    },
                                         {"name": "InvoiceTime",   "type": "string" },
                                         {"name": "StoreID",       "type": "int"    },
                                         {"name": "TransactionID", "type": "string" }
                                     ]
                            }'
    )
;

And query:

select count(*) from avro_hive_table;

Upload Spark script to COS

First, let's create a PySpark script.


In [ ]:
bucket_name = 'temp-bucket'
filename = 'PiEx.py'

file_contents = """
from __future__ import print_function
from datetime import datetime

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("PythonPi").getOrCreate()

    output = "Hello World at %s" % (str(datetime.now()))
    print(output)

    output_rdd = spark.sparkContext.parallelize([output])
    output_rdd.coalesce(1, True).saveAsTextFile('s3a://{0}/provision_iae_with_cos_spark_job_output.txt')

    spark.stop()
""".format(bucket_name)


(S3_ACCESS_KEY, S3_PRIVATE_ENDPOINT, S3_PUBLIC_ENDPOINT, S3_SECRET_KEY) = \
    iae_examples.read_cos_endpoint_details('../secrets/cos_s3_endpoint.json')

iae_examples.save_string_to_cos(
    file_contents.encode('ascii'), bucket_name, filename, S3_ACCESS_KEY, S3_SECRET_KEY, S3_PUBLIC_ENDPOINT
    )

Submit Spark job with the Livy API

Execute Spark job


In [ ]:
IAE_USER        = iae_examples.iae_service_user('../secrets/iae_service_key.json')
IAE_PASSWORD    = iae_examples.iae_service_password('../secrets/iae_service_key.json')
IAE_LIVY_URL    = iae_examples.iae_service_endpoint_livy('../secrets/iae_service_key.json')
IAE_WEBHDFS_URL = iae_examples.iae_service_endpoint_webhdfs('../secrets/iae_service_key.json')

Ensure the bucket_name variable is still set.


In [ ]:
print(bucket_name)

In [ ]:
import requests, json

headers = { 
    'Content-Type': 'application/json',
    'X-Requested-By': 'livy'
}
data = { "file":"s3a://{0}/PiEx.py".format(bucket_name) }

res = requests.post(IAE_LIVY_URL, auth=(IAE_USER, IAE_PASSWORD), headers=headers, data=json.dumps(data))
print(res.text)

id = res.json()['id']

Note that we saved the job id in the previous step. Let's use that id to get the job state.

NOTE: keep running the cell below until the state is 'success' or the job has failed.


In [ ]:
headers = { 
    'Content-Type': 'application/json',
    'X-Requested-By': 'livy'
}
url = '{0}/{1}'.format(IAE_LIVY_URL, id)
response = requests.get(url, auth=(IAE_USER, IAE_PASSWORD), headers=headers)
print(response.json()['state'])
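
If you'd rather poll automatically than re-run the cell above, a minimal sketch is shown below. It reuses the requests import, headers, and Livy batch URL from the cells above, and assumes 'success', 'dead', and 'error' are the terminal states reported by Livy:

import time

url = '{0}/{1}'.format(IAE_LIVY_URL, id)
while True:
    state = requests.get(url, auth=(IAE_USER, IAE_PASSWORD), headers=headers).json()['state']
    print(state)
    if state in ('success', 'dead', 'error'):   # stop on a terminal state
        break
    time.sleep(10)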

Take a look at the Spark job log using the job id:


In [ ]:
headers = { 
    'Content-Type': 'application/json',
    'X-Requested-By': 'livy'
}
url = '{0}/{1}/log'.format(IAE_LIVY_URL, id)
response = requests.get(url, auth=(IAE_USER, IAE_PASSWORD), headers=headers)
print('\n'.join(response.json()['log']))

Debugging errors

This is the process I followed to debug an issue with my submitted job.

If there is an error, we can debug it by looking for the YARN application id in the log output, e.g.

17/09/23 06:21:51 INFO Client: Application report for application_1506108548102_0002 (state: ACCEPTED)

After SSHing onto the cluster, I ran the following command (substitute your own application_xxxx value):

$ yarn logs -applicationId application_1506108548102_0002 | less

Buried in the YARN output, I noticed the following:

py4j.protocol.Py4JJavaError: An error occurred while calling o76.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3a://temp-bucket/provision_iae_with_cos_spark_job_output.txt already exists

Now let's take a look at the file contents. In the Spark job we coalesced the RDD, which causes Spark to save just one output file, part-00000:

output_rdd.coalesce(1, True).saveAsTextFile('s3a://{0}/provision_iae_with_cos_spark_job_output.txt')

In [ ]:
data = iae_examples.get_file_content_from_cos(
                bucket_name, 
                'provision_iae_with_cos_spark_job_output.txt/part-00000', 
                S3_ACCESS_KEY, S3_SECRET_KEY, S3_PUBLIC_ENDPOINT
                )
print(data)

Let's keep things tidy and remove our job output. Note that if you don't remove it, next time you run the spark job it will fail because the output file already exists.


In [ ]:
iae_examples.recursively_delete_file_in_cos(
                bucket_name, 
                'provision_iae_with_cos_spark_job_output.txt', 
                S3_ACCESS_KEY, S3_SECRET_KEY, S3_PUBLIC_ENDPOINT
                )
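
For reference, a helper like recursively_delete_file_in_cos can be approximated with boto3 along the following lines. This is an assumption about its internals; the real code is in modules/iae_examples.py and its endpoint handling may differ:

import boto3

def delete_cos_prefix_sketch(bucket, prefix, access_key, secret_key, endpoint):
    """Delete every object stored under a given key prefix (i.e. the job output 'directory')."""
    s3 = boto3.client(
        's3',
        endpoint_url=endpoint if endpoint.startswith('http') else 'https://' + endpoint,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    listing = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    for obj in listing.get('Contents', []):
        s3.delete_object(Bucket=bucket, Key=obj['Key'])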

Debugging

TODO


In [ ]:
! bx cf space dev --guid

In [ ]:
! bx cf services

In [ ]:
! bx cf service-keys myiaeinstance

In [ ]:
! bx cf delete-service-key myiaeinstance myiaeinstance_servicekey -f

In [ ]:
! bx cf delete-service myiaeinstance -f

In [ ]: