ihr/disco_events.py (10 changes: 5 additions & 5 deletions)
@@ -19,9 +19,9 @@ def worker_task(resp, *args, **kwargs):

 class Disconnect():
     def __init__(self, start=None, end=None, streamnames=None, af=4, session=None,
-            cache=True, cache_dir="cache/",
-            url='https://ihr.iijlab.net/ihr/api/disco/events/',
-            nb_threads=2):
+                 cache=True, cache_dir="cache/",
+                 url='https://ihr.iijlab.net/ihr/api/disco/events/',
+                 nb_threads=2):
         """
         :originasn: Origin ASN of interest. It can be a list of ASNs or a single
         int value. Set to 0 for global hegemony.
@@ -149,7 +149,7 @@ def get_results(self):

         if self.cache and len(all_results) > 0 and len(all_results[0]):
             logging.info("caching results to disk")
-            json.dump(all_results, open(cache_fname, "w"))
+            json.dump(all_results, open(cache_fname, "w"), indent=4)  # indent=4 makes the cache file human-readable
 
 
 if __name__ == "__main__":
@@ -159,4 +159,4 @@ def get_results(self):
     res = Disconnect(streamnames='MX', start="2017-03-02T14:28:07", end="2017-03-03T14:28:07").get_results()
 
     for r in res:
-        print(r)
+        print(json.dumps(r[0], indent=4))  # pretty-print the first event of each page
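
Reviewer note: with `indent=4` the disco cache files become human-readable JSON. A minimal sketch of reading one back, assuming a cache file already exists (the file name below is hypothetical; `get_results()` derives the real one from the query parameters):

```python
import json

# Hypothetical cache file name; the real one is built by get_results().
cache_fname = "cache/disco_streamnamesMX_af4.json"

with open(cache_fname) as f:
    pages = json.load(f)  # list of result pages, each a list of event dicts

for page in pages:
    for event in page:
        print(event)
```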
ihr/hegemony.py (164 changes: 74 additions & 90 deletions)
@@ -3,7 +3,6 @@
 import ujson as json
 import math
 from simplejson.errors import JSONDecodeError
-from collections import defaultdict
 import arrow
 from requests_futures.sessions import FuturesSession

@@ -17,12 +16,12 @@ def worker_task(resp, *args, **kwargs):
         resp.data = {}
 
 
-class Hegemony():
+class Hegemony:
 
-    def __init__(self, start, end, originasns=None, asns=None, af=4, session=None,
-            cache=True, cache_dir="cache/",
-            url='https://ihr.iijlab.net/ihr/api/hegemony/',
-            nb_threads=2):
+    def __init__(self, start, end, originasns=None, asns=None, af=4, session=None,
+                 cache=True, cache_dir="cache/",
+                 url='https://ihr.iijlab.net/ihr/api/hegemony/',
+                 nb_threads=2):
         """
         :originasn: Origin ASN of interest. It can be a list of ASNs or a single
         int value. Set to 0 for global hegemony.
@@ -34,23 +33,21 @@ def __init__(self, start, end, originasns=None, asns=None, af=4, session=None,
         :session: Requests session to use
         :page: Page number for paginated results.
         :cache: Set to False to ignore cache
-
         :cache_dir: Directory used for cached results.
         :url: API root url
         :nb_threads: Maximum number of parallel downloads
 
         Notes: By default results are cached on disk.
         """
+        if isinstance(asns, int):
+            asns = [asns]
+        elif asns is None:
+            asns = []
 
         if isinstance(originasns, int):
             originasns = [originasns]
         elif originasns is None:
-            originasns = [None]
+            originasns = []
 
-        if isinstance(asns, int):
-            asns = [asns]
-        elif asns is None:
-            asns = [None]
-
         self.originasns = set(originasns)
         self.asns = set(asns)
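
Reviewer note: after this change both parameters accept an int, a list, or None, and None now normalizes to an empty collection ("no filter") instead of `[None]`. A standalone sketch of the same normalization for quick experimentation (`normalize_asns` is not part of the library, just a mirror of the constructor logic):

```python
def normalize_asns(value):
    """Mirrors the constructor's ASN normalization (sketch, not the library API)."""
    if isinstance(value, int):
        value = [value]  # a single ASN becomes a one-element list
    elif value is None:
        value = []       # no filter
    return set(value)

assert normalize_asns(2907) == {2907}
assert normalize_asns([2907, 7922]) == {2907, 7922}
assert normalize_asns(None) == set()
```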
@@ -64,16 +61,13 @@ def __init__(self, start, end, originasns=None, asns=None, af=4, session=None,
         else:
             self.session = session
 
-
         self.url = url
         self.cache_dir = cache_dir
         if not os.path.exists(cache_dir):
             os.mkdir(cache_dir)
         self.params = {}
-        self.queries = defaultdict(list)
-
 
-    def query_api(self, originasn, asn, page):
+    def query_api(self, originasns, asns, page):
         """Single API query. Don't call this method, use get_results instead."""
 
         params = dict(
@@ -83,100 +77,90 @@ def query_api(self, originasn, asn, page):
             page=page,
             format="json"
         )
 
-        if asn is not None:
-            params["asn"] = asn
+        # Add the asn and originasn filters only when they are provided;
+        # multiple values are sent to the API as comma-separated strings.
+        if asns:
+            params["asn"] = ",".join(map(str, asns))
 
-        if originasn is not None:
-            params["originasn"] = originasn
+        if originasns:
+            params["originasn"] = ",".join(map(str, originasns))
 
-        if originasn is None and asn is None:
+        if not originasns and not asns:
             logging.error("You should give at least an origin ASN or an ASN.")
             return None
 
-        logging.info("query results for {}, page={}".format((originasn, asn), page))
+        self.params = params
+        logging.info("query results for {}, page={}".format((originasns, asns), page))
 
+        # Send the request asynchronously through FuturesSession; worker_task
+        # runs as a response hook and parses the JSON in the background.
         return self.session.get(
             url=self.url, params=params,
             hooks={'response': worker_task, }
         )
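
Reviewer note: to see what the batched request looks like on the wire, here is a sketch that previews the final URL with plain requests (the dates and ASNs are illustrative; the comma-separated encoding matches what query_api now builds):

```python
import requests

params = dict(
    timebin__gte="2019-09-15T00:00:00",
    timebin__lte="2019-09-16T00:00:00",
    af=4,
    page=1,
    format="json",
    asn=",".join(map(str, [2497, 2500])),        # -> "2497,2500"
    originasn=",".join(map(str, [2907, 7922])),  # -> "2907,7922"
)

url = "https://ihr.iijlab.net/ihr/api/hegemony/"
print(requests.Request("GET", url, params=params).prepare().url)
```

One request now covers every (originasn, asn) pair, instead of one request per pair as before.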





     def get_results(self):
-        """Fetch AS dependencies (aka AS hegemony) results.
-
-        Return AS dependencies for the given origin AS between the start and
-        end dates.
-
-        :returns: Dictionary of AS dependencies.
-
-        """
-
-        # Main loop
-        queries = {}
-
-        # Query the API
-        for originasn in self.originasns:
-            for asn in self.asns:
-                # Skip the query if we have the corresponding cache
-                cache_fname = "{}/hege_originasn{}_start{}_end{}_asn{}_af{}.json".format(
-                    self.cache_dir, originasn, self.start, self.end, asn, self.af)
-                if self.cache and os.path.exists(cache_fname):
-                    continue
-
-                queries[(originasn, asn)] = self.query_api(originasn, asn, 1)
-
-        # Fetch the results
-        for originasn in self.originasns:
-            for asn in self.asns:
-                cache_fname = "{}/hege_originasn{}_start{}_end{}_asn{}_af{}.json".format(
-                    self.cache_dir, originasn, self.start, self.end, asn, self.af)
-
-                if self.cache and os.path.exists(cache_fname):
-                    # get results from cache
-                    logging.info("Get results from cache")
-                    for res in json.load(open(cache_fname, "r")):
-                        yield res
-
-                else:
-                    # fetch results
-                    all_results = []
-                    resp = queries[(originasn, asn)].result()
-                    logging.info("got results for {}".format((originasn, asn)))
-
-                    if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0:
-                        yield resp.data["results"]
-                        all_results.append(resp.data["results"])
-                    else:
-                        logging.warning("No hegemony results for origin AS={}, AS={}".format(originasn, asn))
-
-                    # if results are incomplete get the other pages
-                    if resp.data.get("next"):
-                        nb_pages = math.ceil(resp.data["count"]/len(resp.data["results"]))
-                        pages_queries = []
-                        logging.info("{} more pages to query".format(nb_pages))
-                        for p in range(2, int(nb_pages)+1):
-                            pages_queries.append(self.query_api(originasn, asn, p))
-
-                        for i, page_resp in enumerate(pages_queries):
-                            resp = page_resp.result()
-                            if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0:
-                                yield resp.data["results"]
-                                all_results.append(resp.data["results"])
-                            else:
-                                logging.warning("No hegemony results for origin AS={}, AS={}, page={}".format(originasn, asn, i+2))
-
-                    if self.cache and len(all_results) > 0 and len(all_results[0]):
-                        logging.info("caching results to disk")
-                        json.dump(all_results, open(cache_fname, "w"))
+        """Fetch AS dependencies results."""
+        # Build the cache file name from all query parameters; skip the API
+        # query entirely if we already have the corresponding cache.
+        cache_fname = "{}/hege_originasns{}_start{}_end{}_asns{}_af{}.json".format(
+            self.cache_dir, "_".join(map(str, self.originasns)), self.start, self.end,
+            "_".join(map(str, self.asns)), self.af
+        )
+        if self.cache and os.path.exists(cache_fname):
+            # get results from cache
+            logging.info("Get results from cache")
+            with open(cache_fname, "r") as cache_file:
+                for res in json.load(cache_file):
+                    yield res
+        else:
+            # query the API and yield the first page of results
+            all_results = []
+            resp = self.query_api(self.originasns, self.asns, 1).result()
+            if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0:
+                yield resp.data["results"]
+                all_results.append(resp.data["results"])
+            else:
+                logging.warning("No hegemony results for origin AS={}, AS={}".format(self.originasns, self.asns))
+
+            # if results are incomplete get the other pages
+            if resp.data.get("next"):
+                nb_pages = math.ceil(resp.data["count"]/len(resp.data["results"]))
+                pages_queries = []
+                logging.info("{} more pages to query".format(nb_pages))
+                for p in range(2, int(nb_pages)+1):
+                    pages_queries.append(self.query_api(self.originasns, self.asns, p))
+
+                for i, page_resp in enumerate(pages_queries):
+                    resp = page_resp.result()
+                    if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0:
+                        yield resp.data["results"]
+                        all_results.append(resp.data["results"])
+                    else:
+                        logging.warning("No hegemony results for origin AS={}, AS={}, page={}".format(self.originasns, self.asns, i+2))
+
+            # cache everything we fetched so the next identical query is local
+            if self.cache and len(all_results) > 0 and len(all_results[0]):
+                logging.info("caching results to disk")
+                json.dump(all_results, open(cache_fname, "w"), indent=4)  # indent=4 makes the cache file human-readable
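
Reviewer note: the page count is inferred from the first response, where `resp.data["count"]` is the total number of results and the first page's length is taken as the page size. A worked example, assuming all pages except the last are full:

```python
import math

count = 230      # resp.data["count"] (illustrative)
page_size = 100  # len(resp.data["results"]) on the first page

nb_pages = math.ceil(count / page_size)
print(nb_pages)                           # 3
print(list(range(2, int(nb_pages) + 1)))  # remaining pages to fetch: [2, 3]
```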


 if __name__ == "__main__":
     FORMAT = '%(asctime)s %(processName)s %(message)s'
     logging.basicConfig(format=FORMAT, filename="hegemony.log", level=logging.INFO,
                         datefmt='%Y-%m-%d %H:%M:%S')
     res = Hegemony(
-        originasns=[2907, 7922], start="2018-09-15", end="2018-09-16"
+        originasns=[2907, 7922], start="2019-09-15", end="2019-09-16"
     ).get_results()
 
     for r in res:
-        print(r)
+        print(json.dumps(r, indent=4))  # pretty-print each page of results
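
Reviewer note: get_results() yields one list per API page rather than individual entries. Callers that want a flat stream can chain the pages; a minimal sketch (the "asn" and "hege" field names are taken from the hegemony API response and assumed unchanged by this PR):

```python
from itertools import chain

from ihr.hegemony import Hegemony

res = Hegemony(originasns=[2907, 7922], start="2019-09-15", end="2019-09-16").get_results()

# Flatten the per-page lists into one iterator of result dicts.
for entry in chain.from_iterable(res):
    print(entry["asn"], entry["hege"])
```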


