In [2]:
# Reload imported modules automatically before each cell runs, so edits to the
# project sources are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2
# Select matplotlib's interactive "notebook" backend for the figures below.
%matplotlib notebook
import sys
# Make packages three directories above the working directory importable.
# Presumably this is the repository root holding `netanalysis` - verify.
sys.path.append("../../..")
In [3]:
import asyncio
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import ipaddress
import os.path
import matplotlib
import networkx as nx
# Default size (width, height) for every matplotlib figure created afterwards.
matplotlib.rcParams['figure.figsize'] = (9.5, 8)
# Run a dedicated asyncio event loop on an executor worker thread. Later cells
# submit coroutines to LOOP with asyncio.run_coroutine_threadsafe and wait for
# the result.
# NOTE(review): every re-run of this cell creates a new executor and loop
# without stopping the previous ones - confirm that is acceptable.
EXECUTOR = ThreadPoolExecutor()
LOOP = asyncio.new_event_loop()
EXECUTOR.submit(LOOP.run_forever)
Out[3]:
In [37]:
from netanalysis.dns.analysis.analysis_app import AnalysisApp
# The `ooni_data` directory is resolved three levels above the working directory.
ooni_data_dir = os.path.join(os.getcwd(), "..", "..", "..", "ooni_data")
app = AnalysisApp(ooni_data_dir)
In [38]:
# Per-domain view of the analysis for the domain under study.
domain_app = app.domain_app("voluntariosxvenezuela.com")

# Nodes of the domain's graph, listed in sorted order.
graph_nodes = sorted(domain_app.domain_graph.nodes)
print("Graph nodes: %s" % ", ".join(graph_nodes))

# Everything reachable from the domain when the edges of the full DNS graph
# are followed backwards, excluding the domain itself.
reversed_dns_graph = app.dns_graph.reverse(copy=False)
reachable = set(nx.dfs_preorder_nodes(reversed_dns_graph, domain_app.domain))
print("Parent domains: %s" % (reachable - {domain_app.domain}))
In [39]:
# Show the graph for the selected domain.
domain_app.display_graph()
In [40]:
# Hand `tls_verify_unknowns()` to the background event loop and block this
# cell until the coroutine has finished (or raised).
tls_future = asyncio.run_coroutine_threadsafe(domain_app.tls_verify_unknowns(), LOOP)
tls_future.result()
# Show the graph again now that the verification step has run.
domain_app.display_graph()
In [41]:
from netanalysis.dns.analysis import classifier as dc
def analyze_measurements(domain: str, domain_graph, class_graph, results=None):
    """Print one summary line per untrusted DNS measurement of ``domain``.

    Measurements are read from the ``"measurement"`` attribute of the edges of
    ``domain_graph``, de-duplicated by ``measurement_id`` and reported in
    order of their ``time``. A measurement is skipped when it has a truthy
    ``trust_reason``, when it has no records, or when its first record is for
    a name other than ``domain``.

    Each remaining measurement is labelled with the first non-UNKNOWN
    evaluation stored on the ``class_graph`` edges between ``domain`` and the
    /24 networks of its answer IPs; without such an evaluation it is reported
    as UNKNOWN.

    Args:
        domain: Domain name to match. It is compared against the lower-cased
            record name, so it is presumably expected in lower case.
        domain_graph: Object whose ``edges(data=True)`` yields
            ``(u, v, data)`` triples with a dict-like ``data``.
        class_graph: Nested mapping where ``class_graph[domain][net]["eval"]``
            is an evaluation exposing a ``classification`` attribute.
        results: Optional collection of classifications to print. When it is
            ``None`` or empty, every measurement is printed.

    Returns:
        None. The report is written to standard output.
    """
    measurements = dict()  # type: Dict[str, dns.DnsMeasurement]
    for _, _, data in domain_graph.edges(data=True):
        measurement = data.get("measurement")
        if not measurement or measurement.trust_reason:
            continue
        records = measurement.records
        # A measurement without records cannot be matched to the domain;
        # skipping it here avoids an IndexError on records[0].
        if not records or records[0].name.lower() != domain:
            continue
        measurements[measurement.measurement_id] = measurement

    for measurement in sorted(measurements.values(), key=lambda m: m.time):
        measurement_eval = dc.Evaluation(dc.EdgeClass.UNKNOWN, None)
        ips = []
        for record in measurement.records:
            # Only records whose data carries an `ip` attribute contribute.
            try:
                ips.append(record.data.ip)
            except AttributeError:
                continue
        for ip in ips:
            # NOTE(review): the /24 prefix is applied to every address;
            # confirm it matches how class_graph keys IPv6 networks.
            net = str(ipaddress.ip_network(ip).supernet(new_prefix=24))
            evaluation = class_graph[domain][net]["eval"]
            if evaluation.classification != dc.EdgeClass.UNKNOWN:
                # The first conclusive evaluation decides the measurement.
                measurement_eval = evaluation
                break
        if not results or measurement_eval.classification in results:
            print("{} {} {}, IPs: [{}] (DNS: {})".format(
                measurement.time,
                measurement.client_country,
                measurement_eval.classification.name,
                ", ".join(str(ip) for ip in ips),
                measurement.resolver_ip
            ))
# Report only the measurements whose classification is UNKNOWN or BAD.
analyze_measurements(
    domain=domain_app.domain,
    domain_graph=domain_app.domain_graph,
    class_graph=domain_app.classifier.class_graph,
    results=[dc.EdgeClass.UNKNOWN, dc.EdgeClass.BAD],
)
In [42]:
# Show the graph again, this time passing country="VE" to display_graph.
# Presumably this limits the view to data for that country code - confirm.
domain_app.display_graph(country="VE")
In [31]:
from IPython.display import display
import ipywidgets as widgets
from netanalysis.dns.analysis import ip_info_widget
from netanalysis.ip import ip_info as ii
# Build the IP info service and the widget that is created from it.
ip_info = ii.create_default_ip_info_service()
ip_tab = ip_info_widget.create_ip_info_widget(ip_info)
tabs = widgets.Tab()
# Attach the children before titling them: ipywidgets 8 keeps one title per
# child, so set_title(0, ...) on a Tab without children raises IndexError.
tabs.children = [ip_tab]
tabs.set_title(0, "IP Info")
# Last expression of the cell, so the tab container is rendered as its output.
tabs
In [ ]: