From 2ae18dcf6e757d60d977f8a1b24ff86cbf574133 Mon Sep 17 00:00:00 2001
From: Rishi Mondal <146999057+MAVRICK-1@users.noreply.github.com>
Date: Thu, 1 Feb 2024 05:22:04 +0530
Subject: [PATCH 1/5] Reduce the number of queries

---
 ihr/hegemony.py   | 179 +++++++++++++++++++---------------------------
 ihr/link_delay.py | 108 ++++++++++++----------------
 2 files changed, 122 insertions(+), 165 deletions(-)

diff --git a/ihr/hegemony.py b/ihr/hegemony.py
index afbe5a2..72cb69a 100644
--- a/ihr/hegemony.py
+++ b/ihr/hegemony.py
@@ -3,7 +3,6 @@
 import ujson as json
 import math
 from simplejson.errors import JSONDecodeError
-from collections import defaultdict
 
 import arrow
 from requests_futures.sessions import FuturesSession
@@ -17,40 +16,25 @@ def worker_task(resp, *args, **kwargs):
         resp.data = {}
 
 
-class Hegemony():
+class Hegemony:
 
-    def __init__(self, start, end, originasns=None, asns=None, af=4, session=None,
-            cache=True, cache_dir="cache/",
-            url='https://ihr.iijlab.net/ihr/api/hegemony/',
-            nb_threads=2):
+    def __init__(self, start, end, originasns=None, asns=None, af=4, session=None,
+                 cache=True, cache_dir="cache/",
+                 url='https://ihr.iijlab.net/ihr/api/hegemony/',
+                 nb_threads=2):
         """
-        :originasn: Origin ASN of interest. It can be a list of ASNs or a single
-        int value. Set to 0 for global hegemony.
-        :start: Start date/time.
-        :end: End date/time.
-        :asn: Return dependency only to the given ASNs. By default return all
-        dependencies.
-        :af: Adress family, default is IPv4
-        :session: Requests session to use
-        :page: Page number for paginated results.
-        :cache: Set to False to ignore cache
-
-        :cache_dir: Directory used for cached results.
-        :url: API root url
-        :nb_threads: Maximum number of parallel downloads
-
-        Notes: By default results are cached on disk.
+        Initialize Hegemony object with parameters.
         """
+        if isinstance(asns, int):
+            asns = [asns]
+        elif asns is None:
+            asns = []
 
         if isinstance(originasns, int):
             originasns = [originasns]
         elif originasns is None:
-            originasns = [None]
+            originasns = []
 
-        if isinstance(asns, int):
-            asns = [asns]
-        elif asns is None:
-            asns = [None]
 
         self.originasns = set(originasns)
         self.asns = set(asns)
@@ -64,16 +48,14 @@ def __init__(self, start, end, originasns=None, asns=None, af=4, session=None,
         else:
             self.session = session
 
         self.url = url
         self.cache_dir = cache_dir
         if not os.path.exists(cache_dir):
            os.mkdir(cache_dir)
-        self.params = {}
-        self.queries = defaultdict(list)
 
-    def query_api(self, originasn, asn, page):
+    def query_api(self, originasns, asns, page):
        """Single API query. Don't call this method, use get_results instead."""
 
         params = dict(
@@ -83,91 +64,78 @@ def query_api(self, originasn, asn, page):
             page=page,
             format="json"
         )
+
+        # Add the asn / originasn filters only when values were provided.
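+        # e.g. asns=[2907, 7922] is sent as asn="2907,7922", so a single
+        # request now covers all requested ASNs instead of one per ASN.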
-        if asn is not None:
-            params["asn"]=asn
+        if asns:
+            params["asn"] = ",".join(map(str, asns))
 
-        if originasn is not None:
-            params["originasn"]=originasn
+        if originasns:
+            params["originasn"] = ",".join(map(str, originasns))
 
-        if originasn is None and asn is None:
+        # Both sets were normalized to (possibly empty) collections in
+        # __init__, so test for emptiness rather than None.
+        if not originasns and not asns:
             logging.error("You should give at least a origin ASN or an ASN.")
             return None
 
         self.params = params
-        logging.info("query results for {}, page={}".format((originasn,asn), page))
+        logging.info("query results for {}, page={}".format((originasns, asns), page))
+
+        # The request is sent asynchronously; worker_task runs as a response
+        # hook and parses the JSON body into resp.data in the background.
         return self.session.get(
             url=self.url, params=params,
             hooks={'response': worker_task, }
         )
 
     def get_results(self):
-        """Fetch AS dependencies (aka AS hegemony) results.
-
-        Return AS dependencies for the given origin AS between the start and
-        end dates.
-
-        :returns: Dictionary of AS dependencies.
-
-        """
-
-        # Main loop
-        queries = {}
-
-        # Query the API
-        for originasn in self.originasns:
-            for asn in self.asns:
-                # Skip the query if we have the corresponding cache
-                cache_fname = "{}/hege_originasn{}_start{}_end{}_asn{}_af{}.json".format(
-                    self.cache_dir, originasn, self.start, self.end, asn, self.af)
-                if self.cache and os.path.exists(cache_fname):
-                    continue
-
-                queries[(originasn, asn)] = self.query_api(originasn, asn, 1)
-
-        # Fetch the results
-        for originasn in self.originasns:
-            for asn in self.asns:
-                cache_fname = "{}/hege_originasn{}_start{}_end{}_asn{}_af{}.json".format(
-                    self.cache_dir, originasn, self.start, self.end, asn, self.af)
-
-                if self.cache and os.path.exists(cache_fname):
-                    # get results from cache
-                    logging.info("Get results from cache")
-                    for res in json.load(open(cache_fname, "r")):
-                        yield res
-
-                else:
-                    # fetch results
-                    all_results = []
-                    resp = queries[(originasn,asn)].result()
-                    logging.info("got results for {}".format((originasn,asn)))
+        """Fetch AS dependencies (aka AS hegemony) results.
+
+        Yields one list of results per API page, covering all requested
+        origin ASNs and ASNs with a single batched query.
+        """
+        # Build a cache file name that encodes every query parameter, and
+        # skip the API query when that file already exists.
+        cache_fname = "{}/hege_originasns{}_start{}_end{}_asns{}_af{}.json".format(
+            self.cache_dir, "_".join(map(str, self.originasns)), self.start, self.end,
+            "_".join(map(str, self.asns)), self.af
+        )
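+        # e.g. cache/hege_originasns2907_7922_start2019-09-15_end2019-09-16_asns_af4.json
+        # (the asns part stays empty when no dependency filter was given)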
+        if self.cache and os.path.exists(cache_fname):
+            # get results from cache
+            logging.info("Get results from cache")
+            with open(cache_fname, "r") as cache_file:
+                for res in json.load(cache_file):
+                    yield res
+        else:
+            # No cache hit: one batched query covers every requested ASN;
+            # each page is yielded to the caller and kept for caching.
+            all_results = []
+            resp = self.query_api(self.originasns, self.asns, 1).result()
+            if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0:
+                yield resp.data["results"]
+                all_results.append(resp.data["results"])
+            else:
+                logging.warning("No hegemony results for origin AS={}, AS={}".format(self.originasns, self.asns))
+
+            # if results are incomplete get the other pages
+            if resp.data.get("next"):
+                nb_pages = math.ceil(resp.data["count"] / len(resp.data["results"]))
+                pages_queries = []
+                logging.info("{} more pages to query".format(nb_pages))
+                for p in range(2, int(nb_pages) + 1):
+                    pages_queries.append(self.query_api(self.originasns, self.asns, p))
+
+                for i, page_resp in enumerate(pages_queries):
+                    resp = page_resp.result()
                     if resp.ok and "results" in resp.data and len(resp.data["results"])>0:
                         yield resp.data["results"]
                         all_results.append(resp.data["results"])
                     else:
-                        logging.warning("No hegemony results for origin AS={}, AS={}".format(originasn, asn))
-
-                    # if results are incomplete get the other pages
-                    if resp.data.get("next") :
-                        nb_pages = math.ceil(resp.data["count"]/len(resp.data["results"]))
-                        pages_queries = []
-                        logging.info("{} more pages to query".format(nb_pages))
-                        for p in range(2,int(nb_pages)+1):
-                            pages_queries.append(self.query_api(originasn, asn, p))
-
-                    for i, page_resp in enumerate(pages_queries):
-                        resp= page_resp.result()
-                        if resp.ok and "results" in resp.data and len(resp.data["results"])>0:
-                            yield resp.data["results"]
-                            all_results.append(resp.data["results"])
-                        else:
-                            logging.warning("No hegemony results for origin AS={}, AS={}, page={}".format(originasn, asn, i+2))
-
-                    if self.cache and len(all_results)>0 and len(all_results[0]) :
-                        logging.info("caching results to disk")
-                        json.dump(all_results, open(cache_fname, "w"))
+                        logging.warning("No hegemony results for origin AS={}, AS={}, page={}".format(self.originasns, self.asns, i+2))
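+            # Note: the page queries above run concurrently on the session's
+            # thread pool (nb_threads workers); result() blocks until each
+            # response arrives, so pages are still yielded in order.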
+ ''' + if self.cache and len(all_results)>0 and len(all_results[0]) : + logging.info("caching results to disk") + json.dump(all_results, open(cache_fname, "w"),indent=4) #added indentation if __name__ == "__main__": @@ -175,8 +143,11 @@ def get_results(self): logging.basicConfig(format=FORMAT, filename="hegemony.log", level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') res = Hegemony( - originasns=[2907, 7922], start="2018-09-15", end="2018-09-16" + originasns=[2907, 7922], start="2019-09-15", end="2019-09-16" ).get_results() - + json_data=" " for r in res: - print(r) + print(json.dumps(r,indent=4)) #added indentation + + + diff --git a/ihr/link_delay.py b/ihr/link_delay.py index ef3d975..b23ecaa 100644 --- a/ihr/link_delay.py +++ b/ihr/link_delay.py @@ -4,7 +4,7 @@ import logging import ujson as json import math -from collections import defaultdict + from requests_futures.sessions import FuturesSession @@ -19,9 +19,9 @@ def worker_task(resp, *args, **kwargs): class Delay(): def __init__(self, start, end, asns=None, af=4, session=None, - cache=True, cache_dir="cache/", - url='https://ihr.iijlab.net/ihr/api/link/delay/', - nb_threads=2): + cache=True, cache_dir="cache/", + url='https://ihr.iijlab.net/ihr/api/link/delay/', + nb_threads=2): """ :originasn: Origin ASN of interest. It can be a list of ASNs or a single int value. Set to 0 for global hegemony. @@ -63,9 +63,8 @@ def __init__(self, start, end, asns=None, af=4, session=None, if not os.path.exists(cache_dir): os.mkdir(cache_dir) self.params = {} - self.queries = defaultdict(list) - def query_api(self, asn, page): + def query_api(self, asns, page): """Single API query. Don't call this method, use get_results instead.""" params = dict( @@ -76,13 +75,13 @@ def query_api(self, asn, page): format="json" ) - if asn is not None: - params["asn"] = asn + if asns: + params["asn"] = ",".join(map(str, asns)) else: logging.error("You should give an ASN.") return None - logging.info("query results for {}, page={}".format(asn, page)) + logging.info("query results for {}, page={}".format(asns, page)) self.params = params return self.session.get( url=self.url, params=params, @@ -99,60 +98,47 @@ def get_results(self): """ - # Main loop - queries = {} - - # Query the API - for asn in self.asns: - # Skip the query if we have the corresponding cache - cache_fname = "{}/dalay_start{}_end{}_asn{}_af{}.json".format( - self.cache_dir, self.start, self.end, asn, self.af) - if self.cache and os.path.exists(cache_fname): - continue - queries[asn] = self.query_api(asn, 1) + # Skip the query if we have the corresponding cache + cache_fname = "{}/dalay_start{}_end{}_asns{}_af{}.json".format( + self.cache_dir, self.start, self.end, "_".join(map(str, self.asns)), self.af) + + if self.cache and os.path.exists(cache_fname): + # get results from cache + logging.info("Get results from cache") + with open(cache_fname, "r") as cache_file: + for res in json.load(cache_file): + yield res - # Fetch the results + else: + # fetch results + all_results = [] + resp = self.query_api(self.asns, 1).result() + logging.info("got results for {}".format(self.asns)) + if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0: + yield resp.data["results"] + all_results.append(resp.data["results"]) + else: + logging.warning("No Delay results for {}".format(self.params)) - for asn in self.asns: - cache_fname = "{}/dalay_start{}_end{}_asn{}_af{}.json".format( - self.cache_dir, self.start, self.end, asn, self.af) + # if results are incomplete get the other pages + if 
+            if resp.data.get("next"):
+                nb_pages = math.ceil(resp.data["count"] / len(resp.data["results"]))
+                pages_queries = []
+                logging.info("{} more pages to query".format(nb_pages))
+                for p in range(2, int(nb_pages) + 1):
+                    pages_queries.append(self.query_api(self.asns, p))
 
-            if self.cache and os.path.exists(cache_fname):
-                # get results from cache
-                logging.info("Get results from cache")
-                for res in json.load(open(cache_fname, "r")):
-                    yield res
+                for i, page_resp in enumerate(pages_queries):
+                    resp = page_resp.result()
+                    if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0:
+                        yield resp.data["results"]
+                        all_results.append(resp.data["results"])
+                    else:
+                        logging.warning("No delay results for {}, page={}".format(self.params, i + 2))
 
-            else:
-                # fetch results
-                all_results = []
-                resp = queries[asn].result()
-                logging.info("got results for {}".format(asn))
-                if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0:
-                    yield resp.data["results"]
-                    all_results.append(resp.data["results"])
-                else:
-                    logging.warning("No Delay results for {}".format(self.params))
-
-                # if results are incomplete get the other pages
-                if resp.data.get("next"):
-                    nb_pages = math.ceil(resp.data["count"] / len(resp.data["results"]))
-                    pages_queries = []
-                    logging.info("{} more pages to query".format(nb_pages))
-                    for p in range(2, int(nb_pages + 1)):
-                        pages_queries.append(self.query_api(asn, p))
-
-                    for i, page_resp in enumerate(pages_queries):
-                        resp = page_resp.result()
-                        if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0:
-                            yield resp.data["results"]
-                            all_results.append(resp.data["results"])
-                        else:
-                            logging.warning("No hegemony results for {}, page={}".format(self.params, i + 2))
-
-                    if self.cache and len(all_results) > 0 and len(all_results[0]):
-                        logging.info("caching results to disk")
-                        json.dump(all_results, open(cache_fname, "w"))
+            if self.cache and len(all_results) > 0 and len(all_results[0]):
+                logging.info("caching results to disk")
+                json.dump(all_results, open(cache_fname, "w"), indent=4)
 
 
 if __name__ == "__main__":
@@ -160,8 +146,8 @@ def get_results(self):
     logging.basicConfig(format=FORMAT, filename="delay.log",
             level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')
     res = Delay(
-        asns=[2907, 7922], start="2018-09-16", end="2018-10-16"
+        asns=[7922, 2907], start="2019-09-15", end="2019-09-16"
     ).get_results()
 
     for r in res:
-        print(r[0])
+        print(json.dumps(r, indent=4))

From 12779f427e8612b264c4538fd1dffcd03fff27b3 Mon Sep 17 00:00:00 2001
From: Rishi Mondal <146999057+MAVRICK-1@users.noreply.github.com>
Date: Thu, 1 Feb 2024 05:29:02 +0530
Subject: [PATCH 2/5] Indent JSON output for readability

---
 ihr/disco_events.py    | 10 +++++-----
 ihr/link_delay.py      |  2 +-
 ihr/link_forwarding.py | 10 +++++-----
 3 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/ihr/disco_events.py b/ihr/disco_events.py
index c72937d..402a583 100644
--- a/ihr/disco_events.py
+++ b/ihr/disco_events.py
@@ -19,9 +19,9 @@ def worker_task(resp, *args, **kwargs):
 class Disconnect():
 
     def __init__(self, start=None, end=None, streamnames=None, af=4, session=None,
-            cache=True, cache_dir="cache/",
-            url='https://ihr.iijlab.net/ihr/api/disco/events/',
-            nb_threads=2):
+                 cache=True, cache_dir="cache/",
+                 url='https://ihr.iijlab.net/ihr/api/disco/events/',
+                 nb_threads=2):
         """
         :originasn: Origin ASN of interest. It can be a list of ASNs or a single
         int value. Set to 0 for global hegemony.
@@ -149,7 +149,7 @@ def get_results(self):
 
                     if self.cache and len(all_results) > 0 and len(all_results[0]):
                         logging.info("caching results to disk")
-                        json.dump(all_results, open(cache_fname, "w"))
+                        json.dump(all_results, open(cache_fname, "w"), indent=4)
 
 
 if __name__ == "__main__":
@@ -159,4 +159,4 @@ def get_results(self):
     res = Disconnect(streamnames='MX', start="2017-03-02T14:28:07",
             end="2017-03-03T14:28:07").get_results()
     for r in res:
-        print(r)
+        print(json.dumps(r[0], indent=4))
diff --git a/ihr/link_delay.py b/ihr/link_delay.py
index b23ecaa..2fe19fa 100644
--- a/ihr/link_delay.py
+++ b/ihr/link_delay.py
@@ -150,4 +150,4 @@ def get_results(self):
     ).get_results()
 
     for r in res:
-        print(json.dumps(r, indent=4))
+        print(json.dumps(r[0], indent=4))
diff --git a/ihr/link_forwarding.py b/ihr/link_forwarding.py
index dbba281..5b4843f 100644
--- a/ihr/link_forwarding.py
+++ b/ihr/link_forwarding.py
@@ -19,9 +19,9 @@ def worker_task(resp, *args, **kwargs):
 class Forwarding():
 
     def __init__(self, start, end, asns=None, af=4, session=None,
-            cache=True, cache_dir="cache/",
-            url='https://ihr.iijlab.net/ihr/api/link/forwarding/',
-            nb_threads=2):
+                 cache=True, cache_dir="cache/",
+                 url='https://ihr.iijlab.net/ihr/api/link/forwarding/',
+                 nb_threads=2):
         """
         :originasn: Origin ASN of interest. It can be a list of ASNs or a single
         int value. Set to 0 for global hegemony.
@@ -152,7 +152,7 @@ def get_results(self):
 
         if self.cache and len(all_results) > 0 and len(all_results[0]):
             logging.info("caching results to disk")
-            json.dump(all_results, open(cache_fname, "w"))
+            json.dump(all_results, open(cache_fname, "w"), indent=4)
 
 
 if __name__ == "__main__":
@@ -164,4 +164,4 @@ def get_results(self):
     ).get_results()
 
     for r in res:
-        print(r[0])
+        print(json.dumps(r[0], indent=4))

From 720943b2a1703b0b827201704915b9ef1c60f4a8 Mon Sep 17 00:00:00 2001
From: Rishi Mondal <146999057+MAVRICK-1@users.noreply.github.com>
Date: Thu, 1 Feb 2024 06:01:01 +0530
Subject: [PATCH 3/5] Restore the Hegemony constructor docstring

---
 ihr/hegemony.py | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/ihr/hegemony.py b/ihr/hegemony.py
index 72cb69a..dd35c25 100644
--- a/ihr/hegemony.py
+++ b/ihr/hegemony.py
@@ -23,7 +23,20 @@ def __init__(self, start, end, originasns=None, asns=None, af=4, session=None,
                  url='https://ihr.iijlab.net/ihr/api/hegemony/',
                  nb_threads=2):
         """
-        Initialize Hegemony object with parameters.
+        :originasns: Origin ASNs of interest. It can be a list of ASNs or a
+        single int value. Set to 0 for global hegemony.
+        :start: Start date/time.
+        :end: End date/time.
+        :asns: Return dependencies only to the given ASNs. By default return
+        all dependencies.
+        :af: Address family, default is IPv4
+        :session: Requests session to use
+        :cache: Set to False to ignore cache
+        :cache_dir: Directory used for cached results.
+        :url: API root url
+        :nb_threads: Maximum number of parallel downloads
+
+        Notes: By default results are cached on disk.
""" if isinstance(asns, int): asns = [asns] From ef8731aee1e2cf74a35d94587477a495913491c6 Mon Sep 17 00:00:00 2001 From: Rishi Mondal <146999057+MAVRICK-1@users.noreply.github.com> Date: Fri, 2 Feb 2024 09:01:31 +0530 Subject: [PATCH 4/5] Updated the version Updated the version from 1.2 to 1.3 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index c5f9e7c..a67015b 100755 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ setup( name='abondance', - version='0.1.2', + version='0.1.3', description="Pyhton library for Internet Health Report API", long_description=readme, long_description_content_type="text/markdown", From 08c121c341b8758c98de968b76e6eb4938c01df3 Mon Sep 17 00:00:00 2001 From: Rishi Mondal <146999057+MAVRICK-1@users.noreply.github.com> Date: Fri, 2 Feb 2024 10:19:15 +0530 Subject: [PATCH 5/5] Update link_forwarding.py Reduced the queries --- ihr/link_forwarding.py | 97 ++++++++++++++++++------------------------ 1 file changed, 41 insertions(+), 56 deletions(-) diff --git a/ihr/link_forwarding.py b/ihr/link_forwarding.py index 5b4843f..4f2f153 100644 --- a/ihr/link_forwarding.py +++ b/ihr/link_forwarding.py @@ -65,7 +65,7 @@ def __init__(self, start, end, asns=None, af=4, session=None, self.params = {} self.queries = defaultdict(list) - def query_api(self, asn, page): + def query_api(self, asns, page): """Single API query. Don't call this method, use get_results instead.""" params = dict( @@ -76,13 +76,13 @@ def query_api(self, asn, page): format="json" ) - if asn is not None: - params["asn"] = asn + if asns: + params["asn"] = ",".join(map(str, asns)) else: logging.error("You should give an ASN.") return None - logging.info("query results for {}, page={}".format(asn, page)) + logging.info("query results for {}, page={}".format(asns, page)) self.params = params return self.session.get( url=self.url, params=params, @@ -98,61 +98,46 @@ def get_results(self): :returns: Dictionary of AS dependencies. 
""" + # Skip the query if we have the corresponding cache + cache_fname = "{}/FA_start{}_end{}_asn{}_af{}.json".format( + self.cache_dir, self.start, self.end, "_".join(map(str, self.asns)), self.af) + if self.cache and os.path.exists(cache_fname): + # get results from cache + logging.info("Get results from cache") + with open(cache_fname, "r") as cache_file: + for res in json.load(cache_file): + yield res - # Main loop - queries = {} - - # Query the API - for asn in self.asns: - # Skip the query if we have the corresponding cache - cache_fname = "{}/FA_start{}_end{}_asn{}_af{}.json".format( - self.cache_dir, self.start, self.end, asn, self.af) - if self.cache and os.path.exists(cache_fname): - continue - queries[asn] = self.query_api(asn, 1) - - # Fetch the results + else: + # fetch results + all_results = [] + resp = self.query_api(self.asns, 1).result() + logging.info("got results for {}".format(self.asns)) + if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0: + yield resp.data["results"] + all_results.append(resp.data["results"]) + else: + logging.warning("No Delay results for {}".format(self.params)) - for asn in self.asns: - cache_fname = "{}/FA_start{}_end{}_asn{}_af{}.json".format( - self.cache_dir, self.start, self.end, asn, self.af) + # if results are incomplete get the other pages + if resp.data.get("next"): + nb_pages = math.ceil(resp.data["count"] / len(resp.data["results"])) + pages_queries = [] + logging.info("{} more pages to query".format(nb_pages)) + for p in range(2, int(nb_pages + 1)): + pages_queries.append(self.query_api(self.asns, p)) - if self.cache and os.path.exists(cache_fname): - # get results from cache - logging.info("Get results from cache") - for res in json.load(open(cache_fname, "r")): - yield res + for i, page_resp in enumerate(pages_queries): + resp = page_resp.result() + if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0: + yield resp.data["results"] + all_results.append(resp.data["results"]) + else: + logging.warning("No hegemony results for {}, page={}".format(self.params, i + 2)) - else: - # fetch results - all_results = [] - resp = queries[asn].result() - logging.info("got results for {}".format(asn)) - if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0: - yield resp.data["results"] - all_results.append(resp.data["results"]) - else: - logging.warning("No Delay results for {}".format(self.params)) - - # if results are incomplete get the other pages - if resp.data.get("next"): - nb_pages = math.ceil(resp.data["count"] / len(resp.data["results"])) - pages_queries = [] - logging.info("{} more pages to query".format(nb_pages)) - for p in range(2, int(nb_pages + 1)): - pages_queries.append(self.query_api(asn, p)) - - for i, page_resp in enumerate(pages_queries): - resp = page_resp.result() - if resp.ok and "results" in resp.data and len(resp.data["results"]) > 0: - yield resp.data["results"] - all_results.append(resp.data["results"]) - else: - logging.warning("No hegemony results for {}, page={}".format(self.params, i + 2)) - - if self.cache and len(all_results) > 0 and len(all_results[0]): - logging.info("caching results to disk") - json.dump(all_results, open(cache_fname, "w"),indent=4)#added indentation + if self.cache and len(all_results) > 0 and len(all_results[0]): + logging.info("caching results to disk") + json.dump(all_results, open(cache_fname, "w"),indent=4)#added indentation if __name__ == "__main__": @@ -160,7 +145,7 @@ def get_results(self): logging.basicConfig(format=FORMAT, 
filename="Forwarding_Alarms.log", level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S') res = Forwarding( - asns=[2907, 7922], start="2018-09-15", end="2018-10-16" + asns=[2907, 7922], start="2019-09-15", end="2019-9-16" ).get_results() for r in res: