In [ ]:
# This benchmark measures the performance of pipeline related operations in Kubeflow Pipelines, including latencies of creating/getting/deleting pipelines.

import random
import kfp
import kfp_server_api
import os
import string
import time

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy import stats

# CHANGE necessary parameters here.
# Host is your KFP API endpoint (e.g. a port-forwarded ml-pipeline service).
host = 'http://127.0.0.1:3001'
# Number of pipelines to create for the benchmark.
num_pipelines = 10
# Number of pipeline versions to create under each pipeline.
num_pipeline_versions_per_pipeline = 10
# URL of the pipeline package uploaded for every pipeline/version; use the pipeline you prefer.
pipeline_file_url = 'https://storage.googleapis.com/jingzhangjz-project-pipelines/benchmarks/taxi.yaml'


def random_suffix() -> str:
    """Return a random 10-character lowercase-alphanumeric string.

    Used to give each benchmark pipeline/version a unique, human-readable
    name so repeated benchmark runs do not collide on server-side names.

    Returns:
        A string of length 10 drawn from [a-z0-9].
    """
    # `random.choices` samples with replacement; collisions across calls are
    # possible but vanishingly unlikely for a 10-char alphanumeric suffix.
    return ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))


if __name__ == '__main__':
    client = kfp.Client(host)
    api_url = kfp_server_api.models.ApiUrl(pipeline_file_url)
        
    # Create pipeline latency
    create_latencies = []
    created_pipeline_ids = []
    for i in range(num_pipelines):
        api_pipeline = kfp_server_api.models.ApiPipeline(
            name='pipeline-' + random_suffix(),
            url=api_url)
        start = time.perf_counter()
        pipeline = client.pipelines.create_pipeline(body=api_pipeline)
        dur = time.perf_counter() - start
        create_latencies.append(dur)
        created_pipeline_ids.append(pipeline.id)
        
    # Create version latency    
    create_version_latencies = []
    created_version_ids = []
    for pipeline_id in created_pipeline_ids:
        for j in range(num_pipeline_versions_per_pipeline):
            key = kfp_server_api.models.ApiResourceKey(id=pipeline_id, type=kfp_server_api.models.ApiResourceType.PIPELINE)
            reference = kfp_server_api.models.ApiResourceReference(key=key, relationship=kfp_server_api.models.ApiRelationship.OWNER)
            resource_references=[reference]
            api_pipeline_version = kfp_server_api.models.ApiPipelineVersion(
                name='pipeline-version-' + random_suffix(),
                package_url=api_url,
                resource_references=resource_references)
            start = time.perf_counter()
            pipeline_version = client.pipelines.create_pipeline_version(body=api_pipeline_version)
            dur = time.perf_counter() - start
            create_version_latencies.append(dur)
            created_version_ids.append(pipeline_version.id)     
            # We sometimes observe errors when the version creation calls are too close to each other when those 
            # versions are created in the same pipeline. When adding a new version to a specific pipeline, the 
            # pipeline's default version is updated to the new version. Therefore, when we create a bunch of versions
            # for the same pipeline in a row within a short period of time, these creation operations are competing 
            # for a write lock on the same row of pipelines table in our db. This is one of the possible hypotheses
            # to explain the errors when we've observed. But this is definitely an interesting symptom that worths 
            # further investigation. For now, we separate the version creation calls by 2 seconds.
            time.sleep(2)
        
    # Get pipeline latency
    get_latencies = []
    for i in created_pipeline_ids:
        start = time.perf_counter()
        pipeline = client.pipelines.get_pipeline(i)  
        dur = time.perf_counter() - start
        get_latencies.append(dur)      
        
    # Delete pipeline latency
    delete_latencies= []
    for i in created_pipeline_ids:
        start = time.perf_counter()
        pipeline = client.pipelines.delete_pipeline(i)  
        dur = time.perf_counter() - start
        delete_latencies.append(dur)

    # Plots
    fig, axs = plt.subplots(nrows=4, figsize=(10,20))
    
    axs[0].set(title='Create Pipeline Latency', xlabel='Time (Second)', ylabel='Create Pipeline')
    sns.distplot(a=create_latencies,  ax=axs[0], hist=True, kde=False, rug=True)
    
    axs[1].set(title='Create Pipeline Version Latency', xlabel='Time (Second)', ylabel='Create Pipeline Version')
    sns.distplot(a=create_version_latencies,  ax=axs[1], hist=True, kde=False, rug=True)    
    
    axs[2].set(title='Get Pipeline Latency', xlabel='Time (Second)', ylabel='Get Pipeline')
    sns.distplot(a=get_latencies,  ax=axs[2], hist=True, kde=False, rug=True)    
    
    axs[3].set(title='Delete Pipeline Latency', xlabel='Time (Second)', ylabel='Delete Pipeline')
    sns.distplot(a=delete_latencies,  ax=axs[3], hist=True, kde=False, rug=True)
    
    # TODO(jingzhang36): maybe dump the durations data to db or gcs, and let searborn read from there

In [ ]:


In [ ]: