In [1]:
import re
import ast
import pyspark
import time
import datetime
import html
from pyspark import SparkContext
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
#visualisation
from bokeh.plotting import figure, show, output_file, output_notebook
from bokeh.charts import Histogram, Scatter, Bar, TimeSeries, BoxPlot, show, \
output_file, output_notebook, color, marker, defaults
from bokeh.charts.attributes import ColorAttr, CatAttr
from bokeh.layouts import column
from bokeh.models import NumeralTickFormatter, DatetimeTickFormatter
import pandas as pd
In [2]:
sc = SparkContext()
sqlContext = SQLContext(sc)
In [3]:
# Default Bokeh chart size:
defaults.width = 950
defaults.height = 600
Data from the GIS Stack Exchange archive dump (https://archive.org/download/stackexchange).
The helper function attribute_search(attribute, string) is used while loading XML rows into pyspark Row objects. We pass 'attribute', the name of the XML attribute whose value we want, and 'string', a line of text read from the XML file.
In [4]:
# Helper function: searches for the given attribute in a line of an XML file;
# returns the value of the found attribute as a string, or None if the attribute does not exist
def attribute_search(attribute, my_string):
    result = re.search(attribute + '="(.*?)"', my_string)
    if result:
        return result.group(1).replace('"', '')
    else:
        return None
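A quick sanity check of the helper on a made-up row line (the values below are illustrative only). Note that the pattern has no word boundary, so e.g. 'Id' would also match inside 'PostId'; in these dumps the Id attribute happens to come first on each row, so this works in practice.
In [ ]:
# Illustrative only -- a made-up row in the dump format:
sample = '  <row Id="42" TagName="qgis" Count="1234" />'
attribute_search('TagName', sample)    # -> 'qgis'
attribute_search('WikiPostId', sample) # -> None (attribute absent)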
In [5]:
# Helper functions: parse the XML files (different data schemas)
# from the Stack Exchange dump
def tags_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['TagName'] = attribute_search('TagName', c)
    count = attribute_search('Count', c)
    row['Count'] = int(count) if count else None
    excerpt = attribute_search('ExcerptPostId', c)
    row['ExcerptPostId'] = int(excerpt) if excerpt else None
    wiki = attribute_search('WikiPostId', c)
    row['WikiPostId'] = int(wiki) if wiki else None
    return pyspark.Row(**row)

def badges_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['UserId'] = int(attribute_search('UserId', c))
    row['Name'] = attribute_search('Name', c)
    row['Date'] = datetime.datetime.strptime(attribute_search('Date', c), "%Y-%m-%dT%H:%M:%S.%f")
    row['Class'] = int(attribute_search('Class', c))
    row['TagBased'] = ast.literal_eval(attribute_search('TagBased', c))
    return pyspark.Row(**row)

def users_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['Reputation'] = int(attribute_search('Reputation', c))
    row['CreationDate'] = datetime.datetime.strptime(attribute_search('CreationDate', c), "%Y-%m-%dT%H:%M:%S.%f")
    row['DisplayName'] = attribute_search('DisplayName', c)
    row['LastAccessDate'] = datetime.datetime.strptime(attribute_search('LastAccessDate', c), "%Y-%m-%dT%H:%M:%S.%f")
    row['WebsiteUrl'] = attribute_search('WebsiteUrl', c)
    row['Location'] = attribute_search('Location', c)
    age = attribute_search('Age', c)
    row['Age'] = int(age) if age else None
    row['Views'] = int(attribute_search('Views', c))
    row['UpVotes'] = int(attribute_search('UpVotes', c))
    row['DownVotes'] = int(attribute_search('DownVotes', c))
    return pyspark.Row(**row)

def posts_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['PostTypeId'] = int(attribute_search('PostTypeId', c))
    found_id = attribute_search('ParentId', c)
    row['ParentId'] = int(found_id) if found_id else None
    found_id = attribute_search('AcceptedAnswerId', c)
    row['AcceptedAnswerId'] = int(found_id) if found_id else None
    row['CreationDate'] = datetime.datetime.strptime(attribute_search('CreationDate', c), "%Y-%m-%dT%H:%M:%S.%f")
    row['Score'] = int(attribute_search('Score', c))
    vc = attribute_search('ViewCount', c)
    row['ViewCount'] = int(vc) if vc else None
    owner = attribute_search('OwnerUserId', c)
    row['OwnerUserId'] = int(owner) if owner else None
    lasted = attribute_search('LastEditorUserId', c)
    row['LastEditorUserId'] = int(lasted) if lasted else None
    row['Body'] = re.sub('(<!--.*?-->|<[^>]*>)', '', html.unescape(attribute_search('Body', c)))
    title = attribute_search('Title', c)
    row['Title'] = title if title else None
    tags = attribute_search('Tags', c)
    row['Tags'] = html.unescape(tags).replace('<', '').replace('>', ' ') if tags else None
    date = attribute_search('ClosedDate', c)
    row['ClosedDate'] = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f") if date else None
    count = attribute_search('AnswerCount', c)
    row['AnswerCount'] = int(count) if count else None
    count = attribute_search('CommentCount', c)
    row['CommentCount'] = int(count) if count else None
    count = attribute_search('FavoriteCount', c)
    row['FavoriteCount'] = int(count) if count else None
    return pyspark.Row(**row)

def comments_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['PostId'] = int(attribute_search('PostId', c))
    row['Score'] = int(attribute_search('Score', c))
    row['Text'] = re.sub('(<!--.*?-->|<[^>]*>)', '', html.unescape(attribute_search('Text', c)))
    row['CreationDate'] = datetime.datetime.strptime(attribute_search('CreationDate', c), "%Y-%m-%dT%H:%M:%S.%f")
    user = attribute_search('UserId', c)
    row['UserId'] = int(user) if user else None
    return pyspark.Row(**row)

def post_history_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['PostHistoryTypeId'] = int(attribute_search('PostHistoryTypeId', c))
    row['PostId'] = int(attribute_search('PostId', c))
    comm = attribute_search('Comment', c)
    row['Comment'] = comm if comm else None
    text = attribute_search('Text', c)
    row['Text'] = re.sub('(<!--.*?-->|<[^>]*>)', '', html.unescape(text)) if text else None
    return pyspark.Row(**row)

def post_links_from_xml(line):
    c = line.replace('<row', '').replace('/>', '')
    row = dict()
    row['Id'] = int(attribute_search('Id', c))
    row['CreationDate'] = datetime.datetime.strptime(attribute_search('CreationDate', c), "%Y-%m-%dT%H:%M:%S.%f")
    row['PostId'] = int(attribute_search('PostId', c))
    row['RelatedPostId'] = int(attribute_search('RelatedPostId', c))
    row['LinkTypeId'] = int(attribute_search('LinkTypeId', c))
    return pyspark.Row(**row)
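For example, a hypothetical Tags.xml row parses into a pyspark Row like this (the values are made up; the printed field order may differ between Spark versions):
In [ ]:
# Illustrative only -- a made-up Tags.xml row:
tags_from_xml('  <row Id="3" TagName="postgis" Count="9000" ExcerptPostId="1579" WikiPostId="1578" />')
# -> Row(Count=9000, ExcerptPostId=1579, Id=3, TagName='postgis', WikiPostId=1578)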
In [6]:
# Load the data from the XML files into RDDs, then convert the RDDs to DFs:
xml_load_path = 'file:///home/marek/Dokumenty/Notebooks/bd/gis_stack_spark/data/'
# Dictionary: file name and the corresponding helper function that parses
# the data schema of that XML file
xml_load_list = {'Tags.xml': tags_from_xml, 'Badges.xml': badges_from_xml, \
'Users.xml': users_from_xml,'Posts.xml': posts_from_xml, \
'Comments.xml': comments_from_xml,'PostHistory.xml': post_history_from_xml, \
'PostLinks.xml': post_links_from_xml}
tags_rdd = sc.textFile(xml_load_path + 'Tags.xml').filter(lambda line: "row" in line) \
.map(lambda l: xml_load_list['Tags.xml'](l))
badges_rdd = sc.textFile(xml_load_path + 'Badges.xml').filter(lambda line: "row" in line) \
.map(lambda l: xml_load_list['Badges.xml'](l))
users_rdd = sc.textFile(xml_load_path + 'Users.xml').filter(lambda line: "row" in line) \
.map(lambda l: xml_load_list['Users.xml'](l))
posts_rdd = sc.textFile(xml_load_path + 'Posts.xml').filter(lambda line: "row" in line) \
.map(lambda l: xml_load_list['Posts.xml'](l))
comments_rdd = sc.textFile(xml_load_path + 'Comments.xml').filter(lambda line: "row" in line) \
.map(lambda l: xml_load_list['Comments.xml'](l))
post_history_rdd = sc.textFile(xml_load_path + 'PostHistory.xml').filter(lambda line: "row" in line) \
.map(lambda l: xml_load_list['PostHistory.xml'](l))
post_links_rdd = sc.textFile(xml_load_path + 'PostLinks.xml').filter(lambda line: "row" in line) \
.map(lambda l: xml_load_list['PostLinks.xml'](l))
# Convert to DataFrames:
users = sqlContext.createDataFrame(users_rdd)
badges = sqlContext.createDataFrame(badges_rdd)
posts = sqlContext.createDataFrame(posts_rdd)
tags = sqlContext.createDataFrame(tags_rdd)
comments = sqlContext.createDataFrame(comments_rdd)
post_history = sqlContext.createDataFrame(post_history_rdd)
post_links = sqlContext.createDataFrame(post_links_rdd)
Merged and duplicate posts have to be removed from the set of posts. Merged posts often have negative answer times; duplicates often have very long answer times (e.g. when a new post is a duplicate of an old one).
Merged posts are those whose history (the post_history table) contains PostHistoryTypeId 37 (post merge source) or 38 (post merge destination).
Examples of problematic posts:
Merged posts sometimes have the original question swapped with a later one, which puts the answer time before the time the question was asked.
Example of merged posts (negative computed answer time): https://gis.stackexchange.com/questions/124794/installing-qgis-with-ecw-support-on-mac
Note: in the dump from 14 March 2017 it has 2 extra links (merges); currently (May 2017) there are 3 links (merges).
Example of a post with an answer time of 0 seconds: https://gis.stackexchange.com/questions/203380/how-to-set-up-python-arcpy-with-arcgis-pro-1-3
In [7]:
# remove merged posts and duplicate posts from the post set;
# merged posts often have negative answer times, duplicates
# often have very long ones (e.g. when a new post is
# a duplicate of an old post)
merged_posts_ids = post_history.select(post_history.PostId) \
.where((post_history.PostHistoryTypeId == 37) | \
(post_history.PostHistoryTypeId == 38))
duplicate_posts_ids = post_links.select(post_links.PostId).where(post_links.LinkTypeId == 3)
remove_ids = merged_posts_ids.union(duplicate_posts_ids).distinct()
remove_ids.count()
Out[7]:
In [8]:
# anti_join using a DataFrame:
# tbl1.as("a").join(tbl2.as("b"), $"a.id" === $"b.id", "left_anti")
# (from the Spark PR notes: this serves as the basis for implementing `NOT EXISTS`
# and `NOT IN (...)` correlated sub-queries, and as a good basis for a more
# efficient `EXCEPT` operator.)
# Overwrite 'posts' with a DF from which we removed:
# - duplicate posts
# - merged posts
# The Ids of these posts are in remove_ids.
posts = posts.join(remove_ids, posts.Id == remove_ids.PostId, 'left_anti')
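An optional sanity check (a sketch, not part of the original analysis): after the left_anti join, an inner join of the filtered 'posts' against remove_ids should return no rows.
In [ ]:
# Expected result: 0 -- none of the removed Ids remain in 'posts'
posts.join(remove_ids, posts.Id == remove_ids.PostId, 'inner').count()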
In [9]:
posts.count()
Out[9]:
In [10]:
users_by_posts = posts.groupBy('OwnerUserId').count()\
.select(col('OwnerUserId').alias('user'), col('count').alias('number_posts'))\
.groupBy('number_posts').count()\
.select(col('number_posts'), col('count').alias('number_users'))\
.orderBy('number_posts')
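The same two-level aggregation (posts per user, then users per post count) can also be expressed in Spark SQL; the sketch below is equivalent to the DataFrame pipeline above (the temporary view name 'posts_view' is arbitrary):
In [ ]:
# Equivalent Spark SQL formulation (sketch)
posts.createOrReplaceTempView('posts_view')
sqlContext.sql("""
    SELECT number_posts, COUNT(*) AS number_users
    FROM (SELECT OwnerUserId, COUNT(*) AS number_posts
          FROM posts_view
          GROUP BY OwnerUserId) per_user
    GROUP BY number_posts
    ORDER BY number_posts
""")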
In [11]:
# RDD version, mapping with a lambda:
# x = users_by_posts.rdd.map(lambda x: x.number_posts).collect()
# y = users_by_posts.rdd.map(lambda y: y.number_users).collect()
# or by converting to a pandas DataFrame once, then to lists:
users_by_posts_pd = users_by_posts.toPandas()
x = users_by_posts_pd['number_posts'].tolist()
y = users_by_posts_pd['number_users'].tolist()
Using Bokeh to display the data on users and the posts they wrote:
In [12]:
output_notebook()
plot = figure(plot_width=950, plot_height=600,\
              x_axis_type="log", y_axis_type="log",\
              x_axis_label="Number of posts", y_axis_label="Number of users")
plot.circle(x, y, size=6, color="green", alpha=0.4)
show(plot)
In [13]:
popular_tags = tags.select(tags.TagName, tags.Count)\
.sort(tags.Count, ascending=False).toPandas()
In [14]:
#.to_latex(column_format='l|r|r', decimal=','))
popular_tags.head(20)
Out[14]:
In [15]:
output_notebook()
plot = Bar(popular_tags.head(20), values='Count', label=CatAttr(columns=['TagName'], sort=False), \
           title="Most popular tags on the GIS forum", legend=False, color="darkviolet", \
           ylabel='Number of posts', xlabel='Tag (name)')
show(plot)
In [16]:
questions_vs_answers = posts.select(col('Id'), col('AnswerCount')).where(posts.PostTypeId == 1)\
.groupBy('AnswerCount').count()\
.select(col('AnswerCount').alias('answers'), col('count').alias('number_questions'))\
.orderBy('answers')
total_questions = posts.select(posts.Id).where(posts.PostTypeId == 1).count()
total_answers = posts.select(posts.Id).where(posts.PostTypeId == 2).count()
unanswered_questions = questions_vs_answers.select(col('number_questions')) \
.where(questions_vs_answers.answers == 0) \
.collect()[0].number_questions
In [17]:
print("Całkowita liczba postów (bez duplikatów): {}".format(posts.count()))
print("Liczba pytań na forum GIS: {}".format(total_questions))
print("Liczba odpowiedzi na forum GIS: {}".format(total_answers))
print("Liczba pytań bez odpowiedzi: {}".format(unanswered_questions))
print("Procent pytań bez odpowiedzi: {:.2%}".format(unanswered_questions/total_questions))
Another post type (wiki -- neither a question nor an answer):
In [18]:
posts.select(posts.Id).where(posts.PostTypeId == 4).count()
Out[18]:
GIS community activity: number of questions vs. answers.
In [19]:
output_notebook()
plot = Bar(questions_vs_answers.toPandas().head(9), 'answers', values='number_questions', \
           title="Answers vs. number of questions", legend=False, color="navy", \
           ylabel='Questions', xlabel='Answers')
show(plot)
In [20]:
questions_vs_answers.toPandas().head(9)
Out[20]:
In [22]:
print(questions_vs_answers.toPandas().head(9).to_latex(index=False))
In [23]:
gis_products = ['qgis', 'arcgis-desktop', 'postgis', 'openstreetmap', 'grass', 'google-maps']
In [ ]:
# or analyse only ArcGIS products (since different versions appear in the tags)
#gis_products = ['arcgis-desktop', 'arcgis-server', 'arcgis-10.0', \
#                'arcgis-10.1', 'arcgis-10.2', 'arcgis-10.3', 'arcgis-10.4', 'arcgis-10.5']
gis_products = ['arcgis-10.0', \
'arcgis-10.1', 'arcgis-10.2', 'arcgis-10.3', 'arcgis-10.4', 'arcgis-10.5']
In [24]:
# Compute the totals (for all posts) into gis_posts_data.
# This gives us a DF to which we can later append columns
# with the results for the other tags.
print('Computing statistics for all posts.')
gis_posts_data = posts.filter(posts.Tags.isNotNull()).select(concat_ws('.', year("CreationDate"), \
lpad(month("CreationDate"), 2, "0")).alias('Months')) \
.groupBy('Months').count().sort('Months').toPandas()
gis_posts_data.rename(columns={'count': 'wszystkie'}, inplace=True)
for gis_p in gis_products:
    print('Computing posts for product: {0}'.format(gis_p))
    product_posts = posts.filter(posts.Tags.isNotNull() & posts.Tags.rlike(r'\b{0}\b'.format(gis_p))) \
        .select(concat_ws('.', year("CreationDate"), lpad(month("CreationDate"), 2, "0")).alias('Months')) \
        .groupBy('Months').count().sort('Months').toPandas()
    product_posts.rename(columns={'count': gis_p}, inplace=True)
    gis_posts_data = pd.merge(gis_posts_data, product_posts, on='Months', how='outer')
print('Done.')
In [25]:
# Optionally: as new columns,
# compute the % of posts for each product vs. the totals,
# e.g. for the qgis product we compute %qgis as
# (qgis / all) * 100%
for gis_p in gis_products:
    gis_posts_data['%' + gis_p] = (gis_posts_data[gis_p].fillna(0) \
                                   / gis_posts_data.wszystkie) * 100
In [26]:
# Show the data:
gis_posts_data
Out[26]:
Display the time trend -- number of posts per month:
In [ ]:
# Not smoothed
output_notebook()
tsline = TimeSeries(gis_posts_data, x='Months', y=gis_products, \
                    title="Posts per month (count)", xlabel='Date (months)', ylabel='GIS products', legend=True)
show(column(tsline))
In [27]:
# Smoothed time trend -- posts per month
output_notebook()
tsline = TimeSeries(gis_posts_data.rolling(on='Months', window=4).mean(), x='Months', y=gis_products,\
                    title="Posts per month (count), smoothed", \
                    xlabel='Date (months)', ylabel='GIS products', legend=True)
show(column(tsline))
In [28]:
# Time trend -- posts per month as a % of the total
output_notebook()
tsline = TimeSeries(gis_posts_data.rolling(on='Months', window=4).mean(), x='Months',\
                    y=['%{0}'.format(i) for i in gis_products],\
                    title="Posts per month (% of total)",\
                    xlabel='Date (months)', ylabel='GIS products', legend=True)
show(column(tsline))
In [29]:
gis_products = ['qgis', 'arcgis-desktop', 'postgis', 'openstreetmap', 'grass', 'google-maps']
#gis_products = ['arcgis-desktop', 'arcgis-server', 'arcgis-10.0', \
# 'arcgis-10.1', 'arcgis-10.2', 'arcgis-10.3', 'arcgis-10.4']
#gis_products = ['arcgis-10.0', \
# 'arcgis-10.1', 'arcgis-10.2', 'arcgis-10.3', 'arcgis-10.4']
A Spark UDF to compute the time difference (in seconds):
In [30]:
# x, y -- parameters of type datetime
def time_delta(x, y):
    delta = int((y - x).total_seconds())
    return delta
time_delta_udf = udf(time_delta, returnType=IntegerType())
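The same difference can also be computed without a Python UDF, by casting the timestamps to epoch seconds with built-in column functions (a sketch only; the rest of the notebook keeps using time_delta_udf). The column names assume the 'QuestionDate'/'AnswerDate' aliases created in the join below.
In [ ]:
# UDF-free alternative (sketch): casting a timestamp to 'long' gives epoch seconds
time_delta_col = (col('AnswerDate').cast('long') - col('QuestionDate').cast('long')).cast(IntegerType())
# usage: .withColumn('TimeDelta', time_delta_col) instead of the UDF call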
Create two DataFrames ('questions' and 'answers') that split 'posts' into questions and answers, keeping the columns needed to compute the question-to-answer time difference.
In [31]:
# Answer times
# posts -- PostTypeId = 1 (Question), 2 (Answer)
questions = posts.select(posts.Id, posts.AcceptedAnswerId,\
posts.CreationDate.alias('QuestionDate'),\
posts.Tags).where(posts.PostTypeId == 1)
# alias('Parent') is a workaround for
# org.apache.spark.sql.AnalysisException:
# resolved attribute(s) ID missing from ID in operator !Join
# https://issues.apache.org/jira/browse/SPARK-10925
answers = posts.select(posts.Id.alias('AnswerId'), posts.ParentId.alias('Parent'),\
posts.CreationDate.alias('AnswerDate')).where(posts.PostTypeId == 2)
In [32]:
# Verify the DF schemas:
questions.printSchema()
answers.printSchema()
In [33]:
posts.printSchema()
In [34]:
# statystyki Q&A dla całości forum GIS (tylko z tabeli 'posts'):
#print('Wyliczam statystyki Q&A dla całego forum.')
qa_stats = pd.DataFrame(columns=['Product', 'Value', 'Type'])
#qa_stats = qa_stats.append(pd.DataFrame.from_dict([{'Product': 'GIS',\
# 'Value': posts.where(posts.PostTypeId == 1).count(),\
# 'Type': 'Questions'}]), ignore_index=True)
#qa_stats = qa_stats.append(pd.DataFrame.from_dict([{'Product': 'GIS',\
# 'Value': posts.where(posts.PostTypeId == 2).count(),\
# 'Type': 'Answers'}]), ignore_index=True)
# per product:
for gis_p in gis_products:
    print('Computing Q&A statistics for product: {0}'.format(gis_p))
    prod_ques = posts.select(posts.Id, posts.AnswerCount, posts.Tags)\
        .where((posts.PostTypeId == 1) & posts.Tags.rlike(r'\b{0}\b'.format(gis_p)))
    prod_ans = prod_ques.groupBy()\
        .sum('AnswerCount')\
        .select(col('sum(AnswerCount)').alias('Answers'))
    qa_stats = qa_stats.append(pd.DataFrame.from_dict([{'Product': gis_p,
                                                        'Value': prod_ques.count(),
                                                        'Type': 'Questions'}]), ignore_index=True)
    qa_stats = qa_stats.append(pd.DataFrame.from_dict([{'Product': gis_p,
                                                        'Value': prod_ans.collect()[0].Answers,
                                                        'Type': 'Answers'}]), ignore_index=True)
print('Done.')
qa_stats
Out[34]:
In [35]:
print(qa_stats.to_latex(column_format='{l|l|l|r}', decimal=',', index=False))
In [36]:
output_notebook()
plot = Bar(qa_stats, values='Value', group='Type', legend='top_center',\
ylabel='Number of questions or answers', xlabel='GIS software',\
label=CatAttr(columns=['Product'], sort=False))
show(plot)
In [ ]:
Quantile definitions.
Additionally, one of the joins below has to be executed (producing the 'joined_timedelta' DF), depending on the desired scope of the analysis. The paper uses the variant "only accepted answers (and no others), for the product indicated by the tags".
In [37]:
quantiles = [0.0, 0.01, 0.25, 0.5, 0.75, 1.0]
qlabels = ["q"+str(int(q * 100)) for q in quantiles]
qlabels
Out[37]:
Example joins, used below in the main loop that computes the time differences for the set of products:
In [ ]:
# all questions with answers (not only accepted ones)
joined_timedelta = questions.join(answers, questions.Id == answers.Parent) \
    .withColumn('TimeDelta', time_delta_udf(questions.QuestionDate, answers.AnswerDate))
In [ ]:
# only questions with an accepted answer (but all answers to such questions are included)
joined_timedelta = \
questions.join(answers, questions.Id == answers.Parent) \
.filter(col('AcceptedAnswerId').isNotNull()) \
.withColumn('TimeDelta', time_delta_udf(questions.QuestionDate, answers.AnswerDate))
The join for accepted answers only (and no others) and for the product indicated by the tags:
In [ ]:
# only accepted answers (and no others)
joined_timedelta = \
questions.join(answers, questions.Id == answers.Parent) \
.filter(col('AcceptedAnswerId').isNotNull() & (col('AcceptedAnswerId') == col('AnswerId'))) \
.withColumn('TimeDelta', time_delta_udf(questions.QuestionDate, answers.AnswerDate))
In [ ]:
joined_timedelta.printSchema()
An example computation of the answer-time quantiles:
In [ ]:
# 0.0, 0.01, 0.25, 0.5, 0.75, 1.0
qvals = joined_timedelta.where((joined_timedelta.TimeDelta > 0))\
.approxQuantile('TimeDelta', quantiles, 0)
qvals
In [39]:
# time of the accepted answer only, per gis_product
# str(datetime.timedelta(seconds=13600)) -- 'human readable' format
qa_times = pd.DataFrame(columns=['Product', 'Value', 'Type'])
for gis_p in gis_products:
    print('Computing answer-time statistics for product: {0}'.format(gis_p))
    joined_timedelta = \
        questions.join(answers, questions.Id == answers.Parent) \
        .filter(col('AcceptedAnswerId').isNotNull() & \
                (col('AcceptedAnswerId') == col('AnswerId')) & \
                posts.Tags.rlike(r'\b{0}\b'.format(gis_p))) \
        .withColumn('TimeDelta', time_delta_udf(questions.QuestionDate, answers.AnswerDate))
    # quantiles[] = 0.0, 0.01, 0.25, 0.5, 0.75, 1.0
    qvals = joined_timedelta.where((joined_timedelta.TimeDelta > 0))\
        .approxQuantile('TimeDelta', quantiles, 0)
    # insert the results into the DF (q1, q25 and q50 only);
    # values are multiplied by 1000 (seconds -> milliseconds) for the
    # Bokeh datetime axis used in the chart below
    for i in [1, 2, 3]:
        qa_times = qa_times.append(pd.DataFrame.from_dict([{'Product': gis_p,
                                                            'Value': qvals[i]*1000,
                                                            'Type': qlabels[i]}]), ignore_index=True)
qa_times
Out[39]:
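As noted in the comment above the loop, the raw quantiles are in seconds; datetime.timedelta gives a human-readable form (a quick sketch; qvals here holds the values from the last processed product):
In [ ]:
# Human-readable view of the last computed quantiles (seconds -> 'H:MM:SS' or 'N days, H:MM:SS')
[str(datetime.timedelta(seconds=v)) for v in qvals]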
In [40]:
output_notebook()
plot = Bar(qa_times, values='Value', group='Type', legend='top_center',\
ylabel='Accepted answer times (quantiles)', xlabel='GIS software',\
label=CatAttr(columns=['Product'], sort=False))
plot.yaxis[0].formatter = DatetimeTickFormatter()
show(plot)
In [41]:
print(qa_times.to_latex(index=False))
In [ ]:
joined_timedelta.where((joined_timedelta.TimeDelta > 0) & posts.Tags.rlike(r'\b{0}\b'.format('qgis'))) \
.approxQuantile('TimeDelta', quantiles, 0)
In [ ]:
joined_timedelta.where((joined_timedelta.TimeDelta > 0) & posts.Tags.rlike(r'\b{0}\b'.format('qgis')))\
.sort(joined_timedelta.TimeDelta, ascending=False).collect()
In [47]:
joined_timedelta.where(joined_timedelta.TimeDelta == 0).collect()
Out[47]:
In [ ]:
In [ ]:
posts.where(posts.Id == 7078).collect()
In [ ]:
post_history.where(post_history.PostId == 7087).collect()
In [ ]:
In [ ]:
# df.groupBy('column').count().rdd.values().histogram()
hist_v = joined_timedelta.groupBy(joined_timedelta.TimeDelta).count().rdd.values().histogram(40)
In [ ]:
count_timedelta = joined_timedelta.groupBy(joined_timedelta.TimeDelta).count() \
.select(col('TimeDelta'), col('count').alias('Count'))
In [ ]:
count_timedelta.toPandas()
In [ ]:
# hist_v is a plain (buckets, counts) tuple, so inspect the count DataFrame instead:
count_timedelta.printSchema()
In [ ]:
count_timedelta.where(count_timedelta.Count > 1).sort('Count', ascending=False).collect()
In [ ]:
hist_df = pd.DataFrame(list(zip(hist_v[0], hist_v[1])),
                       columns=['bin', 'frequency'])
In [ ]:
hist_df
In [ ]:
output_notebook()
plot = Bar(hist_df, 'bin', values='frequency', \
title="Histogram", legend=False, color="darkviolet", \
ylabel='Freq', xlabel='Bin')
show(plot)
In [ ]:
# .min() -- this exposed the merged-posts problem:
# the time difference is negative (merged_posts must be filtered out!)
joined_timedelta.select(joined_timedelta.TimeDelta).groupBy().mean().collect()
In [ ]:
joined_timedelta.where(joined_timedelta.TimeDelta == 0).count()
In [ ]:
posts.where(posts.Id == 203380).collect()
In [ ]:
post_history.where(post_history.PostId == 203381).collect()
In [ ]:
In [ ]:
# all posts, filtered with merged_posts_ids
# the `in` operator in pyspark is broken (2.2.0)
# https://issues.apache.org/jira/browse/SPARK-19701
# The functionality was removed:
# https://github.com/apache/spark/pull/17160
#
# use instead:
# http://takwatanabe.me/pyspark/generated/generated/sql.functions.array_contains.html
#questions.filter([x for x in merged_posts_ids]).show()
# The ANTI JOIN -- all values from table1 that are not in table2:
# http://blog.montmere.com/2010/12/08/the-anti-join-all-values-from-table1-where-not-in-table2/
#.join(answers, questions.Id == answers.Parent) \
# .withColumn('TimeDelta', time_delta_udf(questions.QuestionDate, answers.AnswerDate))
In [ ]:
joined_timedelta.collect()
In [ ]:
joined_timedelta.select(min(joined_timedelta.TimeDelta)).collect()
In [ ]:
joined_timedelta.where(joined_timedelta.TimeDelta == -110165685).collect()
In [ ]:
posts.where(posts.Id == 124794).collect()
In [ ]:
In [ ]:
# if a post's history contains 37 or 38, it is a merged post
# (37 is the merge source, 38 the destination; is 38 the older one?)
# duplicates have to be removed for the time computations,
# but don't the original posts then have no answers?
In [ ]:
post_links.where(post_links.RelatedPostId == 124794).collect()
In [ ]:
post_links.where(post_links.PostId == 43830).collect()
In [ ]:
answers.where(answers.Id == 194).collect()