Inicjalizacja:


In [1]:
import re
import ast
import pyspark
import time
import datetime
import html
from pyspark import SparkContext
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
#visualisation
from bokeh.plotting import figure, show, output_file, output_notebook
from bokeh.charts import Histogram, Scatter, Bar, TimeSeries, BoxPlot, show, \
                         output_file, output_notebook, color, marker, defaults
from bokeh.charts.attributes import ColorAttr, CatAttr
from bokeh.layouts import column
from bokeh.models import NumeralTickFormatter, DatetimeTickFormatter
import pandas as pd

In [2]:
sc = SparkContext()
sqlContext = SQLContext(sc)

Domyślne opcje Bokeh:


In [3]:
# Domyślny rozmiar wykresów Bokeh:
defaults.width = 950
defaults.height = 600

Import danych z plików XML:

Dane z archiwum forum GIS Stack Exchange (https://archive.org/download/stackexchange).

Funkcja pomocnicza attribute_search(attribute, string) jest wykorzystywana podczas wczytywania wierszy XML do obiektu Row pyspark. Podajemy 'attribute' jako nazwę atrybutu XML, którego wartość chcemy otrzymać oraz 'string' -- wiersz tekstu pobrany z pliku XML.


In [4]:
# Funkcja pomocnicza: wyszukiwanie podanego atrybutu w linii pliku XML,
# zwraca wartość znalezionego atrybutu w string lub None jeśli atrybut nie istnieje

def attribute_search(attribute, my_string):
    result = re.search(attribute + '=\"(.*?)\"', my_string)
    if result:
        return result.group(1).replace('"', '')
    else:
        return None

Funkcje importujące pliki XML

Pliki XML archiwum XML zawierające tabele danych. Są zorganizowane na zasadzie "jedna tabela -- jedna funkcja importująca" ze względu na różny schemat danych oraz dostosowanie importu: niepotrzebne kolumny nie są importowane.


In [5]:
# Funkcje pomocnicze: interpretacja plików XML (różne schematy danych)
# ze Stackoverflow

def tags_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['TagName'] = attribute_search('TagName', c)
    count = attribute_search('Count', c)
    row['Count'] = int(count) if count else None
    excerpt = attribute_search('ExcerptPostId', c)
    row['ExcerptPostId'] = int(excerpt) if excerpt else None
    wiki = attribute_search('WikiPostId', c)
    row['WikiPostId'] = int(wiki) if wiki else None
    return pyspark.Row(**row)

def badges_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['UserId'] = int(attribute_search('UserId', c))
    row['Name'] = attribute_search('Name', c)
    row['Date'] = datetime.datetime.strptime(attribute_search('Date', c), "%Y-%m-%dT%H:%M:%S.%f")
    row['Class'] = int(attribute_search('Class', c))
    row['TagBased'] = ast.literal_eval(attribute_search('TagBased', c))
    return pyspark.Row(**row)

def users_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['Reputation'] = int(attribute_search('Reputation', c))
    row['CreationDate'] = datetime.datetime.strptime(attribute_search('CreationDate', c), "%Y-%m-%dT%H:%M:%S.%f")
    row['DisplayName'] = attribute_search('DisplayName', c)
    row['LastAccessDate'] = datetime.datetime.strptime(attribute_search('LastAccessDate', c), "%Y-%m-%dT%H:%M:%S.%f")
    row['WebsiteUrl'] = attribute_search('WebsiteUrl', c)
    row['Location'] = attribute_search('Location', c)
    age = attribute_search('Age', c)
    row['Age'] = int(age) if age else None
    row['Views'] = int(attribute_search('Views', c))
    row['UpVotes'] = int(attribute_search('UpVotes', c))
    row['DownVotes'] = int(attribute_search('DownVotes', c))
    return pyspark.Row(**row)

def posts_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['PostTypeId'] = int(attribute_search('PostTypeId', c))
    found_id = attribute_search('ParentId', c)
    row['ParentId'] = int(found_id) if found_id else None
    found_id = attribute_search('AcceptedAnswerId', c)
    row['AcceptedAnswerId'] = int(found_id) if found_id else None
    row['CreationDate'] = datetime.datetime.strptime(attribute_search('CreationDate', c), "%Y-%m-%dT%H:%M:%S.%f")
    row['Score'] = int(attribute_search('Score', c))
    vc = attribute_search('ViewCount', c)
    row['ViewCount'] = int(vc) if vc else None
    owner = attribute_search('OwnerUserId', c)
    row['OwnerUserId'] = int(owner) if owner else None
    lasted = attribute_search('LastEditorUserId', c)
    row['LastEditorUserId'] = int(lasted) if lasted else None

    row['Body'] = re.sub('(<!--.*?-->|<[^>]*>)', '', html.unescape(attribute_search('Body', c)))
    title = attribute_search('Title', c)
    row['Title'] = title if title else None
    tags = attribute_search('Tags', c)
    row['Tags'] = html.unescape(tags).replace('<', '').replace('>', ' ') if tags else None
    date = attribute_search('ClosedDate', c)
    row['ClosedDate'] = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f") if date else None

    count = attribute_search('AnswerCount', c)
    row['AnswerCount'] = int(count) if count else None
    count = attribute_search('CommentCount', c)
    row['CommentCount'] = int(count) if count else None
    count = attribute_search('FavoriteCount', c)
    row['FavoriteCount'] = int(count) if count else None
        
    return pyspark.Row(**row)

def comments_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['PostId'] = int(attribute_search('PostId', c))
    row['Score'] = int(attribute_search('Score', c))
    row['Text'] = re.sub('(<!--.*?-->|<[^>]*>)', '', html.unescape(attribute_search('Text', c)))
    row['CreationDate'] = datetime.datetime.strptime(attribute_search('CreationDate', c), "%Y-%m-%dT%H:%M:%S.%f")
    user = attribute_search('UserId', c)
    row['UserId'] = int(user) if user else None
    return pyspark.Row(**row)

def post_history_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['PostHistoryTypeId'] = int(attribute_search('PostHistoryTypeId', c))
    row['PostId'] = int(attribute_search('PostId', c))
    comm = attribute_search('Comment', c)
    row['Comment'] = comm if comm else None
    text = attribute_search('Text', c)
    row['Text'] = re.sub('(<!--.*?-->|<[^>]*>)', '', html.unescape(text)) if text else None
    return pyspark.Row(**row)

def post_links_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['CreationDate'] = datetime.datetime.strptime(attribute_search('CreationDate', c), "%Y-%m-%dT%H:%M:%S.%f")
    row['PostId'] = int(attribute_search('PostId', c))
    row['RelatedPostId'] = int(attribute_search('RelatedPostId', c))
    row['LinkTypeId'] = int(attribute_search('LinkTypeId', c))
    return pyspark.Row(**row)

In [6]:
# Wczytanie danych do RDD z plików XML, a następnie konwersja RDD do DF: 
xml_load_path = 'file:///home/marek/Dokumenty/Notebooks/bd/gis_stack_spark/data/'

# Słownik: nazwa pliku i odpowiadająca mu funkcja pomocnicza interpretująca
# schemat danych w pliku XML
xml_load_list = {'Tags.xml': tags_from_xml, 'Badges.xml': badges_from_xml, \
                 'Users.xml': users_from_xml,'Posts.xml': posts_from_xml, \
                 'Comments.xml': comments_from_xml,'PostHistory.xml': post_history_from_xml, \
                 'PostLinks.xml': post_links_from_xml}

tags_rdd = sc.textFile(xml_load_path + 'Tags.xml').filter(lambda line: "row" in line) \
             .map(lambda l: xml_load_list['Tags.xml'](l))

badges_rdd = sc.textFile(xml_load_path + 'Badges.xml').filter(lambda line: "row" in line) \
               .map(lambda l: xml_load_list['Badges.xml'](l))

users_rdd = sc.textFile(xml_load_path + 'Users.xml').filter(lambda line: "row" in line) \
              .map(lambda l: xml_load_list['Users.xml'](l))

posts_rdd = sc.textFile(xml_load_path + 'Posts.xml').filter(lambda line: "row" in line) \
              .map(lambda l: xml_load_list['Posts.xml'](l))

comments_rdd = sc.textFile(xml_load_path + 'Comments.xml').filter(lambda line: "row" in line) \
                 .map(lambda l: xml_load_list['Comments.xml'](l))

post_history_rdd = sc.textFile(xml_load_path + 'PostHistory.xml').filter(lambda line: "row" in line) \
                     .map(lambda l: xml_load_list['PostHistory.xml'](l))

post_links_rdd = sc.textFile(xml_load_path + 'PostLinks.xml').filter(lambda line: "row" in line) \
                   .map(lambda l: xml_load_list['PostLinks.xml'](l))

# Konwersja na DataFrame:
users = sqlContext.createDataFrame(users_rdd)
badges = sqlContext.createDataFrame(badges_rdd)
posts = sqlContext.createDataFrame(posts_rdd)
tags = sqlContext.createDataFrame(tags_rdd)
comments = sqlContext.createDataFrame(comments_rdd)
post_history = sqlContext.createDataFrame(post_history_rdd)
post_links = sqlContext.createDataFrame(post_links_rdd)

Analiza danych

Oczyszczenie danych z nadmiarowych informacji

Ze zbioru postów należy usunąć posty: scalone (merged) oraz duplikaty (duplicate). Scalone posty mają często ujemne czasy odpowiedzi, duplikaty -- bardzo długie czasy odpowiedzi (jeśli np. nowy post jest duplikatem starego postu).

Posty mające w historii (tabela post_history): PostHistoryTypeId:

  • 37 = Post merge source
  • 38 = Post merge destination

Przykłady problematycznych postów:

Scalone posty miewają zamienione oryginalne pytanie z późniejszym, co sprawia, że czas odpowiedzi jest przed czasem zadania pytania.

Przykład scalonych postów (ujemny wyliczony czas odpowiedzi): https://gis.stackexchange.com/questions/124794/installing-qgis-with-ecw-support-on-mac

Uwaga: w dumpie z 14 marca 2017 ma 2 dodatkowe linki (scalenia), obecnie (maj 2017) są 3 linki (scalenia).

Przykład wpisu z czasem odpowiedzi = 0 sekund: https://gis.stackexchange.com/questions/203380/how-to-set-up-python-arcpy-with-arcgis-pro-1-3


In [7]:
# ze zbioru postów należy usunąć posty: zmerdżowane (merged)
# oraz duplikaty (duplicate). Merged mają często ujemne czasy
# odpowiedzi, duplikaty -- bardzo długie czasy (jeśli np. nowy
# post jest duplikatem starego postu)
merged_posts_ids = post_history.select(post_history.PostId) \
                   .where((post_history.PostHistoryTypeId == 37) | \
                          (post_history.PostHistoryTypeId == 38))

duplicate_posts_ids = post_links.select(post_links.PostId).where(post_links.LinkTypeId == 3)

remove_ids = merged_posts_ids.union(duplicate_posts_ids).distinct()
remove_ids.count()


Out[7]:
4222

In [8]:
#  anti_join using a dataframe:
#    tbl1.as("a").join(tbl2.as("b"), $"a.id" === $"b.id", "left_anti")
# This PR provides serves as the basis for implementing `NOT EXISTS` and `NOT IN 
# (...)` correlated sub-queries. It would also serve as good basis for 
# implementing an more efficient `EXCEPT` operator.

# nadpisujemy 'posts' za pomocą DF z usuniętymi:
# - duplikatami postów
# - postami scalonymi (merged)
# Id tych postów są w remove_ids
posts = posts.join(remove_ids, posts.Id == remove_ids.PostId, 'left_anti')

In [9]:
posts.count()


Out[9]:
176521

Liczba użytowników vs liczba napisanych postów:

Używana jest tabela (DataFrame) 'posts'.


In [10]:
users_by_posts = posts.groupBy('OwnerUserId').count()\
                      .select(col('OwnerUserId').alias('user'), col('count').alias('number_posts'))\
                      .groupBy('number_posts').count()\
                      .select(col('number_posts'), col('count').alias('number_users'))\
                      .orderBy('number_posts')

In [11]:
# Wersja z RDD i mapowaniem z lambdą:
# x = users_by_posts.rdd.map(lambda x: x.number_posts).collect()
# y = users_by_posts.rdd.map(lambda y: y.number_users).collect()

# lub z konwersją do pandas dataframe, a następnie do listy:
x = users_by_posts.toPandas()['number_posts'].tolist()
y = users_by_posts.toPandas()['number_users'].tolist()

Użycie Bokeh do wyświetlenia danych o użytkownikach i napisanych postach:


In [12]:
output_notebook()
plot = figure(plot_width=950, plot_height=600,\
              x_axis_type="log", y_axis_type="log",\
              x_axis_label="Liczba postów", y_axis_label="Liczba użytkowników")
plot.circle(x, y, size=6, color="green", alpha=0.4)
show(plot)


Loading BokehJS ...

Najpopularniejsze tagi na forum GIS


In [13]:
popular_tags = tags.select(tags.TagName, tags.Count)\
                   .sort(tags.Count, ascending=False).toPandas()

In [14]:
#.to_latex(column_format='l|r|r', decimal=','))
popular_tags.head(20)


Out[14]:
TagName Count
0 qgis 16285
1 arcgis-desktop 14326
2 arcpy 5919
3 python 5678
4 postgis 4967
5 raster 4432
6 arcgis-10.0 3528
7 arcmap 3509
8 geoserver 3308
9 openlayers-2 3270
10 arcgis-10.1 3060
11 shapefile 2899
12 arcgis-10.2 2730
13 gdal 2566
14 arcgis-server 2291
15 arcobjects 2195
16 postgresql 2192
17 pyqgis 2179
18 javascript 1898
19 openstreetmap 1873

In [15]:
output_notebook()
plot = Bar(popular_tags.head(20), values='Count', label=CatAttr(columns=['TagName'], sort=False), \
           title="Najpopularniejsze tagi na forum GIS", legend=False, color="darkviolet", \
           ylabel='Liczba postów', xlabel='Tag (nazwa)')
show(plot)


Loading BokehJS ...

Aktywność community GIS:

Aktywność community GIS: pytania pozostawione bez odpowiedzi.

PostTypeId == 1 -- pytanie PostTypeId == 2 -- odpowiedź


In [16]:
questions_vs_answers = posts.select(col('Id'), col('AnswerCount')).where(posts.PostTypeId == 1)\
                            .groupBy('AnswerCount').count()\
                            .select(col('AnswerCount').alias('answers'), col('count').alias('number_questions'))\
                            .orderBy('answers')

total_questions = posts.select(posts.Id).where(posts.PostTypeId == 1).count()
total_answers = posts.select(posts.Id).where(posts.PostTypeId == 2).count()

unanswered_questions = questions_vs_answers.select(col('number_questions')) \
                       .where(questions_vs_answers.answers == 0) \
                       .collect()[0].number_questions

In [17]:
print("Całkowita liczba postów (bez duplikatów): {}".format(posts.count()))
print("Liczba pytań na forum GIS:       {}".format(total_questions))
print("Liczba odpowiedzi na forum GIS:  {}".format(total_answers))
print("Liczba pytań bez odpowiedzi:     {}".format(unanswered_questions))
print("Procent pytań bez odpowiedzi:    {:.2%}".format(unanswered_questions/total_questions))


Całkowita liczba postów (bez duplikatów): 176521
Liczba pytań na forum GIS:       76472
Liczba odpowiedzi na forum GIS:  95980
Liczba pytań bez odpowiedzi:     15389
Procent pytań bez odpowiedzi:    20.12%

Inny typ wpisu (wiki -- nie jest to ani pytanie, ani odpowiedź):


In [18]:
posts.select(posts.Id).where(posts.PostTypeId == 4).count()


Out[18]:
2018

Aktywność community GIS: Number of Questions vs Answers


In [19]:
output_notebook()
plot = Bar(questions_vs_answers.toPandas().head(9), 'answers', values='number_questions', \
           title="Odpowiedzi vs liczba pytań", legend=False, color="navy", \
           ylabel='Pytania', xlabel='Odpowiedzi')

show(plot)


Loading BokehJS ...

In [20]:
questions_vs_answers.toPandas().head(9)


Out[20]:
answers number_questions
0 0 15389
1 1 40117
2 2 14251
3 3 4254
4 4 1376
5 5 536
6 6 240
7 7 124
8 8 60

In [22]:
print(questions_vs_answers.toPandas().head(9).to_latex(index=False))


\begin{tabular}{rr}
\toprule
 answers &  number\_questions \\
\midrule
       0 &             15389 \\
       1 &             40117 \\
       2 &             14251 \\
       3 &              4254 \\
       4 &              1376 \\
       5 &               536 \\
       6 &               240 \\
       7 &               124 \\
       8 &                60 \\
\bottomrule
\end{tabular}

Analiza danych dla wybranych produktów (oprogramowania) GIS

Dla kilku produktów: iterujemy po liście tagów produktowych 'gis_products' i składamy kolejne kolumny w DF pandas. Pierwszą kolumną z wartościami jest liczba postów na miesiąc dla całego forum.


In [23]:
gis_products = ['qgis', 'arcgis-desktop', 'postgis', 'openstreetmap', 'grass', 'google-maps']

In [ ]:
# lub analiza tylko dla produktów ArcGIS (gdyż występują różne wersje w tagach)
#gis_products = ['arcgis-desktop', 'arcgis-server', 'arcgis-10.0', \
#                'arcgis-10.1', 'arcgis-10.2', 'arcgis-10.3', 'arcgis-10.4', 'arcgis-10.5']
gis_products = ['arcgis-10.0', \
                'arcgis-10.1', 'arcgis-10.2', 'arcgis-10.3', 'arcgis-10.4', 'arcgis-10.5']

In [24]:
# Wyliczamy totals (dla wszystkich postow) w gis_posts_data.
# Dzięki temu mamy DF do którego potem doklejamy kolumny
# z wynikami dla innych tagów.
print('Wyliczam statystyki dla wszystkich postów.')
gis_posts_data = posts.filter(posts.Tags.isNotNull()).select(concat_ws('.', year("CreationDate"), \
                              lpad(month("CreationDate"), 2, "0")).alias('Months')) \
                      .groupBy('Months').count().sort('Months').toPandas()

gis_posts_data.rename(columns={'count': 'wszystkie'}, inplace=True)

for gis_p in gis_products:
    print('Wyliczam posty dla produktu: {0}'.format(gis_p))
    product_posts = posts.filter(posts.Tags.isNotNull() & posts.Tags.rlike(r'\b{0}\b'.format(gis_p))) \
        .select(concat_ws('.', year("CreationDate"), lpad(month("CreationDate"), 2, "0")).alias('Months')) \
        .groupBy('Months').count().sort('Months').toPandas()
    product_posts.rename(columns={'count': gis_p}, inplace=True)
    gis_posts_data = pd.merge(gis_posts_data, product_posts, on='Months', how='outer')

print('Gotowe.')


Wyliczam statystyki dla wszystkich postów.
Wyliczam posty dla produktu: qgis
Wyliczam posty dla produktu: arcgis-desktop
Wyliczam posty dla produktu: postgis
Wyliczam posty dla produktu: openstreetmap
Wyliczam posty dla produktu: grass
Wyliczam posty dla produktu: google-maps
Gotowe.

In [25]:
# Opcjonalnie: jako nowe kolumny
# Obliczanie % postow dla produktu ws totals
# np. dla produktu qgis obliczamy qgis_p jako
# (qgis / wszystkie) * 100%
for gis_p in gis_products:
    gis_posts_data['%' + gis_p] = (gis_posts_data[gis_p].fillna(0) \
                                   / gis_posts_data.wszystkie) * 100

In [26]:
# Pokaż dane:
gis_posts_data


Out[26]:
Months wszystkie qgis arcgis-desktop postgis openstreetmap grass google-maps %qgis %arcgis-desktop %postgis %openstreetmap %grass %google-maps
0 2009.08 1 NaN NaN NaN NaN NaN NaN 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000
1 2010.02 1 NaN NaN NaN NaN NaN NaN 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000
2 2010.03 1 NaN NaN NaN NaN NaN NaN 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000
3 2010.04 2 NaN NaN NaN NaN NaN NaN 0.000000 0.000000 0.000000 0.000000 0.000000 0.000000
4 2010.06 4 NaN 1.0 NaN NaN NaN NaN 0.000000 25.000000 0.000000 0.000000 0.000000 0.000000
5 2010.07 133 3.0 15.0 8.0 3.0 2.0 7.0 2.255639 11.278195 6.015038 2.255639 1.503759 5.263158
6 2010.08 234 3.0 37.0 19.0 3.0 3.0 5.0 1.282051 15.811966 8.119658 1.282051 1.282051 2.136752
7 2010.09 201 7.0 27.0 10.0 11.0 2.0 5.0 3.482587 13.432836 4.975124 5.472637 0.995025 2.487562
8 2010.10 211 2.0 26.0 7.0 3.0 1.0 4.0 0.947867 12.322275 3.317536 1.421801 0.473934 1.895735
9 2010.11 223 6.0 36.0 11.0 2.0 2.0 2.0 2.690583 16.143498 4.932735 0.896861 0.896861 0.896861
10 2010.12 212 4.0 24.0 11.0 NaN 2.0 12.0 1.886792 11.320755 5.188679 0.000000 0.943396 5.660377
11 2011.01 265 3.0 32.0 12.0 4.0 7.0 11.0 1.132075 12.075472 4.528302 1.509434 2.641509 4.150943
12 2011.02 273 14.0 25.0 14.0 6.0 20.0 6.0 5.128205 9.157509 5.128205 2.197802 7.326007 2.197802
13 2011.03 395 19.0 48.0 17.0 6.0 27.0 14.0 4.810127 12.151899 4.303797 1.518987 6.835443 3.544304
14 2011.04 319 11.0 33.0 13.0 2.0 6.0 6.0 3.448276 10.344828 4.075235 0.626959 1.880878 1.880878
15 2011.05 387 19.0 53.0 19.0 12.0 12.0 9.0 4.909561 13.695090 4.909561 3.100775 3.100775 2.325581
16 2011.06 377 17.0 44.0 27.0 11.0 5.0 9.0 4.509284 11.671088 7.161804 2.917772 1.326260 2.387268
17 2011.07 362 32.0 30.0 33.0 5.0 5.0 11.0 8.839779 8.287293 9.116022 1.381215 1.381215 3.038674
18 2011.08 372 24.0 38.0 40.0 7.0 8.0 5.0 6.451613 10.215054 10.752688 1.881720 2.150538 1.344086
19 2011.09 342 25.0 36.0 34.0 8.0 5.0 16.0 7.309942 10.526316 9.941520 2.339181 1.461988 4.678363
20 2011.10 384 26.0 46.0 24.0 3.0 7.0 8.0 6.770833 11.979167 6.250000 0.781250 1.822917 2.083333
21 2011.11 359 24.0 43.0 22.0 3.0 4.0 14.0 6.685237 11.977716 6.128134 0.835655 1.114206 3.899721
22 2011.12 299 14.0 33.0 18.0 9.0 2.0 8.0 4.682274 11.036789 6.020067 3.010033 0.668896 2.675585
23 2012.01 405 29.0 39.0 18.0 14.0 5.0 13.0 7.160494 9.629630 4.444444 3.456790 1.234568 3.209877
24 2012.02 493 49.0 62.0 30.0 14.0 2.0 20.0 9.939148 12.576065 6.085193 2.839757 0.405680 4.056795
25 2012.03 537 48.0 64.0 26.0 15.0 4.0 22.0 8.938547 11.918063 4.841713 2.793296 0.744879 4.096834
26 2012.04 664 139.0 50.0 57.0 25.0 9.0 18.0 20.933735 7.530120 8.584337 3.765060 1.355422 2.710843
27 2012.05 738 150.0 72.0 64.0 14.0 19.0 18.0 20.325203 9.756098 8.672087 1.897019 2.574526 2.439024
28 2012.06 728 135.0 62.0 51.0 13.0 5.0 22.0 18.543956 8.516484 7.005495 1.785714 0.686813 3.021978
29 2012.07 746 155.0 87.0 52.0 25.0 6.0 16.0 20.777480 11.662198 6.970509 3.351206 0.804290 2.144772
... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
56 2014.10 1500 283.0 338.0 97.0 29.0 35.0 32.0 18.866667 22.533333 6.466667 1.933333 2.333333 2.133333
57 2014.11 1318 286.0 300.0 74.0 32.0 17.0 25.0 21.699545 22.761760 5.614568 2.427921 1.289833 1.896813
58 2014.12 1153 219.0 257.0 76.0 37.0 8.0 21.0 18.993929 22.289679 6.591500 3.209020 0.693842 1.821336
59 2015.01 1390 280.0 278.0 88.0 39.0 21.0 21.0 20.143885 20.000000 6.330935 2.805755 1.510791 1.510791
60 2015.02 1318 318.0 289.0 70.0 33.0 36.0 19.0 24.127466 21.927162 5.311077 2.503794 2.731411 1.441578
61 2015.03 1410 339.0 300.0 82.0 20.0 22.0 21.0 24.042553 21.276596 5.815603 1.418440 1.560284 1.489362
62 2015.04 1358 276.0 290.0 80.0 23.0 29.0 23.0 20.324006 21.354934 5.891016 1.693667 2.135493 1.693667
63 2015.05 1233 258.0 280.0 69.0 25.0 28.0 11.0 20.924574 22.708840 5.596107 2.027575 2.270884 0.892133
64 2015.06 1334 310.0 272.0 81.0 43.0 25.0 21.0 23.238381 20.389805 6.071964 3.223388 1.874063 1.574213
65 2015.07 1448 371.0 339.0 89.0 35.0 28.0 21.0 25.621547 23.411602 6.146409 2.417127 1.933702 1.450276
66 2015.08 1296 302.0 215.0 102.0 25.0 22.0 21.0 23.302469 16.589506 7.870370 1.929012 1.697531 1.620370
67 2015.09 1297 309.0 261.0 67.0 38.0 20.0 18.0 23.824210 20.123362 5.165767 2.929838 1.542020 1.387818
68 2015.10 1380 352.0 234.0 101.0 44.0 17.0 10.0 25.507246 16.956522 7.318841 3.188406 1.231884 0.724638
69 2015.11 1271 335.0 253.0 81.0 21.0 26.0 6.0 26.357199 19.905586 6.372935 1.652242 2.045633 0.472069
70 2015.12 1177 254.0 238.0 74.0 29.0 21.0 13.0 21.580289 20.220901 6.287171 2.463891 1.784197 1.104503
71 2016.01 1307 293.0 253.0 84.0 32.0 12.0 22.0 22.417751 19.357307 6.426932 2.448355 0.918133 1.683244
72 2016.02 1485 373.0 253.0 76.0 34.0 25.0 13.0 25.117845 17.037037 5.117845 2.289562 1.683502 0.875421
73 2016.03 1571 401.0 322.0 91.0 31.0 24.0 17.0 25.525143 20.496499 5.792489 1.973265 1.527689 1.082113
74 2016.04 1659 391.0 343.0 105.0 29.0 20.0 19.0 23.568415 20.675105 6.329114 1.748041 1.205546 1.145268
75 2016.05 1498 408.0 288.0 126.0 24.0 27.0 21.0 27.236315 19.225634 8.411215 1.602136 1.802403 1.401869
76 2016.06 1561 395.0 300.0 94.0 34.0 25.0 16.0 25.304292 19.218450 6.021781 2.178091 1.601537 1.024984
77 2016.07 1351 358.0 228.0 72.0 50.0 28.0 8.0 26.498890 16.876388 5.329386 3.700962 2.072539 0.592154
78 2016.08 1384 373.0 226.0 78.0 41.0 19.0 10.0 26.950867 16.329480 5.635838 2.962428 1.372832 0.722543
79 2016.09 1337 357.0 210.0 63.0 39.0 17.0 10.0 26.701571 15.706806 4.712042 2.916978 1.271503 0.747943
80 2016.10 1342 307.0 249.0 89.0 33.0 24.0 12.0 22.876304 18.554396 6.631893 2.459016 1.788376 0.894188
81 2016.11 1453 377.0 245.0 101.0 21.0 25.0 21.0 25.946318 16.861666 6.951136 1.445286 1.720578 1.445286
82 2016.12 1231 290.0 239.0 89.0 25.0 20.0 7.0 23.558083 19.415110 7.229894 2.030869 1.624695 0.568643
83 2017.01 1513 381.0 279.0 91.0 34.0 27.0 18.0 25.181758 18.440185 6.014541 2.247191 1.784534 1.189689
84 2017.02 1525 371.0 291.0 103.0 32.0 19.0 13.0 24.327869 19.081967 6.754098 2.098361 1.245902 0.852459
85 2017.03 765 197.0 128.0 45.0 16.0 5.0 7.0 25.751634 16.732026 5.882353 2.091503 0.653595 0.915033

86 rows × 14 columns

Wyświetl trend czasowy -- liczba postów na miesiąc:


In [ ]:
# Niewygładzony
output_notebook()

tsline = TimeSeries(gis_posts_data, x='Months', y=gis_products, \
         title="Posty na miesiąc (liczba)", xlabel='Data (miesiące)', ylabel='Produkty GIS', legend=True)

show(column(tsline))

In [27]:
# Trend czasowy wygładzony -- posty na miesiąc
output_notebook()

tsline = TimeSeries(gis_posts_data.rolling(on='Months', window=4).mean(), x='Months', y=gis_products,\
                    title="Posty na miesiąc (liczba), wygładzone", \
                    xlabel='Data (miesiące)', ylabel='Produkty GIS', legend=True)

show(column(tsline))


Loading BokehJS ...

In [28]:
# Trend czasowy -- posty na miesiąc w % do całości
output_notebook()

tsline = TimeSeries(gis_posts_data.rolling(on='Months', window=4).mean(), x='Months',\
                    y=['%{0}'.format(i) for i in gis_products],\
                    title="Posty na miesiąc (%całości)",\
                    xlabel='Data (miesiące)', ylabel='Produkty GIS', legend=True)

show(column(tsline))


Loading BokehJS ...

Wyznaczenie liczby pytań i odpowiedzi dla produktów GIS

Definicja listy produktów (za pomocą tagów z forum):


In [29]:
gis_products = ['qgis', 'arcgis-desktop', 'postgis', 'openstreetmap', 'grass', 'google-maps']
#gis_products = ['arcgis-desktop', 'arcgis-server', 'arcgis-10.0', \
#                'arcgis-10.1', 'arcgis-10.2', 'arcgis-10.3', 'arcgis-10.4']
#gis_products = ['arcgis-10.0', \
#                'arcgis-10.1', 'arcgis-10.2', 'arcgis-10.3', 'arcgis-10.4']

Funkcja UDF Sparka do obliczenia różnicy czasu (w sekundach):


In [30]:
# x, y -- parametry typu datetime
def time_delta(x, y):
    delta = int((y - x).total_seconds())
    return delta

time_delta_udf = udf(time_delta, returnType = IntegerType())

Utworzenie dwóch DF ('questions' i 'answers') będących podziałem 'posts' na pytania i odpowiedzi z przepisaniem kolumn potrzebnych do wyliczenia różnicy czasu pytanie-odpowiedź.


In [31]:
# Czas odpowiedzi
# posts -- PostTypeId = 1 (Question), 2 (Answer)

questions = posts.select(posts.Id, posts.AcceptedAnswerId,\
                         posts.CreationDate.alias('QuestionDate'),\
                         posts.Tags).where(posts.PostTypeId == 1)

# alias('Parent') to obejście problemu
# org.apache.spark.sql.AnalysisException:
# resolved attribute(s) ID missing from ID in operator !Join
# https://issues.apache.org/jira/browse/SPARK-10925
answers = posts.select(posts.Id.alias('AnswerId'), posts.ParentId.alias('Parent'),\
                       posts.CreationDate.alias('AnswerDate')).where(posts.PostTypeId == 2)

In [32]:
# Weryfikacja schematu DF:
questions.printSchema()
answers.printSchema()


root
 |-- Id: long (nullable = true)
 |-- AcceptedAnswerId: long (nullable = true)
 |-- QuestionDate: timestamp (nullable = true)
 |-- Tags: string (nullable = true)

root
 |-- AnswerId: long (nullable = true)
 |-- Parent: long (nullable = true)
 |-- AnswerDate: timestamp (nullable = true)


In [33]:
posts.printSchema()


root
 |-- AcceptedAnswerId: long (nullable = true)
 |-- AnswerCount: long (nullable = true)
 |-- Body: string (nullable = true)
 |-- ClosedDate: timestamp (nullable = true)
 |-- CommentCount: long (nullable = true)
 |-- CreationDate: timestamp (nullable = true)
 |-- FavoriteCount: long (nullable = true)
 |-- Id: long (nullable = true)
 |-- LastEditorUserId: long (nullable = true)
 |-- OwnerUserId: long (nullable = true)
 |-- ParentId: long (nullable = true)
 |-- PostTypeId: long (nullable = true)
 |-- Score: long (nullable = true)
 |-- Tags: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- ViewCount: long (nullable = true)


In [34]:
# statystyki Q&A dla całości forum GIS (tylko z tabeli 'posts'):
#print('Wyliczam statystyki Q&A dla całego forum.')
qa_stats = pd.DataFrame(columns=['Product', 'Value', 'Type'])
#qa_stats = qa_stats.append(pd.DataFrame.from_dict([{'Product': 'GIS',\
#             'Value': posts.where(posts.PostTypeId == 1).count(),\
#             'Type': 'Questions'}]), ignore_index=True)
#qa_stats = qa_stats.append(pd.DataFrame.from_dict([{'Product': 'GIS',\
#             'Value': posts.where(posts.PostTypeId == 2).count(),\
#             'Type': 'Answers'}]), ignore_index=True)

# dla produktów:
for gis_p in gis_products:
    print('Wyliczam statystyki Q&A dla produktu: {0}'.format(gis_p))
    prod_ques = posts.select(posts.Id, posts.AnswerCount, posts.Tags)\
                     .where((posts.PostTypeId == 1) & posts.Tags.rlike(r'\b{0}\b'.format(gis_p)))
    prod_ans = prod_ques.groupBy()\
                        .sum('AnswerCount')\
                        .select(col('sum(AnswerCount)').alias('Answers'))
    qa_stats = qa_stats.append(pd.DataFrame.from_dict([{'Product': gis_p,\
             'Value': prod_ques.count(),\
             'Type': 'Questions'}]), ignore_index=True)
    qa_stats = qa_stats.append(pd.DataFrame.from_dict([{'Product': gis_p,\
             'Value': prod_ans.collect()[0].Answers,\
             'Type': 'Answers'}]), ignore_index=True)

print('Gotowe.')
qa_stats


Wyliczam statystyki Q&A dla produktu: qgis
Wyliczam statystyki Q&A dla produktu: arcgis-desktop
Wyliczam statystyki Q&A dla produktu: postgis
Wyliczam statystyki Q&A dla produktu: openstreetmap
Wyliczam statystyki Q&A dla produktu: grass
Wyliczam statystyki Q&A dla produktu: google-maps
Gotowe.
Out[34]:
Product Type Value
0 qgis Questions 16384.0
1 qgis Answers 18577.0
2 arcgis-desktop Questions 13600.0
3 arcgis-desktop Answers 17687.0
4 postgis Questions 5115.0
5 postgis Answers 6335.0
6 openstreetmap Questions 1835.0
7 openstreetmap Answers 2408.0
8 grass Questions 1332.0
9 grass Answers 1562.0
10 google-maps Questions 1259.0
11 google-maps Answers 1594.0

In [35]:
print(qa_stats.to_latex(column_format='{l|l|l|r}', decimal=',', index=False))


\begin{tabular}{{l|l|l|r}}
\toprule
        Product &       Type &   Value \\
\midrule
           qgis &  Questions &  16384, \\
           qgis &    Answers &  18577, \\
 arcgis-desktop &  Questions &  13600, \\
 arcgis-desktop &    Answers &  17687, \\
        postgis &  Questions &   5115, \\
        postgis &    Answers &   6335, \\
  openstreetmap &  Questions &   1835, \\
  openstreetmap &    Answers &   2408, \\
          grass &  Questions &   1332, \\
          grass &    Answers &   1562, \\
    google-maps &  Questions &   1259, \\
    google-maps &    Answers &   1594, \\
\bottomrule
\end{tabular}


In [36]:
output_notebook()
plot = Bar(qa_stats, values='Value', group='Type', legend='top_center',\
           ylabel='Liczba pytań lub odpowiedzi', xlabel='Oprogramowanie GIS',\
           label=CatAttr(columns=['Product'], sort=False))
show(plot)


Loading BokehJS ...

In [ ]:

Wyznaczenie czasów odpowiedzi dla produktów GIS

Definicje kwantyli.

Dodatkowo należy wykonać jednego z wybranych joinów (wygenerować DF 'joined_timedelta') w zależności od żądanego zakresu analizy. W pracy użyta została wersja "dla zaakceptowanych odpowiedzi (i żadnych innych) i wskazanego produktu z tagów".


In [37]:
quantiles = [0.0, 0.01, 0.25, 0.5, 0.75, 1.0]
qlabels = ["q"+str(int(q * 100)) for q in quantiles]
qlabels


Out[37]:
['q0', 'q1', 'q25', 'q50', 'q75', 'q100']

Przykładowe joiny użyte poniżej we właściwej pętli do wyliczenia różnicy czasu dla zbioru produktów:


In [ ]:
# wszystkie z odpowiedziami (nie tylko zaakceptowanymi)
joined_timedelta = questions.join(answers_nm, questions.Id == answers.Parent) \
                               .withColumn('TimeDelta', time_delta_udf(questions.QuestionDate, answers.AnswerDate))

In [ ]:
# tylko z zaakceptowanymi odpowiedziami (ale uwzględnia pozostałe odpowiedzi dla pytania)
joined_timedelta = \
    questions.join(answers, questions.Id == answers.Parent) \
    .filter(col('AcceptedAnswerId').isNotNull()) \
    .withColumn('TimeDelta', time_delta_udf(questions.QuestionDate, answers.AnswerDate))

Join tylko dla zaakceptowanych odpowiedzi (i żadnych innych) i wskazanego produktu z tagów:


In [ ]:
# tylko dla zaakceptowanych odpowiedzi (i żadnych innych)
joined_timedelta = \
    questions.join(answers, questions.Id == answers.Parent) \
    .filter(col('AcceptedAnswerId').isNotNull() & (col('AcceptedAnswerId') == col('AnswerId'))) \
    .withColumn('TimeDelta', time_delta_udf(questions.QuestionDate, answers.AnswerDate))

In [ ]:
joined_timedelta.printSchema()

Przykładowe obliczenie kwantyli dla czasów odpowiedzi:


In [ ]:
# 0.0, 0.01, 0.25, 0.5, 0.75, 1.0
qvals = joined_timedelta.where((joined_timedelta.TimeDelta > 0))\
                        .approxQuantile('TimeDelta', quantiles, 0)
qvals

Właściwe obliczenie czasu odpowiedzi:


In [39]:
# czas tylko zaakceptowanej odpowiedzi per gis_product
# str(datetime.timedelta(seconds=13600)) -- format 'human readable'
qa_times = pd.DataFrame(columns=['Product', 'Value', 'Type'])

for gis_p in gis_products:
    print('Wyliczam statystyki czasów odpowiedzi dla produktu: {0}'.format(gis_p))

    joined_timedelta = \
        questions.join(answers, questions.Id == answers.Parent) \
                 .filter(col('AcceptedAnswerId').isNotNull() & \
                         (col('AcceptedAnswerId') == col('AnswerId')) & \
                         posts.Tags.rlike(r'\b{0}\b'.format(gis_p))) \
                 .withColumn('TimeDelta', time_delta_udf(questions.QuestionDate, answers.AnswerDate))
        
    # quantiles[] = 0.0, 0.01, 0.25, 0.5, 0.75, 1.0
    qvals = joined_timedelta.where((joined_timedelta.TimeDelta > 0))\
                            .approxQuantile('TimeDelta', quantiles, 0)
        
    # wstawienie wyników do DF (tylko q1, q25 i q50):
    for i in [1, 2, 3]:
        qa_times = qa_times.append(pd.DataFrame.from_dict([{'Product': gis_p,\
                   'Value': qvals[i]*1000,\
                   'Type':  qlabels[i]}]), ignore_index=True)

qa_times


Wyliczam statystyki czasów odpowiedzi dla produktu: qgis
Wyliczam statystyki czasów odpowiedzi dla produktu: arcgis-desktop
Wyliczam statystyki czasów odpowiedzi dla produktu: postgis
Wyliczam statystyki czasów odpowiedzi dla produktu: openstreetmap
Wyliczam statystyki czasów odpowiedzi dla produktu: grass
Wyliczam statystyki czasów odpowiedzi dla produktu: google-maps
Out[39]:
Product Type Value
0 qgis q1 271000.0
1 qgis q25 2786000.0
2 qgis q50 12445000.0
3 arcgis-desktop q1 240000.0
4 arcgis-desktop q25 1898000.0
5 arcgis-desktop q50 7585000.0
6 postgis q1 310000.0
7 postgis q25 3327000.0
8 postgis q50 13774000.0
9 openstreetmap q1 458000.0
10 openstreetmap q25 4740000.0
11 openstreetmap q50 20988000.0
12 grass q1 467000.0
13 grass q25 6632000.0
14 grass q50 34204000.0
15 google-maps q1 531000.0
16 google-maps q25 4614000.0
17 google-maps q50 20855000.0

In [40]:
output_notebook()
plot = Bar(qa_times, values='Value', group='Type', legend='top_center',\
           ylabel='Czasy zaakceptowanych odpowiedzi (kwantyle)', xlabel='Oprogramowanie GIS',\
           label=CatAttr(columns=['Product'], sort=False))
plot.yaxis[0].formatter = DatetimeTickFormatter()
show(plot)


Loading BokehJS ...

In [41]:
print(qa_times.to_latex(index=False))


\begin{tabular}{llr}
\toprule
        Product & Type &       Value \\
\midrule
           qgis &   q1 &    271000.0 \\
           qgis &  q25 &   2786000.0 \\
           qgis &  q50 &  12445000.0 \\
 arcgis-desktop &   q1 &    240000.0 \\
 arcgis-desktop &  q25 &   1898000.0 \\
 arcgis-desktop &  q50 &   7585000.0 \\
        postgis &   q1 &    310000.0 \\
        postgis &  q25 &   3327000.0 \\
        postgis &  q50 &  13774000.0 \\
  openstreetmap &   q1 &    458000.0 \\
  openstreetmap &  q25 &   4740000.0 \\
  openstreetmap &  q50 &  20988000.0 \\
          grass &   q1 &    467000.0 \\
          grass &  q25 &   6632000.0 \\
          grass &  q50 &  34204000.0 \\
    google-maps &   q1 &    531000.0 \\
    google-maps &  q25 &   4614000.0 \\
    google-maps &  q50 &  20855000.0 \\
\bottomrule
\end{tabular}


In [ ]:


Brudnopis :) (próby różnych zapytań, eksploracja, outliers)


In [ ]:


In [ ]:


In [ ]:
joined_timedelta.where((joined_timedelta.TimeDelta > 0) & posts.Tags.rlike(r'\b{0}\b'.format('qgis'))) \
                       .approxQuantile('TimeDelta', quantiles, 0)

In [ ]:
joined_timedelta.where((joined_timedelta.TimeDelta > 0) & posts.Tags.rlike(r'\b{0}\b'.format('qgis')))\
                .sort(joined_timedelta.TimeDelta, ascending=False).collect()

In [47]:
joined_timedelta.where(joined_timedelta.TimeDelta == 0).collect()


Out[47]:
[Row(Id=183721, AcceptedAnswerId=183722, QuestionDate=datetime.datetime(2016, 3, 7, 13, 13, 14, 297000), Tags='postgis postgresql google-maps ', AnswerId=183722, Parent=183721, AnswerDate=datetime.datetime(2016, 3, 7, 13, 13, 14, 297000), TimeDelta=0)]

In [ ]:


In [ ]:
posts.where(posts.Id == 7078).collect()

In [ ]:
post_history.where(post_history.PostId == 7087).collect()

In [ ]:


In [ ]:
# df.groupBy('column').count().rdd.values().histogram()

hist_v = joined_timedelta.groupBy(joined_timedelta.TimeDelta).count().rdd.values().histogram(40)

In [ ]:
count_timedelta = joined_timedelta.groupBy(joined_timedelta.TimeDelta).count() \
                         .select(col('TimeDelta'), col('count').alias('Count'))

In [ ]:
count_timedelta.toPandas()

In [ ]:


In [ ]:


In [ ]:


In [ ]:
hist_v.printSchema()

In [ ]:
hist_v.where(hist_v.Count > 1).sort('Count', ascending=False).collect()

In [ ]:
hist_df = pd.DataFrame(list(zip(list(hist_v)[0], \
                       list(hist_v)[1])), \
                       columns=['bin','frequency'])

In [ ]:
hist_df

In [ ]:
output_notebook()

plot = Bar(hist_df, 'bin', values='frequency', \
           title="Histogram", legend=False, color="darkviolet", \
           ylabel='Freq', xlabel='Bin')
show(plot)

In [ ]:


In [ ]:


In [ ]:
# .min() -- wyszedł problem ze zmergowanymi postami
# różnica czasu jest ujemna (!!! trzeba odfiltrować merged_posts!!!)
joined_timedelta.select(joined_timedelta.TimeDelta).groupBy().mean().collect()

In [ ]:
joined_timedelta.where(joined_timedelta.TimeDelta == 0).count()

In [ ]:
posts.where(posts.Id == 203380).collect()

In [ ]:
post_history.where(post_history.PostId == 203381).collect()

In [ ]:


In [ ]:
# wszystkie z filtrem merged_posts_ids

# the `in` operator in pyspark is broken (2.2.0)
# https://issues.apache.org/jira/browse/SPARK-19701
# Funkcjonalność usunięta:
# https://github.com/apache/spark/pull/17160
#
# trzeba użyć:
# http://takwatanabe.me/pyspark/generated/generated/sql.functions.array_contains.html

#questions.filter([x for x in merged_posts_ids]).show()

# The ANTI JOIN – all values from table1 where not in table2 ????????????
# http://blog.montmere.com/2010/12/08/the-anti-join-all-values-from-table1-where-not-in-table2/

#.join(answers, questions.Id == answers.Parent) \
#                            .withColumn('TimeDelta', time_delta_udf(questions.QuestionDate, answers.AnswerDate))

In [ ]:
joined_timedelta.collect()

In [ ]:
joined_timedelta.select(min(joined_timedelta.TimeDelta)).collect()

In [ ]:
joined_timedelta.where(joined_timedelta.TimeDelta == -110165685).collect()

In [ ]:
posts.where(posts.Id == 124794).collect()

In [ ]:


In [ ]:
# jesli w historii jest 37, 38, to są to zmerdżowane posty
# 37 źródło, 38 cel, 38 jest starszy?

# do obliczen czasu trzeba usunac duplikaty
# ale org posty nie maja odpowiedzi?

In [ ]:
post_links.where(post_links.RelatedPostId == 124794).collect()

In [ ]:
post_links.where(post_links.PostId == 43830).collect()

In [ ]:
answers.where(answers.Id == 194).collect()