diff --git a/.gitignore b/.gitignore index 2ae6d3de3..08e22ea94 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ data/* results/* !results/*.png +website venv diff --git a/algos.yaml b/algos.yaml index de1071504..e68ebd0fc 100644 --- a/algos.yaml +++ b/algos.yaml @@ -10,71 +10,100 @@ float: arg-groups: - {"M": 4, "efConstruction": 500} query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] - # M-8: - # arg-groups: - # - {"M": 8, "efConstruction": 500} - # query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] - # M-12: - # arg-groups: - # - {"M": 12, "efConstruction": 500} - # query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] - # M-16: - # arg-groups: - # - {"M": 16, "efConstruction": 500} - # query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] - # M-24: - # arg-groups: - # - {"M": 24, "efConstruction": 500} - # query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] - # M-36: - # arg-groups: - # - {"M": 36, "efConstruction": 500} - # query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] - # M-48: - # arg-groups: - # - {"M": 48, "efConstruction": 500} - # query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] - # M-64: - # arg-groups: - # - {"M": 64, "efConstruction": 500} - # query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] - # M-96: - # arg-groups: - # - {"M": 96, "efConstruction": 500} - # query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-8: + arg-groups: + - {"M": 8, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-12: + arg-groups: + - {"M": 12, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-16: + arg-groups: + - {"M": 16, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-24: + arg-groups: + - {"M": 24, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-36: + arg-groups: + - {"M": 36, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-48: + arg-groups: + - {"M": 48, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-64: + arg-groups: + - {"M": 64, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-96: + arg-groups: + - {"M": 96, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] redisearch-flat: docker-tag: ann-benchmarks-redisearch module: ann_benchmarks.algorithms.redisearch constructor: RediSearch base-args: ["FLAT", "@metric", "@connection"] run-groups: - BS-2^20: - arg-groups: - - {"BLOCK_SIZE": 1048576} - # M-8: - # arg-groups: - # - {"BLOCK_SIZE": 1048576} - # M-12: - # arg-groups: - # - {"BLOCK_SIZE": 1048576} - # M-16: - # arg-groups: - # - {"BLOCK_SIZE": 1048576} - # M-24: - # arg-groups: - # - {"BLOCK_SIZE": 1048576} - # M-36: - # arg-groups: - # - {"BLOCK_SIZE": 1048576} - # M-48: - # arg-groups: - # - {"BLOCK_SIZE": 1048576} - # M-64: - # arg-groups: - # - {"BLOCK_SIZE": 1048576} - # M-96: - # arg-groups: - # - {"BLOCK_SIZE": 1048576} + BS-2^10: + arg-groups: + - {"BLOCK_SIZE": 1024} + + vecsim-hnsw: + docker-tag: ann-benchmarks-vecsim + module: ann_benchmarks.algorithms.vecsim-hnsw + constructor: VecSimHnsw + base-args: ["@metric"] + run-groups: + M-4: + arg-groups: + - {"M": 4, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-8: + arg-groups: + - {"M": 8, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-12: + arg-groups: + - {"M": 12, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 
120, 200, 400, 600, 800]] + M-16: + arg-groups: + - {"M": 16, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-24: + arg-groups: + - {"M": 24, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-36: + arg-groups: + - {"M": 36, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-48: + arg-groups: + - {"M": 48, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-64: + arg-groups: + - {"M": 64, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-96: + arg-groups: + - {"M": 96, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + pinecone: + docker-tag: ann-benchmarks-pinecone + module: ann_benchmarks.algorithms.pinecone + constructor: Pinecone + base-args: ["@metric", "@dimension", "@connection"] + run-groups: + approximated: + args: [['approximated']] + exact: + args: [['exact']] sptag: docker-tag: ann-benchmarks-sptag module: ann_benchmarks.algorithms.sptag @@ -133,10 +162,10 @@ float: base: args: [[400, 1024, 4096, 8192, 16384], [1, 10, 40, 100, 200]] - hnswlib: + vecsim-hnsw-blocks: docker-tag: ann-benchmarks-hnswlib - module: ann_benchmarks.algorithms.hnswlib - constructor: HnswLib + module: ann_benchmarks.algorithms.vecsim-hnsw + constructor: VecSimHnsw base-args: ["@metric"] run-groups: M-4: @@ -240,14 +269,52 @@ float: # This run group produces 3 algorithm instances -- Annoy("angular", # 100), Annoy("angular", 200), and Annoy("angular", 400) -- each of # which will be used to run 12 different queries. - milvus: + milvus-hnsw: docker-tag: ann-benchmarks-milvus module: ann_benchmarks.algorithms.milvus constructor: Milvus - base-args: ["@metric"] + base-args: ["@metric", "@dimension", "@connection", "HNSW"] + run-groups: + M-4: + arg-groups: + - {"M": 4, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-8: + arg-groups: + - {"M": 8, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-12: + arg-groups: + - {"M": 12, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-16: + arg-groups: + - {"M": 16, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-24: + arg-groups: + - {"M": 24, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-36: + arg-groups: + - {"M": 36, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-48: + arg-groups: + - {"M": 48, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-64: + arg-groups: + - {"M": 64, "efConstruction": 500} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + milvus-ivf: + docker-tag: ann-benchmarks-milvus + module: ann_benchmarks.algorithms.milvus + constructor: Milvus + base-args: ["@metric", "@dimension", "@connection"] run-groups: milvus: - args: [['IVF_FLAT', 'IVF_SQ8'], [100, 300, 1000, 3000, 10000, 30000]] # nlist + args: [['IVF_FLAT', 'IVF_SQ8'], [{"nlist": 100}, {"nlist": 300}, {"nlist": 1000}, {"nlist": 3000}, {"nlist": 10000}, {"nlist": 30000}]] # nlist query-args: [[1, 3, 10, 30, 100, 300]] # nprobe (should be <= nlist) nearpy: disabled: true @@ -301,7 +368,7 @@ float: M-96: arg-groups: - {"M": 96, "efConstruction": 500} - query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] bruteforce: disabled: true @@ -648,14 +715,6 @@ float: - 
{"n_neighbors": 60, "diversify_prob": 0.0, "pruning_degree_multiplier":[2.0, 3.0], "leaf_size": 48} query-args: [[0.0, 0.04, 0.08, 0.12, 0.16, 0.20, 0.24, 0.28, 0.32, 0.36]] - elasticsearch: - docker-tag: ann-benchmarks-elasticsearch - module: ann_benchmarks.algorithms.elasticsearch - constructor: ElasticsearchScriptScoreQuery - base-args: [ "@metric", "@dimension" ] - run-groups: - empty: - args: [] elastiknn-l2lsh: docker-tag: ann-benchmarks-elastiknn module: ann_benchmarks.algorithms.elastiknn @@ -950,10 +1009,44 @@ float: docker-tag: ann-benchmarks-elasticsearch module: ann_benchmarks.algorithms.elasticsearch constructor: ElasticsearchScriptScoreQuery - base-args: [ "@metric", "@dimension" ] + base-args: [ "@metric", "@dimension", "@connection" ] run-groups: - empty: - args: [] + M-4: + arg-groups: + - {"m": 4, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-8: + arg-groups: + - {"m": 8, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-12: + arg-groups: + - {"m": 12, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-16: + arg-groups: + - {"m": 16, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-24: + arg-groups: + - {"m": 24, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-36: + arg-groups: + - {"m": 36, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-48: + arg-groups: + - {"m": 48, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-64: + arg-groups: + - {"m": 64, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-96: + arg-groups: + - {"m": 96, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] opensearchknn: docker-tag: ann-benchmarks-opensearchknn module: ann_benchmarks.algorithms.opensearchknn diff --git a/ann_benchmarks/algorithms/definitions.py b/ann_benchmarks/algorithms/definitions.py index 5c4ed31da..d04ceac3b 100644 --- a/ann_benchmarks/algorithms/definitions.py +++ b/ann_benchmarks/algorithms/definitions.py @@ -12,7 +12,7 @@ Definition = collections.namedtuple( 'Definition', - ['algorithm', 'constructor', 'module', 'docker_tag', + ['algorithm', 'run_group', 'constructor', 'module', 'docker_tag', 'arguments', 'query_argument_groups', 'disabled']) @@ -96,8 +96,20 @@ def get_unique_algorithms(definition_file): return list(sorted(algos)) +def get_run_groups(definition_file, algo = None): + definitions = _get_definitions(definition_file) + run_groups = set() + for point in definitions: + for metric in definitions[point]: + for algorithm in definitions[point][metric]: + if algo == None or algo == algorithm: + for run_group in definitions[point][metric][algorithm]['run-groups'].keys(): + run_groups.add(run_group) + return list(sorted(run_groups)) + + def get_definitions(definition_file, dimension, point_type="float", - distance_metric="euclidean", count=10, conn_params=dict()): + distance_metric="euclidean", count=10, conn_params={'host': None, 'port': None, 'auth': None, 'user': None, 'cluster': False, 'shards': 1}): definitions = _get_definitions(definition_file) algorithm_definitions = {} @@ -116,7 +128,7 @@ def get_definitions(definition_file, dimension, point_type="float", if "base-args" in algo: base_args = 
algo["base-args"] - for run_group in algo["run-groups"].values(): + for run_group_name, run_group in algo["run-groups"].items(): if "arg-groups" in run_group: groups = [] for arg_group in run_group["arg-groups"]: @@ -163,6 +175,7 @@ def get_definitions(definition_file, dimension, point_type="float", aargs = [_substitute_variables(arg, vs) for arg in aargs] definitions.append(Definition( algorithm=name, + run_group = run_group_name, docker_tag=algo['docker-tag'], module=algo['module'], constructor=algo['constructor'], diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 95eccab09..989cf4aa8 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -4,11 +4,12 @@ """ import logging from time import sleep +from os import environ from urllib.error import URLError -from urllib.request import Request, urlopen -from elasticsearch import Elasticsearch +from elasticsearch import Elasticsearch, BadRequestError from elasticsearch.helpers import bulk +from elastic_transport.client_utils import DEFAULT from ann_benchmarks.algorithms.base import BaseANN @@ -20,19 +21,18 @@ # logging.basicConfig(level=logging.INFO) # logging.getLogger("elasticsearch").setLevel(logging.INFO) -def es_wait(): +def es_wait(es): print("Waiting for elasticsearch health endpoint...") - req = Request("http://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=1s") for i in range(30): try: - res = urlopen(req) - if res.getcode() == 200: + res = es.cluster.health(wait_for_status='yellow', timeout='1s') + if not res['timed_out']: # then status is OK print("Elasticsearch is ready") return except URLError: pass sleep(1) - raise RuntimeError("Failed to connect to local elasticsearch") + raise RuntimeError("Failed to connect to elasticsearch server") class ElasticsearchScriptScoreQuery(BaseANN): @@ -42,57 +42,62 @@ class ElasticsearchScriptScoreQuery(BaseANN): - Dense vector queries: https://www.elastic.co/guide/en/elasticsearch/reference/master/query-dsl-script-score-query.html """ - def __init__(self, metric: str, dimension: int): - self.name = f"elasticsearch-script-score-query_metric={metric}_dimension={dimension}" - self.metric = metric + def __init__(self, metric: str, dimension: int, conn_params, method_param): + self.name = f"elasticsearch-script-score-query_metric={metric}_dimension={dimension}_params{method_param}" + self.metric = {"euclidean": 'l2_norm', "angular": 'cosine'}[metric] + self.method_param = method_param self.dimension = dimension - self.index = f"es-ssq-{metric}-{dimension}" - self.es = Elasticsearch(["http://localhost:9200"]) + self.timeout = 60 * 60 + h = conn_params['host'] if conn_params['host'] is not None else 'localhost' + p = conn_params['port'] if conn_params['port'] is not None else '9200' + u = conn_params['user'] if conn_params['user'] is not None else 'elastic' + a = conn_params['auth'] if conn_params['auth'] is not None else '' + self.index = "ann_benchmark" + self.shards = conn_params['shards'] + try: + self.es = Elasticsearch(f"http://{h}:{p}", request_timeout=self.timeout, basic_auth=(u, a), refresh_interval=-1) + self.es.info() + except Exception: + self.es = Elasticsearch(f"https://{h}:{p}", request_timeout=self.timeout, basic_auth=(u, a), ca_certs=environ.get('ELASTIC_CA', DEFAULT)) self.batch_res = [] - if self.metric == "euclidean": - self.script = "1 / (1 + l2norm(params.query_vec, \"vec\"))" - elif self.metric == "angular": - self.script = "1.0 + 
cosineSimilarity(params.query_vec, \"vec\")" - else: - raise NotImplementedError(f"Not implemented for metric {self.metric}") - es_wait() + es_wait(self.es) def fit(self, X): - body = dict(settings=dict(number_of_shards=1, number_of_replicas=0)) - mapping = dict( + mappings = dict( properties=dict( id=dict(type="keyword", store=True), - vec=dict(type="dense_vector", dims=self.dimension) + vec=dict( + type="dense_vector", + dims=self.dimension, + similarity=self.metric, + index=True, + index_options=self.method_param + ) ) ) - self.es.indices.create(self.index, body=body) - self.es.indices.put_mapping(mapping, self.index) + try: + self.es.indices.create(index=self.index, mappings=mappings, settings=dict(number_of_shards=self.shards, number_of_replicas=0)) + except BadRequestError as e: + if 'resource_already_exists_exception' not in e.message: raise e def gen(): for i, vec in enumerate(X): - yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(i + 1) } + yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(i) } (_, errors) = bulk(self.es, gen(), chunk_size=500, max_retries=9) assert len(errors) == 0, errors - self.es.indices.refresh(self.index) - self.es.indices.forcemerge(self.index, max_num_segments=1) + self.es.indices.refresh(index=self.index) + self.es.indices.forcemerge(index=self.index, max_num_segments=1) + + def set_query_arguments(self, ef): + self.ef = ef def query(self, q, n): - body = dict( - query=dict( - script_score=dict( - query=dict(match_all=dict()), - script=dict( - source=self.script, - params=dict(query_vec=q.tolist()) - ) - ) - ) - ) - res = self.es.search(index=self.index, body=body, size=n, _source=False, docvalue_fields=['id'], - stored_fields="_none_", filter_path=["hits.hits.fields.id"]) - return [int(h['fields']['id'][0]) - 1 for h in res['hits']['hits']] + knn = dict(field='vec', query_vector=q.tolist(), k=n, num_candidates=self.ef) + res = self.es.knn_search(index=self.index, knn=knn, source=False, docvalue_fields=['id'], + stored_fields="_none_", filter_path=["hits.hits.fields.id"]) + return [int(h['fields']['id'][0]) for h in res['hits']['hits']] def batch_query(self, X, n): self.batch_res = [self.query(q, n) for q in X] diff --git a/ann_benchmarks/algorithms/milvus.py b/ann_benchmarks/algorithms/milvus.py index 50b6940a6..da961d408 100644 --- a/ann_benchmarks/algorithms/milvus.py +++ b/ann_benchmarks/algorithms/milvus.py @@ -1,45 +1,99 @@ from __future__ import absolute_import -import milvus +from sqlite3 import paramstyle +from pymilvus import ( + connections, + utility, + FieldSchema, + CollectionSchema, + DataType, + IndexType, + Collection, +) import numpy import sklearn.preprocessing from ann_benchmarks.algorithms.base import BaseANN +import sys class Milvus(BaseANN): - def __init__(self, metric, index_type, nlist): - self._nlist = nlist - self._nprobe = None - self._metric = metric - self._milvus = milvus.Milvus() - self._milvus.connect(host='localhost', port='19530') - self._table_name = 'test01' + def __init__(self, metric, dim, conn_params, index_type, method_params): + self._host = conn_params['host'] + self._port = conn_params['port'] # 19530 self._index_type = index_type + self._method_params = method_params + self._metric = {'angular': 'IP', 'euclidean': 'L2'}[metric] + self._query_params = dict() + connections.connect(host=conn_params['host'], port=conn_params['port']) + try: + fields = [ + FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=False), + 
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=dim) + ] + schema = CollectionSchema(fields) + if utility.has_collection('milvus'): + self._milvus = Collection('milvus') + else: + self._milvus = Collection('milvus', schema) + except: + self._milvus = Collection('milvus') + print('initialization completed!') + + def fit(self, X, offset=0, limit=None): + limit = limit if limit else len(X) + X = X[offset:limit] + if self._metric == 'IP': + X = sklearn.preprocessing.normalize(X) - def fit(self, X): - if self._metric == 'angular': - X = sklearn.preprocessing.normalize(X, axis=1, norm='l2') + X = X.tolist() + bulk_size = 1000 * 1024 * 1024 // (sys.getsizeof(X[0])) # approximation for milvus insert limit (1024MB) + for bulk in [X[i: i+bulk_size] for i in range(0, len(X), bulk_size)]: + print(f'inserting vectors {offset} to {offset + len(bulk) - 1}') + self._milvus.insert([list(range(offset, offset + len(bulk))), bulk]) + offset += len(bulk) - self._milvus.create_table({'table_name': self._table_name, 'dimension': X.shape[1]}) - vector_ids = [id for id in range(len(X))] - self._milvus.insert(table_name=self._table_name, records=X.tolist(), ids=vector_ids) - index_type = getattr(milvus.IndexType, self._index_type) # a bit hacky but works - self._milvus.create_index(self._table_name, {'index_type': index_type, 'nlist': self._nlist}) + if not self._milvus.has_index(): + print('indexing...', end=' ') + try: + self._milvus.create_index('vector', {'index_type': self._index_type, 'metric_type':self._metric, 'params':self._method_params}) + print('done!') + except: + print('failed!') + - def set_query_arguments(self, nprobe): - if nprobe > self._nlist: - print('warning! nprobe > nlist') - nprobe = self._nlist - self._nprobe = nprobe + def set_query_arguments(self, param): + if self._milvus.has_index(): + print('waiting for index... ', end='') + if utility.wait_for_index_building_complete('milvus', 'vector'): + print('done!') + self._milvus.load() + print('waiting for data to be loaded... ', end='') + utility.wait_for_loading_complete('milvus') + print('done!') + else: raise Exception('index has error') + else: raise Exception('index is missing') + if 'IVF_' in self._index_type: + if param > self._method_params['nlist']: + print('warning! 
nprobe > nlist') + param = self._method_params['nlist'] + self._query_params['nprobe'] = param + if 'HNSW' in self._index_type: + self._query_params['ef'] = param def query(self, v, n): - if self._metric == 'angular': + if self._metric == 'IP': v /= numpy.linalg.norm(v) v = v.tolist() - status, results = self._milvus.search(table_name=self._table_name, query_records=[v], top_k=n, nprobe=self._nprobe) + results = self._milvus.search([v], 'vector', {'metric_type':self._metric, 'params':self._query_params}, limit=n) if not results: return [] # Seems to happen occasionally, not sure why result_ids = [result.id for result in results[0]] return result_ids def __str__(self): - return 'Milvus(index_type=%s, nlist=%d, nprobe=%d)' % (self._index_type, self._nlist, self._nprobe) + return 'Milvus(index_type=%s, method_params=%s, query_params=%s)' % (self._index_type, str(self._method_params), str(self._query_params)) + + def freeIndex(self): + utility.drop_collection("milvus") + + def done(self): + connections.disconnect('default') diff --git a/ann_benchmarks/algorithms/pinecone.py b/ann_benchmarks/algorithms/pinecone.py new file mode 100644 index 000000000..f7d09c174 --- /dev/null +++ b/ann_benchmarks/algorithms/pinecone.py @@ -0,0 +1,39 @@ +from __future__ import absolute_import +from sqlite3 import paramstyle +from ann_benchmarks.algorithms.base import BaseANN +import sys +import pinecone + +class Pinecone(BaseANN): + def __init__(self, metric, dim, conn_params, type): + pinecone.init(api_key=conn_params['auth']) + m = {'angular': 'cosine', 'euclidean': 'euclidean'}[metric] + self.name = 'ann-benchmark' + if self.name not in pinecone.list_indexes(): + pinecone.create_index(self.name, dimension=dim, metric=m, + index_type=type, shards=int(conn_params["shards"]), ) + self.index = pinecone.Index(self.name) + + def fit(self, X, offset=0, limit=None): + limit = limit if limit else len(X) + + bulk = [(str(i), X[i].tolist()) for i in range(offset, limit)] + # approximation for pinecone insert limit (2MB or 1000 vectors) + batch_size = min(1000, 2 * 1024 * 1024 // (sys.getsizeof(bulk[-1]))) # bulk[-1] should be the largest (longest name) + + for batch in [bulk[i: i+batch_size] for i in range(0, len(bulk), batch_size)]: + # print(f'inserting vectors {batch[0][0]} to {batch[-1][0]}') + self.index.upsert(batch) + + # print(self.index.describe_index_stats()) + # print(pinecone.describe_index(self.name)) + + def query(self, v, n): + res = self.index.query(v.tolist(), top_k=n) + return [int(e['id']) for e in res['matches']] + + def freeIndex(self): + pinecone.delete_index(self.name) + + def __str__(self): + return f'Pinecone({pinecone.describe_index(self.name)})' diff --git a/ann_benchmarks/algorithms/redisearch.py b/ann_benchmarks/algorithms/redisearch.py index 318f2f44d..c1f3ea201 100644 --- a/ann_benchmarks/algorithms/redisearch.py +++ b/ann_benchmarks/algorithms/redisearch.py @@ -1,8 +1,10 @@ from __future__ import absolute_import +from optparse import Values from redis import Redis from redis.cluster import RedisCluster from ann_benchmarks.constants import INDEX_DIR from ann_benchmarks.algorithms.base import BaseANN +import math class RediSearch(BaseANN): @@ -12,37 +14,81 @@ def __init__(self, algo, metric, conn_params, method_param): self.algo = algo self.name = 'redisearch-%s (%s)' % (self.algo, self.method_param) self.index_name = "ann_benchmark" + self.text = None + self.ef = None redis = RedisCluster if conn_params['cluster'] else Redis - self.redis = redis(host=conn_params["host"], 
port=conn_params["port"], - password=conn_params["auth"], username=conn_params["user"], - decode_responses=False) + host = conn_params["host"] if conn_params["host"] else 'localhost' + port = conn_params["port"] if conn_params["port"] else 6379 + self.redis = redis(host=host, port=port, decode_responses=False, + password=conn_params["auth"], username=conn_params["user"]) + self.shards = int(conn_params["shards"]) + if conn_params['cluster']: + self.shards = len(self.redis.get_primaries()) - def fit(self, X, offset=0, limit=None): + def fit(self, X, offset=0, limit=None, hybrid_buckets = None): limit = limit if limit else len(X) try: + args = [self.index_name, 'SCHEMA'] + if hybrid_buckets: + args.extend(['n', 'NUMERIC', 't', 'TEXT']) # https://oss.redis.com/redisearch/master/Commands/#ftcreate if self.algo == "HNSW": - self.redis.execute_command('FT.CREATE', self.index_name, 'SCHEMA', 'vector', 'VECTOR', self.algo, '12', 'TYPE', 'FLOAT32', 'DIM', len(X[0]), 'DISTANCE_METRIC', self.metric, 'INITIAL_CAP', len(X), 'M', self.method_param['M'] , 'EF_CONSTRUCTION', self.method_param["efConstruction"], target_nodes='random') + args.extend(['vector', 'VECTOR', self.algo, '10', 'TYPE', 'FLOAT32', 'DIM', len(X[0]), 'DISTANCE_METRIC', self.metric, 'M', self.method_param['M'], 'EF_CONSTRUCTION', self.method_param["efConstruction"]]) elif self.algo == "FLAT": - self.redis.execute_command('FT.CREATE', self.index_name, 'SCHEMA', 'vector', 'VECTOR', self.algo, '10', 'TYPE', 'FLOAT32', 'DIM', len(X[0]), 'DISTANCE_METRIC', self.metric, 'INITIAL_CAP', len(X), 'BLOCK_SIZE', self.method_param['BLOCK_SIZE'], target_nodes='random') + args.extend(['vector', 'VECTOR', self.algo, '6', 'TYPE', 'FLOAT32', 'DIM', len(X[0]), 'DISTANCE_METRIC', self.metric]) + print("Calling FT.CREATE", *args) + self.redis.execute_command('FT.CREATE', *args, target_nodes='primaries') except Exception as e: if 'Index already exists' not in str(e): raise - - for i in range(offset, limit): - self.redis.execute_command('HSET', f'ann_{i}', 'vector', X[i].tobytes()) + p = self.redis.pipeline(transaction=False) + count = 0 + if hybrid_buckets: + print('running hybrid') + for bucket in hybrid_buckets.values(): + ids = bucket['ids'] + text = bucket['text'].decode() + number = bucket['number'] + print('calling HSET', f'', 'vector', '', 't', text, 'n', number) + for id in ids: + if id >= offset and id < limit: + p.execute_command('HSET', int(id), 'vector', X[id].tobytes(), 't', text, 'n', int(number)) + count+=1 + if count % 1000 == 0: + p.execute() + p.reset() + p.execute() + else: + for i in range(offset, limit): + p.execute_command('HSET', i, 'vector', X[i].tobytes()) + count+=1 + if count % 1000 == 0: + p.execute() + p.reset() + p.execute() def set_query_arguments(self, ef): self.ef = ef + def set_hybrid_query(self, text): + self.text = text + def query(self, v, k): # https://oss.redis.com/redisearch/master/Commands/#ftsearch qparams = f' EF_RUNTIME {self.ef}' if self.algo == 'HNSW' else '' - vq = f'*=>[TOP_K {k} @vector $BLOB {qparams}]' - q = ['FT.SEARCH', self.index_name, vq, 'NOCONTENT', 'SORTBY', '__vector_score', 'LIMIT', '0', str(k), 'PARAMS', '2', 'BLOB', v.tobytes()] - return [int(doc.replace(b'ann_',b'')) for doc in self.redis.execute_command(*q, target_nodes='random')[1:]] + if self.text: + vq = f'(@t:{self.text})=>[KNN {k} @vector $BLOB {qparams}]' + else: + vq = f'*=>[KNN {k} @vector $BLOB {qparams}]' + q = ['FT.SEARCH', self.index_name, vq, 'NOCONTENT', 'SORTBY', '__vector_score', 'LIMIT', '0', str(k), 'PARAMS', '2', 'BLOB', 
v.tobytes(), 'DIALECT', '2', 'TIMEOUT', '0'] + return [int(doc) for doc in self.redis.execute_command(*q, target_nodes='random')[1:]] def freeIndex(self): self.redis.execute_command("FLUSHALL") + def __str__(self): + res = self.name + if self.ef is not None: + res += f", efRuntime: {self.ef}" + return res diff --git a/ann_benchmarks/algorithms/vecsim-hnsw.py b/ann_benchmarks/algorithms/vecsim-hnsw.py new file mode 100644 index 000000000..d85fe7836 --- /dev/null +++ b/ann_benchmarks/algorithms/vecsim-hnsw.py @@ -0,0 +1,43 @@ +from __future__ import absolute_import +import os +from VecSim import * +import numpy as np +from ann_benchmarks.constants import INDEX_DIR +from ann_benchmarks.algorithms.base import BaseANN + + +class VecSimHnsw(BaseANN): + def __init__(self, metric, method_param): + self.metric = {'angular': VecSimMetric_Cosine, 'euclidean': VecSimMetric_L2}[metric] + self.method_param = method_param + # print(self.method_param,save_index,query_param) + self.ef = None + self.name = 'VecSim-hnsw (%s)' % (self.method_param) + + def fit(self, X): + hnswparams = HNSWParams() + hnswparams.M =self.method_param['M'] + hnswparams.efConstruction = self.method_param['efConstruction'] + hnswparams.initialCapacity = len(X) + hnswparams.dim = len(X[0]) + hnswparams.type = VecSimType_FLOAT32 + hnswparams.metric = self.metric + hnswparams.multi = False + + self.index = HNSWIndex(hnswparams) + + for i, vector in enumerate(X): + self.index.add_vector(vector, i) + + def set_query_arguments(self, ef): + self.ef = ef + self.index.set_ef(ef) + + def query(self, v, n): + return self.index.knn_query(np.expand_dims(v, axis=0), k=n)[0][0] + + def freeIndex(self): + del self.index + + def __str__(self): + return f"{self.name}, efRuntime: {self.ef}" diff --git a/ann_benchmarks/datasets.py b/ann_benchmarks/datasets.py index f89069ebe..8d1549ce6 100644 --- a/ann_benchmarks/datasets.py +++ b/ann_benchmarks/datasets.py @@ -1,3 +1,4 @@ +from copyreg import pickle import h5py import numpy import os @@ -7,6 +8,7 @@ from urllib.request import urlretrieve from ann_benchmarks.distance import dataset_transform +import urllib.parse def download(src, dst): @@ -18,14 +20,26 @@ def download(src, dst): def get_dataset_fn(dataset): if not os.path.exists('data'): - os.mkdir('data') + try: + os.mkdir('data') + except FileExistsError: + pass # fixes race condition return os.path.join('data', '%s.hdf5' % dataset) def get_dataset(which): hdf5_fn = get_dataset_fn(which) try: - url = 'http://ann-benchmarks.com/%s.hdf5' % which + if 'dbpedia' in which: + url = 'https://s3.us-east-1.amazonaws.com/benchmarks.redislabs/vecsim/dbpedia/dbpedia-768.hdf5' + elif 'amazon-reviews' in which: + url = 'https://s3.us-east-1.amazonaws.com/benchmarks.redislabs/vecsim/amazon_reviews/amazon-reviews-384.hdf5' + elif 'hybrid' in which: + url = 'https://s3.us-east-1.amazonaws.com/benchmarks.redislabs/vecsim/hybrid_datasets/%s.hdf5' % urllib.parse.quote(which) + elif 'Text-to-Image' in which: + url = 'https://s3.us-east-1.amazonaws.com/benchmarks.redislabs/vecsim/big_ann/%s.hdf5' % urllib.parse.quote(which) + else: + url = 'http://ann-benchmarks.com/%s.hdf5' % which download(url, hdf5_fn) except: print("Cannot download %s" % url) @@ -40,7 +54,6 @@ def get_dataset(which): return hdf5_f, dimension - # Everything below this line is related to creating datasets # You probably never need to do this at home, # just rely on the prepared datasets at http://ann-benchmarks.com @@ -425,6 +438,129 @@ def lastfm(out_fn, n_dimensions, test_size=50000): # as the inner 
product on the untransformed data write_output(item_factors, user_factors, out_fn, 'angular') +def parse_dbpedia_data(source_file, max_docs: int): + import re + """ + Parses the input file of abstracts and returns an iterable + :param max_docs: maximum number of input documents to process; -1 for no limit + :param source_file: input file + :return: yields document by document to the consumer + """ + global VERBOSE + count = 0 + max_tokens = 0 + + if -1 < max_docs < 50: + VERBOSE = True + + percent = 0.1 + bulk_size = (percent / 100) * max_docs + + print(f"bulk_size={bulk_size}") + + if bulk_size <= 0: + bulk_size = 1000 + + for line in source_file: + line = line.decode("utf-8") + + # skip commented out lines + comment_regex = '^#' + if re.search(comment_regex, line): + continue + + token_size = len(line.split()) + if token_size > max_tokens: + max_tokens = token_size + + # skip lines with 20 tokens or less, because they tend to contain noise + # (this may vary in your dataset) + if token_size <= 20: + continue + + first_url_regex = '^<([^\>]+)>\s*' + + x = re.search(first_url_regex, line) + if x: + url = x.group(1) + # also remove the url from the string + line = re.sub(first_url_regex, '', line) + else: + url = '' + + # remove the second url from the string: we don't need to capture it, because it is repetitive across + # all abstracts + second_url_regex = '^<[^\>]+>\s*' + line = re.sub(second_url_regex, '', line) + + # remove some strange line ending, that occurs in many abstracts + language_at_ending_regex = '@en \.\n$' + line = re.sub(language_at_ending_regex, '', line) + + # form the input object for this abstract + doc = { + "_text_": line, + "url": url, + "id": count+1 + } + + yield doc + count += 1 + + if count % bulk_size == 0: + print(f"Processed {count} documents", end="\r") + + if count == max_docs: + break + + source_file.close() + print("Maximum tokens observed per abstract: {}".format(max_tokens)) + +def dbpedia(out_fn): + import bz2 + from sentence_transformers import SentenceTransformer + import torch + device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") + print(device) + local_fn = "long_abstracts_en.ttl.bz2" + url = "http://downloads.dbpedia.org/2016-10/core-i18n/en/long_abstracts_en.ttl.bz2" + download(url, local_fn) + source_file = bz2.BZ2File(local_fn, "r") + docs_iter = parse_dbpedia_data(source_file=source_file, max_docs=1000000) + text = [] + for doc in docs_iter: + text.append(doc['_text_']) + model = SentenceTransformer('bert-base-nli-mean-tokens') + model.to(device) + sentence_embeddings = model.encode(text, show_progress_bar=True) + write_output(sentence_embeddings, sentence_embeddings[:10000], out_fn, 'angular') + + +def amazon_reviews(out_fn): + import os + import math + import pickle + import numpy as np + subsets = ['Wireless_v1_00', 'Watches_v1_00', 'Video_Games_v1_00', 'Video_DVD_v1_00', 'Video_v1_00', 'Toys_v1_00', 'Tools_v1_00', 'Sports_v1_00', 'Software_v1_00', 'Shoes_v1_00', 'Pet_Products_v1_00', 'Personal_Care_Appliances_v1_00', 'PC_v1_00', 'Outdoors_v1_00', 'Office_Products_v1_00', 'Musical_Instruments_v1_00', 'Music_v1_00', 'Mobile_Electronics_v1_00', 'Mobile_Apps_v1_00', 'Major_Appliances_v1_00', 'Luggage_v1_00', 'Lawn_and_Garden_v1_00', 'Kitchen_v1_00', 'Jewelry_v1_00', 'Home_Improvement_v1_00', 'Home_Entertainment_v1_00', 'Home_v1_00', 'Health_Personal_Care_v1_00', 'Grocery_v1_00', 'Gift_Card_v1_00', 'Furniture_v1_00', 'Electronics_v1_00', 'Digital_Video_Games_v1_00', 'Digital_Video_Download_v1_00', 'Digital_Software_v1_00', 
'Digital_Music_Purchase_v1_00', 'Digital_Ebook_Purchase_v1_00', 'Camera_v1_00', 'Books_v1_00', 'Beauty_v1_00', 'Baby_v1_00', 'Automotive_v1_00', 'Apparel_v1_00', 'Digital_Ebook_Purchase_v1_01', 'Books_v1_01', 'Books_v1_02'] + train_set = None + test_set = None + for i, subset in enumerate(subsets): + url = f'https://s3.us-east-1.amazonaws.com/benchmarks.redislabs/vecsim/amazon_reviews/{subset}_embeddings' + local_fn = f'{subset}_embeddings' + download(url, local_fn) + subset_embeddings = pickle.load(open(local_fn, "rb")) + if i==0: + train_set = subset_embeddings + test_set = subset_embeddings[:math.ceil(10000/len(subsets))] + else: + train_set = np.append(train_set, subset_embeddings, axis =0) + test_set = np.append(test_set, subset_embeddings[:math.ceil(10000/len(subsets))], axis=0) + print(subset_embeddings.shape) + print(train_set.shape) + print(test_set.shape) + os.remove(local_fn) + write_output(train_set, test_set[:10000], out_fn, 'angular') + DATASETS = { 'deep-image-96-angular': deep_image, @@ -463,4 +599,22 @@ def lastfm(out_fn, n_dimensions, test_size=50000): 'sift-256-hamming': lambda out_fn: sift_hamming( out_fn, 'sift.hamming.256'), 'kosarak-jaccard': lambda out_fn: kosarak(out_fn), + 'dbpedia-768' : lambda out_fn: dbpedia(out_fn), + 'amazon-reviews-384': lambda out_fn: amazon_reviews(out_fn), } + + + + +big_ann_datasets = [f'Text-to-Image-{x}' for x in ['10M', '20M', '30M', '40M', '50M', '60M', '70M', '80M', '90M', '100M']] +for dataset in big_ann_datasets: + DATASETS[dataset] = lambda fn: () + + +hybrid_datasets = ['glove-200-angular', 'gist-960-euclidean', 'deep-image-96-angular', 'fashion-mnist-784-euclidean'] +hybrid_datasets.extend(big_ann_datasets) +percentiles= ['0.5', '1', '2', '5', '10', '20', '50'] +for dataset in hybrid_datasets: + for percentile in percentiles: + DATASETS[f'{dataset}-hybrid-{percentile}'] = lambda fn: () + diff --git a/ann_benchmarks/main.py b/ann_benchmarks/main.py index 744dde61c..ab6bd6cbc 100644 --- a/ann_benchmarks/main.py +++ b/ann_benchmarks/main.py @@ -11,6 +11,7 @@ import shutil import sys import traceback +import time from ann_benchmarks.datasets import get_dataset, DATASETS from ann_benchmarks.constants import INDEX_DIR @@ -74,6 +75,11 @@ def main(): metavar='NAME', help='run only the named algorithm', default=None) + parser.add_argument( + '--run-group', + metavar='NAME', + help='run only the named run group', + default=None) parser.add_argument( '--docker-tag', metavar='NAME', @@ -139,12 +145,12 @@ def main(): '--host', metavar='NAME', help='host name or IP', - default="localhost") + default=None) parser.add_argument( '--port', type=positive_int, help='the port "host" is listening on', - default=6379) + default=None) parser.add_argument( '--auth', '-a', metavar='PASSWORD', @@ -165,8 +171,14 @@ def main(): '--client-id', metavar='NUM', type=positive_int, - help='specific client id (among the total client)', + help='specific client id (among the total clients)', default=1) + parser.add_argument( + '--shards', + type=str, + metavar='NUM', + default="1", + help='specify number of shards') args = parser.parse_args() if args.timeout == -1: @@ -175,13 +187,13 @@ def main(): if args.list_algorithms: list_algorithms(args.definitions) sys.exit(0) - + if args.build_only and args.test_only: raise Exception('Nothing to run (build only and test only was specified)') if (args.build_only or args.test_only) and not args.local: raise Exception('Can\'t run build or test only on docker') - conn_params = {'host': args.host, 'port': args.port, 'auth': 
args.auth, 'user': args.user, 'cluster': args.cluster} + conn_params = {'host': args.host, 'port': args.port, 'auth': args.auth, 'user': args.user, 'cluster': args.cluster, 'shards': args.shards} if args.total_clients < args.client_id: raise Exception('must satisfy 1 <= client_id <= total_clients') @@ -226,9 +238,13 @@ def main(): random.shuffle(definitions) if args.algorithm: - logger.info(f'running only {args.algorithm}') + logger.info(f'running only {args.algorithm} algorithms') definitions = [d for d in definitions if d.algorithm == args.algorithm] + if args.run_group: + logger.info(f'running only {args.run_group} run groups') + definitions = [d for d in definitions if d.run_group == args.run_group] + if not args.local: # See which Docker images we have available docker_client = docker.from_env() @@ -292,11 +308,18 @@ def _test(df): queue = multiprocessing.Queue() for definition in definitions: queue.put(definition) - if args.batch and args.parallelism > 1: - raise Exception(f"Batch mode uses all available CPU resources, --parallelism should be set to 1. (Was: {args.parallelism})") - workers = [multiprocessing.Process(target=run_worker, args=(i+1, args, queue)) - for i in range(args.parallelism)] - [worker.start() for worker in workers] - [worker.join() for worker in workers] + + if args.parallelism == 1: + # Wait for some jobs to be inserted into the queue + while queue.empty(): time.sleep(0.01) + # If we're only running one worker, then we can just run it in the same process + run_worker(1, args, queue) + else: + if args.batch: + raise Exception(f"Batch mode uses all available CPU resources, --parallelism should be set to 1. (Was: {args.parallelism})") + workers = [multiprocessing.Process(target=run_worker, args=(i+1, args, queue)) + for i in range(args.parallelism)] + [worker.start() for worker in workers] + [worker.join() for worker in workers] # TODO: need to figure out cleanup handling here diff --git a/ann_benchmarks/plotting/metrics.py b/ann_benchmarks/plotting/metrics.py index d16d74250..71e650b0b 100644 --- a/ann_benchmarks/plotting/metrics.py +++ b/ann_benchmarks/plotting/metrics.py @@ -77,8 +77,22 @@ def rel(dataset_distances, run_distances, metrics): def queries_per_second(queries, attrs): - return 1.0 / attrs["best_search_time"] + try: + return (attrs['run_count'] * len(queries)) / (attrs["end_querying_time"] - attrs["start_querying_time"]) + except KeyError: + return 1.0 / attrs["best_search_time"] +def percentile_50(times): + return np.percentile(times, 50.0) * 1000.0 + +def percentile_95(times): + return np.percentile(times, 95.0) * 1000.0 + +def percentile_99(times): + return np.percentile(times, 99.0) * 1000.0 + +def percentile_999(times): + return np.percentile(times, 99.9) * 1000.0 def index_size(queries, attrs): # TODO(erikbern): should replace this with peak memory usage or something @@ -100,53 +114,73 @@ def dist_computations(queries, attrs): all_metrics = { "k-nn": { "description": "Recall", - "function": lambda true_distances, run_distances, metrics, run_attrs: knn(true_distances, run_distances, run_attrs["count"], metrics).attrs['mean'], # noqa + "function": lambda true_distances, run_distances, metrics, times, run_attrs: knn(true_distances, run_distances, run_attrs["count"], metrics).attrs['mean'], # noqa "worst": float("-inf"), "lim": [0.0, 1.03] }, "epsilon": { "description": "Epsilon 0.01 Recall", - "function": lambda true_distances, run_distances, metrics, run_attrs: epsilon(true_distances, run_distances, run_attrs["count"], metrics).attrs['mean'], # 
noqa + "function": lambda true_distances, run_distances, metrics, times, run_attrs: epsilon(true_distances, run_distances, run_attrs["count"], metrics).attrs['mean'], # noqa "worst": float("-inf") }, "largeepsilon": { "description": "Epsilon 0.1 Recall", - "function": lambda true_distances, run_distances, metrics, run_attrs: epsilon(true_distances, run_distances, run_attrs["count"], metrics, 0.1).attrs['mean'], # noqa + "function": lambda true_distances, run_distances, metrics, times, run_attrs: epsilon(true_distances, run_distances, run_attrs["count"], metrics, 0.1).attrs['mean'], # noqa "worst": float("-inf") }, "rel": { "description": "Relative Error", - "function": lambda true_distances, run_distances, metrics, run_attrs: rel(true_distances, run_distances, metrics), # noqa + "function": lambda true_distances, run_distances, metrics, times, run_attrs: rel(true_distances, run_distances, metrics), # noqa "worst": float("inf") }, "qps": { "description": "Queries per second (1/s)", - "function": lambda true_distances, run_distances, metrics, run_attrs: queries_per_second(true_distances, run_attrs), # noqa + "function": lambda true_distances, run_distances, metrics, times, run_attrs: queries_per_second(true_distances, run_attrs), # noqa "worst": float("-inf") }, + "p50": { + "description": "Percentile 50 (millis)", + "function": lambda true_distances, run_distances, metrics, times, run_attrs: percentile_50(times), # noqa + "worst": float("inf") + }, + "p95": { + "description": "Percentile 95 (millis)", + "function": lambda true_distances, run_distances, metrics, times, run_attrs: percentile_95(times), # noqa + "worst": float("inf") + }, + "p99": { + "description": "Percentile 99 (millis)", + "function": lambda true_distances, run_distances, metrics, times, run_attrs: percentile_99(times), # noqa + "worst": float("inf") + }, + "p999": { + "description": "Percentile 99.9 (millis)", + "function": lambda true_distances, run_distances, metrics, times, run_attrs: percentile_999(times), # noqa + "worst": float("inf") + }, "distcomps": { "description": "Distance computations", - "function": lambda true_distances, run_distances, metrics, run_attrs: dist_computations(true_distances, run_attrs), # noqa + "function": lambda true_distances, run_distances, metrics, times, run_attrs: dist_computations(true_distances, run_attrs), # noqa "worst": float("inf") }, "build": { "description": "Build time (s)", - "function": lambda true_distances, run_distances, metrics, run_attrs: build_time(true_distances, run_attrs), # noqa + "function": lambda true_distances, run_distances, metrics, times, run_attrs: build_time(true_distances, run_attrs), # noqa "worst": float("inf") }, "candidates": { "description": "Candidates generated", - "function": lambda true_distances, run_distances, metrics, run_attrs: candidates(true_distances, run_attrs), # noqa + "function": lambda true_distances, run_distances, metrics, times, run_attrs: candidates(true_distances, run_attrs), # noqa "worst": float("inf") }, "indexsize": { "description": "Index size (kB)", - "function": lambda true_distances, run_distances, metrics, run_attrs: index_size(true_distances, run_attrs), # noqa + "function": lambda true_distances, run_distances, metrics, times, run_attrs: index_size(true_distances, run_attrs), # noqa "worst": float("inf") }, "queriessize": { "description": "Index size (kB)/Queries per second (s)", - "function": lambda true_distances, run_distances, metrics, run_attrs: index_size(true_distances, run_attrs) / 
queries_per_second(true_distances, run_attrs), # noqa + "function": lambda true_distances, run_distances, metrics, times, run_attrs: index_size(true_distances, run_attrs) / queries_per_second(true_distances, run_attrs), # noqa "worst": float("inf") } } diff --git a/ann_benchmarks/plotting/plot_variants.py b/ann_benchmarks/plotting/plot_variants.py index f3632c00d..a30d06dfd 100644 --- a/ann_benchmarks/plotting/plot_variants.py +++ b/ann_benchmarks/plotting/plot_variants.py @@ -9,5 +9,9 @@ "recall/candidates": ("k-nn", "candidates"), "recall/qpssize": ("k-nn", "queriessize"), "eps/time": ("epsilon", "qps"), - "largeeps/time": ("largeepsilon", "qps") + "largeeps/time": ("largeepsilon", "qps"), + "recall/p50": ("k-nn", "p50"), + "recall/p95": ("k-nn", "p95"), + "recall/p99": ("k-nn", "p99"), + "recall/p999": ("k-nn", "p999"), } diff --git a/ann_benchmarks/plotting/utils.py b/ann_benchmarks/plotting/utils.py index 46b3ed4f6..4b18bb1b2 100644 --- a/ann_benchmarks/plotting/utils.py +++ b/ann_benchmarks/plotting/utils.py @@ -45,16 +45,18 @@ def compute_metrics(true_nn_distances, res, metric_1, metric_2, algo_name = properties['name'] # cache distances to avoid access to hdf5 file run_distances = numpy.array(run['distances']) + # cache times to avoid access to hdf5 file + times = numpy.array(run['times']) if recompute and 'metrics' in run: del run['metrics'] metrics_cache = get_or_create_metrics(run) metric_1_value = metrics[metric_1]['function']( true_nn_distances, - run_distances, metrics_cache, properties) + run_distances, metrics_cache, times, properties) metric_2_value = metrics[metric_2]['function']( true_nn_distances, - run_distances, metrics_cache, properties) + run_distances, metrics_cache, times, properties) print('%3d: %80s %12.3f %12.3f' % (i, algo_name, metric_1_value, metric_2_value)) @@ -73,18 +75,47 @@ def compute_all_metrics(true_nn_distances, run, properties, recompute=False): results = {} # cache distances to avoid access to hdf5 file run_distances = numpy.array(run["distances"]) + # cache times to avoid access to hdf5 file + times = numpy.array(run['times']) if recompute and 'metrics' in run: del run['metrics'] metrics_cache = get_or_create_metrics(run) for name, metric in metrics.items(): v = metric["function"]( - true_nn_distances, run_distances, metrics_cache, properties) + true_nn_distances, run_distances, metrics_cache, times, properties) results[name] = v if v: print('%s: %g' % (name, v)) return (algo, algo_name, results) +def compute_metrics_all_runs(dataset, res, recompute=False): + true_nn_distances=list(dataset['distances']) + for i, (properties, run) in enumerate(res): + algo = properties['algo'] + algo_name = properties['name'] + # cache distances to avoid access to hdf5 file + # print('Load distances and times') + run_distances = numpy.array(run['distances']) + times = numpy.array(run['times']) + # print('... 
done') + if recompute and 'metrics' in run: + print('Recomputing metrics, clearing cache') + del run['metrics'] + metrics_cache = get_or_create_metrics(run) + + dataset = properties['dataset'] + + run_result = { + 'algorithm': algo, + 'parameters': algo_name, + 'count': properties['count'] + } + for name, metric in metrics.items(): + v = metric["function"](true_nn_distances, run_distances, metrics_cache, times, properties) + run_result[name] = v + yield run_result + def generate_n_colors(n): vs = numpy.linspace(0.3, 0.9, 7) diff --git a/ann_benchmarks/results.py b/ann_benchmarks/results.py index c1306c927..d87352ac5 100644 --- a/ann_benchmarks/results.py +++ b/ann_benchmarks/results.py @@ -16,13 +16,13 @@ def get_result_filename(dataset=None, count=None, definition=None, d.append(str(count)) if definition: d.append(definition.algorithm + ('-batch' if batch_mode else '')) - if 'redisearch' in definition.algorithm: - prefix = re.sub(r'\W+', '_', json.dumps(query_arguments, sort_keys=True)).strip('_') - d.append(prefix + f'_client_{id}.hdf5') - else: - data = definition.arguments + query_arguments - d.append(re.sub(r'\W+', '_', json.dumps(data, sort_keys=True)) - .strip('_') + ".hdf5") + data = definition.arguments + query_arguments + for i in range(len(data)): + if isinstance(data[i], dict): + data[i] = {k:data[i][k] for k in data[i] if data[i][k] is not None and k != 'auth'} + data.append('client') + data.append(id) + d.append(re.sub(r'\W+', '_', json.dumps(data, sort_keys=True)).strip('_') + ".hdf5") return os.path.join(*d) diff --git a/ann_benchmarks/runner.py b/ann_benchmarks/runner.py index 63418cb9f..0e6bb53d0 100644 --- a/ann_benchmarks/runner.py +++ b/ann_benchmarks/runner.py @@ -6,6 +6,7 @@ import time import traceback import inspect +import h5py import colors import docker @@ -16,7 +17,7 @@ instantiate_algorithm) from ann_benchmarks.datasets import get_dataset, DATASETS from ann_benchmarks.distance import metrics, dataset_transform -from ann_benchmarks.results import store_results +from ann_benchmarks.results import get_result_filename, store_results def run_individual_query(algo, X_train, X_test, distance, count, run_count, @@ -26,6 +27,8 @@ def run_individual_query(algo, X_train, X_test, distance, count, run_count, ((not batch) and hasattr(algo, "prepare_query")) best_search_time = float('inf') + start_time = time.time() # actual start time + end_time = start_time # "virtual" end time. actual end time is start_time + sum of query times for i in range(run_count): print('Run %d/%d...' 
% (i + 1, run_count)) # a bit dumb but can't be a scalar since of Python's scoping rules @@ -72,17 +75,20 @@ def batch_query(X): results = batch_query(X_test) else: results = [single_query(x) for x in X_test] - + end_time = time.time() total_time = sum(time for time, _ in results) total_candidates = sum(len(candidates) for _, candidates in results) search_time = total_time / len(X_test) avg_candidates = total_candidates / len(X_test) best_search_time = min(best_search_time, search_time) + print("qps:", len(X_test)/total_time) verbose = hasattr(algo, "query_verbose") attrs = { "batch_mode": batch, "best_search_time": best_search_time, + "start_querying_time": start_time, + "end_querying_time": end_time, "candidates": avg_candidates, "expect_extra": verbose, "name": str(algo), @@ -105,12 +111,20 @@ def run(definition, dataset, count, run_count, batch, build_only, test_only, num function""" % (definition.module, definition.constructor, definition.arguments) D, dimension = get_dataset(dataset) - X_train = numpy.array(D['train']) - X_test = numpy.array(D['test']) + X_train, X_test = dataset_transform(D) distance = D.attrs['distance'] print('got a train set of size (%d * %d)' % (X_train.shape[0], dimension)) - X_train, X_test = dataset_transform(D) + hybrid_buckets = None + if 'bucket_names' in D.attrs: + hybrid_buckets = {} + bucket_names = D.attrs['bucket_names'] + for bucket_name in bucket_names: + bucket_dict = {} + bucket_dict['ids'] = numpy.array(D[f'{bucket_name}_ids']) + bucket_dict['text'] = D[bucket_name]['text'][()] + bucket_dict['number'] = D[bucket_name]['number'][()] + hybrid_buckets[bucket_name] = bucket_dict try: prepared_queries = False @@ -120,15 +134,17 @@ def run(definition, dataset, count, run_count, batch, build_only, test_only, num if not test_only: per_client = len(X_train) // num_clients offset = per_client * (id - 1) - fit_args = [X_train] + fit_kwargs = {} if "offset" in inspect.getfullargspec(algo.fit)[0] and "limit" in inspect.getfullargspec(algo.fit)[0]: - fit_args.append(offset) + fit_kwargs['offset']=offset if num_clients != id: - fit_args.append(offset + per_client) - + fit_kwargs['limit']=offset + per_client + if hybrid_buckets: + fit_kwargs['hybrid_buckets']=hybrid_buckets + t0 = time.time() memory_usage_before = algo.get_memory_usage() - algo.fit(*fit_args) + algo.fit(X_train, **fit_kwargs) build_time = time.time() - t0 index_size = algo.get_memory_usage() - memory_usage_before print('Built index in', build_time) @@ -142,14 +158,37 @@ def run(definition, dataset, count, run_count, batch, build_only, test_only, num if not build_only: print('got %d queries' % len(X_test)) + per_client = len(X_test) // num_clients + offset = per_client * (id - 1) + if (num_clients != id): + X_test = X_test[offset : offset + per_client] + else: + X_test = X_test[offset:] + print('running %d out of them' % len(X_test)) + for pos, query_arguments in enumerate(query_argument_groups, 1): print("Running query argument group %d of %d..." 
% (pos, len(query_argument_groups))) if query_arguments: algo.set_query_arguments(*query_arguments) + if hybrid_buckets: + text = hybrid_buckets[D.attrs['selected_bucket']]['text'].decode() + print("setting hybrid text query", text) + algo.set_hybrid_query(text) descriptor, results = run_individual_query( algo, X_train, X_test, distance, count, run_count, batch) - if not test_only: + if test_only: + try: + fn = get_result_filename(dataset, count) + fn = os.path.join(fn, definition.algorithm, 'build_stats') + f = h5py.File(fn, 'r') + descriptor["build_time"] = f.attrs["build_time"] + descriptor["index_size"] = f.attrs["index_size"] + f.close() + except: + descriptor["build_time"] = 0 + descriptor["index_size"] = 0 + else: descriptor["build_time"] = build_time descriptor["index_size"] = index_size descriptor["algo"] = definition.algorithm @@ -289,7 +328,7 @@ def _handle_container_return_value(return_value, container, logger): error_msg = return_value['Error'] exit_code = return_value['StatusCode'] msg = base_msg + 'returned exit code %d with message %s' %(exit_code, error_msg) - else: + else: exit_code = return_value msg = base_msg + 'returned exit code %d' % (exit_code) diff --git a/create_hybrid_dataset.py b/create_hybrid_dataset.py new file mode 100644 index 000000000..1d389b12d --- /dev/null +++ b/create_hybrid_dataset.py @@ -0,0 +1,147 @@ +from operator import ne +import click +from ann_benchmarks.datasets import get_dataset, DATASETS +from ann_benchmarks.algorithms.bruteforce import BruteForceBLAS +import struct +import numpy as np +import click +import h5py +from joblib import Parallel, delayed +import multiprocessing +import scipy.spatial + +def calc_i(i, x, bf, test, neighbors, distances, count, orig_ids): + if i % 1000 == 0: + print('%d/%d...' 
% (i, len(test))) + res = list(bf.query_with_distances(x, count)) + res.sort(key=lambda t: t[-1]) + neighbors[i] = [orig_ids[j] for j, _ in res] + distances[i] = [d for _, d in res] + +def create_buckets(train): + bucket_0_5 = [] + bucket_1 = [] + bucket_2 = [] + bucket_5 = [] + bucket_10 = [] + bucket_20 = [] + bucket_50 = [] + other_bucket = [] + buckets = [bucket_0_5, bucket_1, bucket_2, bucket_5, bucket_10, bucket_20, bucket_50, other_bucket] + bucket_names=['0.5', '1', '2', '5', '10', '20', '50', 'other'] + for i in range(train.shape[0]): + if i % 200 == 19: # 0.5% + bucket_0_5.append(i) + elif i % 100 == 17: # 1% + bucket_1.append(i) + elif i % 50 == 9: # 2% + bucket_2.append(i) + elif i % 20 == 7: # 5% + bucket_5.append(i) + elif i % 10 == 3: # 10% + bucket_10.append(i) + elif i % 2 == 0: # 50% + bucket_50.append(i) + elif i % 5 <= 1: # 20% + bucket_20.append(i) + else: + other_bucket.append(i) + print(len(bucket_0_5), len(bucket_1), len(bucket_2), len(bucket_5), len(bucket_10), len(bucket_20), len(bucket_50), len(other_bucket)) + numeric_values = {} + text_values = {} + for i, bucket_name in enumerate(bucket_names): + numeric_values[bucket_name] = i + text_values[bucket_name] = f'text_{i}' + print(numeric_values) + print(text_values) + return buckets, bucket_names, numeric_values, text_values + +@click.command() +@click.option('--data_set', type=click.Choice(DATASETS.keys(), case_sensitive=False), default='glove-100-angular') +@click.option('--percentile', type=click.Choice(['0.5', '1', '2', '5', '10', '20', '50'], case_sensitive=False), default=None) +def create_ds(data_set, percentile): + ds, dimension= get_dataset(data_set) + train = ds['train'] + test = ds['test'] + distance = ds.attrs['distance'] + count=len(ds['neighbors'][0]) + print(count) + print(train.shape) + buckets, bucket_names, numeric_values, text_values = create_buckets(train) + + if percentile is not None: + i = ['0.5', '1', '2', '5', '10', '20', '50'].index(percentile) + bucket = buckets[i] + fn=f'{data_set}-hybrid-{bucket_names[i]}.hdf5' + with h5py.File(fn, 'w') as f: + f.attrs['type'] = 'dense' + f.attrs['distance'] = ds.attrs['distance'] + f.attrs['dimension'] = len(test[0]) + f.attrs['point_type'] = 'float' + f.attrs['bucket_names'] = bucket_names + f.attrs['selected_bucket'] = bucket_names[i] + for bucket_name in bucket_names: + grp = f.create_group(bucket_name) + grp["text"] = text_values[bucket_name] + grp["number"] = numeric_values[bucket_name] + + f.create_dataset('train', train.shape, dtype=train.dtype)[:] = train + f.create_dataset('test', test.shape, dtype=test.dtype)[:] = test + # Write the id buckets so on ingestion we will know what data to assign for each id. + + for j, id_bucket in enumerate(buckets): + np_bucket = np.array(id_bucket, dtype=np.int32) + f.create_dataset(f'{bucket_names[j]}_ids', np_bucket.shape, dtype=np_bucket.dtype)[:] = np_bucket + + neighbors = f.create_dataset(f'neighbors', (len(test), count), dtype='i') + distances = f.create_dataset(f'distances', (len(test), count), dtype='f') + + # Generate ground truth only for the relevant bucket. 
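+ # 'bucket' holds original train-set ids; the compacted train_set built below is brute-force searched, and calc_i maps each result back to its original id via its orig_ids argument.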
+            train_bucket = np.array(bucket, dtype=np.int32)
+            train_set = np.empty((len(bucket), train.shape[1]), dtype=np.float32)
+            for id in range(len(bucket)):
+                train_set[id] = train[bucket[id]]
+            bf = BruteForceBLAS(distance, precision=train.dtype)
+            bf.fit(train_set)
+            Parallel(n_jobs=multiprocessing.cpu_count(), require='sharedmem')(delayed(calc_i)(i, x, bf, test, neighbors, distances, count, train_bucket) for i, x in enumerate(test))
+
+    else:
+        for i, bucket in enumerate(buckets):
+            fn = f'{data_set}-hybrid-{bucket_names[i]}.hdf5'
+            with h5py.File(fn, 'w') as f:
+                f.attrs['type'] = 'dense'
+                f.attrs['distance'] = ds.attrs['distance']
+                f.attrs['dimension'] = len(test[0])
+                f.attrs['point_type'] = 'float'
+                f.attrs['bucket_names'] = bucket_names
+                f.attrs['selected_bucket'] = bucket_names[i]
+                for bucket_name in bucket_names:
+                    grp = f.create_group(bucket_name)
+                    grp["text"] = text_values[bucket_name]
+                    grp["number"] = numeric_values[bucket_name]
+
+                f.create_dataset('train', train.shape, dtype=train.dtype)[:] = train
+                f.create_dataset('test', test.shape, dtype=test.dtype)[:] = test
+                # Write the id buckets so that on ingestion we know what data to assign to each id.
+                for j, id_bucket in enumerate(buckets):
+                    np_bucket = np.array(id_bucket, dtype=np.int32)
+                    f.create_dataset(f'{bucket_names[j]}_ids', np_bucket.shape, dtype=np_bucket.dtype)[:] = np_bucket
+
+                neighbors = f.create_dataset('neighbors', (len(test), count), dtype='i')
+                distances = f.create_dataset('distances', (len(test), count), dtype='f')
+
+                # Generate ground truth only for the relevant bucket.
+                train_bucket = np.array(bucket, dtype=np.int32)
+                train_set = np.empty((len(bucket), train.shape[1]), dtype=np.float32)
+                for id in range(len(bucket)):
+                    train_set[id] = train[bucket[id]]
+                print(train_set.shape)
+                bf = BruteForceBLAS(distance, precision=train.dtype)
+                bf.fit(train_set)
+                Parallel(n_jobs=multiprocessing.cpu_count(), require='sharedmem')(delayed(calc_i)(i, x, bf, test, neighbors, distances, count, train_bucket) for i, x in enumerate(test))
+                print(neighbors[1])
+                print(distances[1])
+
+
+if __name__ == "__main__":
+    create_ds()
diff --git a/create_text_to_image_ds.py b/create_text_to_image_ds.py
new file mode 100644
index 000000000..3343e4d2f
--- /dev/null
+++ b/create_text_to_image_ds.py
@@ -0,0 +1,117 @@
+from ann_benchmarks.algorithms.bruteforce import BruteForceBLAS
+import struct
+import numpy as np
+import click
+import h5py
+from joblib import Parallel, delayed
+import multiprocessing
+
+def read_fbin(filename, start_idx=0, chunk_size=None):
+    """ Read *.fbin file that contains float32 vectors
+    Args:
+        :param filename (str): path to *.fbin file
+        :param start_idx (int): start reading vectors from this index
+        :param chunk_size (int): number of vectors to read.
+                                 If None, read all vectors
+    Returns:
+        Array of float32 vectors (numpy.ndarray)
+    """
+    with open(filename, "rb") as f:
+        nvecs, dim = np.fromfile(f, count=2, dtype=np.int32)
+        nvecs = (nvecs - start_idx) if chunk_size is None else chunk_size
+        arr = np.fromfile(f, count=nvecs * dim, dtype=np.float32,
+                          offset=start_idx * 4 * dim)
+    return arr.reshape(nvecs, dim)
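+
+# On-disk layout assumed by the .fbin/.ibin helpers in this file: an 8-byte
+# header of two int32 values (nvecs, dim), followed by nvecs * dim row-major
+# values, float32 for .fbin and int32 for .ibin; the `offset` argument skips
+# start_idx whole vectors past the header.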
+
+
+def read_ibin(filename, start_idx=0, chunk_size=None):
+    """ Read *.ibin file that contains int32 vectors
+    Args:
+        :param filename (str): path to *.ibin file
+        :param start_idx (int): start reading vectors from this index
+        :param chunk_size (int): number of vectors to read.
+                                 If None, read all vectors
+    Returns:
+        Array of int32 vectors (numpy.ndarray)
+    """
+    with open(filename, "rb") as f:
+        nvecs, dim = np.fromfile(f, count=2, dtype=np.int32)
+        nvecs = (nvecs - start_idx) if chunk_size is None else chunk_size
+        arr = np.fromfile(f, count=nvecs * dim, dtype=np.int32,
+                          offset=start_idx * 4 * dim)
+    return arr.reshape(nvecs, dim)
+
+
+def write_fbin(filename, vecs):
+    """ Write an array of float32 vectors to *.fbin file
+    Args:
+        :param filename (str): path to *.fbin file
+        :param vecs (numpy.ndarray): array of float32 vectors to write
+    """
+    assert len(vecs.shape) == 2, "Input array must have 2 dimensions"
+    with open(filename, "wb") as f:
+        nvecs, dim = vecs.shape
+        f.write(struct.pack('
+    if len(different_attrs) * clients > len(files):
+        print(f'missing files! got {len(files)} but expected {len(different_attrs) * clients}')
+        print('got files:')
+        [print('\t' + f) for f in files]
+        print('probably missing files:')
+        [[print('\t' + f) for f in g if f not in files] for g in groups]
+        assert False
+    elif len(different_attrs) * clients < len(files):
+        print(f'too many files! got {len(files)} but expected {len(different_attrs) * clients}')
+        print('got files:')
+        [print('\t' + f) for f in files]
+        print('probably unnecessary files:')
+        [print('\t' + f) for f in files if len([g for g in groups if f in g]) == 0]
+        assert False
+
+    for group in groups:
+        fn = group[0].split('client')[0][:-1] + '.hdf5'
+        f = h5py.File(fn, 'w')
+
+        fs = [h5py.File(fi, 'r') for fi in group]
+        for k, v in fs[0].attrs.items():
+            f.attrs[k] = v
+        f.attrs["best_search_time"] = average([fi.attrs["best_search_time"] for fi in fs])
+        f.attrs["candidates"] = average([fi.attrs["candidates"] for fi in fs])
+        f.attrs["start_querying_time"] = min([fi.attrs["start_querying_time"] for fi in fs])
+        f.attrs["end_querying_time"] = max([fi.attrs["end_querying_time"] for fi in fs])
+
+        # As we split the test work between the clients, we should concatenate their results.
+        f['times'] = [t for fi in fs for t in fi['times']]
+        f['neighbors'] = [n for fi in fs for n in fi['neighbors']]
+        f['distances'] = [d for fi in fs for d in fi['distances']]
+
+        [fi.close() for fi in fs]
+        [os.remove(fi) for fi in group]
+        f.close()
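+
+# Example of the merge performed above (file naming assumed from the
+# split-on-'client' logic): with 2 test clients, per-client outputs such as
+#     <outputs>/M-4_client1.hdf5 and <outputs>/M-4_client2.hdf5
+# collapse into a single <outputs>/M-4.hdf5 whose times/neighbors/distances
+# are the concatenation of both clients' per-query results.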

 if __name__ == "__main__":
     parser = argparse.ArgumentParser(
@@ -16,42 +61,50 @@
         metavar='NAME',
         help='the dataset to load training points from',
         default='glove-100-angular')
+    parser.add_argument(
+        '--json-output',
+        help='Path to the output file. If defined, the results will also be stored in JSON format.',
+        default="")
     parser.add_argument(
         "-k", "--count",
-        default=10,
-        type=positive_int,
+        default="10",
+        type=str,
         help="the number of near neighbours to search for")
     parser.add_argument(
         '--host',
+        type=str,
         help='host name or IP',
-        default='localhost')
+        default=None)
     parser.add_argument(
         '--port',
-        type=positive_int,
+        type=str,
         help='the port "host" is listening on',
-        default=6379)
+        default=None)
     parser.add_argument(
         '--auth', '-a',
+        type=str,
         metavar='PASS',
         help='password for connection',
         default=None)
     parser.add_argument(
         '--user',
+        type=str,
         metavar='NAME',
         help='user name for connection',
         default=None)
     parser.add_argument(
         '--build-clients',
-        type=int,
+        type=str,
         metavar='NUM',
         help='total number of clients running in parallel to build the index (could be 0)',
-        default=1)
+        default="1")
     parser.add_argument(
         '--test-clients',
-        type=int,
+        type=str,
         metavar='NUM',
         help='total number of clients running in parallel to test the index (could be 0)',
-        default=1)
+        default="1")
     parser.add_argument(
         '--force',
         help='re-run algorithms even if their results already exist',
@@ -60,53 +113,188 @@
         '--algorithm',
         metavar='ALGO',
         help='run redisearch with this algorithm',
-        default="hnsw")
+        default="redisearch-hnsw")
+    parser.add_argument(
+        '--run-group',
+        type=str,
+        metavar='NAME',
+        help='run only the named run group',
+        default=None)
+    parser.add_argument(
+        '--runs',
+        type=str,
+        help='run each algorithm instance %(metavar)s times and use only'
+             ' the best result',
+        default="3")
     parser.add_argument(
         '--cluster',
         action='store_true',
         help='working with a cluster')
+    parser.add_argument(
+        '--shards',
+        type=str,
+        metavar='NUM',
+        default="1",
+        help='specify number of shards')
     args = parser.parse_args()

-    redis = RedisCluster if args.cluster else Redis
-    redis = redis(host=args.host, port=args.port, password=args.auth, username=args.user)
+    # Change to the proper workdir as soon as the args are parsed, since some
+    # functions below rely on paths relative to the project.
+    workdir = pathlib.Path(__file__).parent.absolute()
+    print("Changing the workdir to {}".format(workdir))
+    os.chdir(workdir)

-    base = 'python run.py --local --algorithm redisearch-' + args.algorithm.lower() + ' -k ' + str(args.count) + \
-        ' --dataset ' + args.dataset + ' --host ' + str(args.host) + ' --port ' + str(args.port)
+    # Flags for all supported algorithms that need special handling.
+    isredis = ismilvus = ispinecone = iselastic = False
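+
+    # The vendor clients below are imported lazily so that only the library
+    # matching the selected --algorithm has to be installed.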

-    if args.user: base += ' --user ' + str(args.user)
-    if args.auth: base += ' --auth ' + str(args.auth)
-    if args.force: base += ' --force'
-    if args.cluster:base += ' --cluster'
+    if 'redisearch' in args.algorithm:
+        from redis import Redis
+        from redis.cluster import RedisCluster
+        isredis = True

-    base_build = base + ' --build-only --total-clients ' + str(args.build_clients)
-    base_test = base + ' --test-only --runs 1 --total-clients ' + str(args.test_clients)
+    elif 'milvus' in args.algorithm:
+        from pymilvus import utility, connections
+        ismilvus = True

-    if args.build_clients > 0:
-        clients = [Process(target=system, args=(base_build + ' --client-id ' + str(i),)) for i in range(1, args.build_clients + 1)]
+    elif 'pinecone' in args.algorithm:
+        import pinecone
+        ispinecone = True

-        t0 = time.time()
-        for client in clients: client.start()
-        for client in clients: client.join()
-        total_time = time.time() - t0
-        print(f'total build time: {total_time}\n\n')
+    elif 'elasticsearch' in args.algorithm:
+        from elasticsearch import Elasticsearch
+        from elastic_transport.client_utils import DEFAULT
+        iselastic = True

-        fn = get_result_filename(args.dataset, args.count)
-        if not path.isdir(fn):
-            makedirs(fn)
-        fn = path.join(fn, 'build_stats.hdf5')
-        f = h5py.File(fn, 'w')
-        f.attrs["build_time"] = total_time
-        if args.cluster:
-            f.attrs["index_size"] = -1 # TODO: get total size from all the shards
-        else:
-            f.attrs["index_size"] = redis.ft('ann_benchmark').info()['vector_index_sz_mb']*0x100000
-        f.close()
+    if args.host is None:
+        args.host = 'localhost'
+    if args.port is None:
+        if isredis: args.port = '6379'
+        elif ismilvus: args.port = '19530'
+        elif iselastic: args.port = '9200'
+
+    if isredis:
+        redis = RedisCluster if args.cluster else Redis
+        redis = redis(host=args.host, port=int(args.port), password=args.auth, username=args.user)
+    elif ismilvus:
+        connections.connect(host=args.host, port=args.port)
+    elif ispinecone:
+        pinecone.init(api_key=args.auth)
+    elif iselastic:
+        args.user = args.user if args.user is not None else 'elastic'
+        args.auth = args.auth if args.auth is not None else os.environ.get('ELASTIC_PASSWORD', '')
+        try:
+            es = Elasticsearch([f'http://{args.host}:{args.port}'], request_timeout=3600, basic_auth=(args.user, args.auth))
+            es.info()
+        except Exception:
+            es = Elasticsearch([f'https://{args.host}:{args.port}'], request_timeout=3600, basic_auth=(args.user, args.auth), ca_certs=os.environ.get('ELASTIC_CA', DEFAULT))
+
+    if args.run_group is not None:
+        run_groups = [args.run_group]
+    else:
+        run_groups = get_run_groups('algos.yaml', args.algorithm)
+
+    base = 'python3 run.py --local --algorithm ' + args.algorithm + ' -k ' + args.count + ' --dataset ' + args.dataset
+
+    if args.host: base += ' --host ' + args.host
+    if args.port: base += ' --port ' + args.port
+    if args.user: base += ' --user ' + args.user
+    if args.auth: base += ' --auth ' + args.auth
+    if args.force: base += ' --force'
+    if args.cluster: base += ' --cluster'
+    if args.shards: base += ' --shards ' + args.shards
+
+    base_build = base + ' --build-only --total-clients ' + args.build_clients
+    base_test = base + ' --test-only --runs {} --total-clients {}'.format(args.runs, args.test_clients)
+    outputsdir = "{}/{}".format(workdir, get_result_filename(args.dataset, args.count))
+    outputsdir = os.path.join(outputsdir, args.algorithm)
+    if not os.path.isdir(outputsdir):
+        os.makedirs(outputsdir)
+    results_dicts = []
+
+    # Aggregation is only needed when more than one tester is used; watch the
+    # outputs directory so every per-client stats file gets picked up.
+    if int(args.test_clients) > 1:
+        test_stats_files = set()
+        watcher = PatternMatchingEventHandler(["*.hdf5"], ignore_directories=True)
+
+        def on_created_or_modified(event):
+            test_stats_files.add(event.src_path)
+
+        watcher.on_created = on_created_or_modified
+        watcher.on_modified = on_created_or_modified
+        observer = Observer()
+        observer.schedule(watcher, outputsdir)
+        observer.start()
+
+    for run_group in run_groups:
+        results_dict = {}
+        curr_base_build = base_build + ' --run-group ' + run_group
+        curr_base_test = base_test + ' --run-group ' + run_group
+
+        if int(args.build_clients) > 0:
+            # Start each run group from a clean index.
+            if isredis:
+                redis.flushall()
+            elif ismilvus:
+                if utility.has_collection('milvus'):
+                    utility.drop_collection('milvus')
+            elif ispinecone:
+                for idx in pinecone.list_indexes():
+                    pinecone.delete_index(idx)
+            elif iselastic:
+                for idx in es.indices.stats()['indices']:
+                    es.indices.delete(index=idx)
+
+            clients = [Process(target=os.system, args=(curr_base_build + ' --client-id ' + str(i),)) for i in
+                       range(1, int(args.build_clients) + 1)]
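+            # Each client is a separate OS process shelling out to run.py
+            # with its own --client-id, so ingestion is split across the
+            # build clients; the timing below is the wall time until the
+            # slowest client finishes.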
+            t0 = time.time()
+            for client in clients: client.start()
+            for client in clients: client.join()
+            total_time = time.time() - t0
+            print(f'total build time: {total_time}\n\n')
+
+            fn = os.path.join(outputsdir, 'build_stats')
+            f = h5py.File(fn, 'w')
+            f.attrs["build_time"] = total_time
+            print(fn)
+            index_size = -1
+            if isredis:
+                if not args.cluster:  # TODO: get total size from all the shards
+                    index_size = float(redis.ft('ann_benchmark').info()['vector_index_sz_mb']) * 1024
+                f.attrs["index_size"] = index_size
+            elif iselastic:
+                f.attrs["index_size"] = es.indices.stats(index='ann_benchmark')['indices']['ann_benchmark']['total']['store']['size_in_bytes']
+            f.close()
+            results_dict["build"] = {"total_clients": args.build_clients, "build_time": total_time,
+                                     "vector_index_sz_mb": index_size}
+
+        if int(args.test_clients) > 0:
+            queriers = [Process(target=os.system, args=(curr_base_test + ' --client-id ' + str(i),)) for i in
+                        range(1, int(args.test_clients) + 1)]
+            t0 = time.time()
+            for querier in queriers: querier.start()
+            for querier in queriers: querier.join()
+            query_time = time.time() - t0
+            print(f'total test time: {query_time}')
+            results_dict["query"] = {"total_clients": args.test_clients, "test_time": query_time}
+
+        results_dicts.append(results_dict)
+
+    # Aggregation is only needed when more than one tester was used.
+    if int(args.test_clients) > 1:
+        observer.stop()
+        observer.join()
+        print(
+            f'summarizing {int(args.test_clients)} clients data ({len(test_stats_files)} files into {len(test_stats_files) // int(args.test_clients)})...')
+        aggregate_outputs(test_stats_files, int(args.test_clients))
+        print('done!')

-    if args.test_clients > 0:
-        queriers = [Process(target=system, args=(base_test + ' --client-id ' + str(i),)) for i in range(1, args.test_clients + 1)]
-        t0 = time.time()
-        for querier in queriers: querier.start()
-        for querier in queriers: querier.join()
-        query_time = time.time() - t0
-        print(f'total test time: {query_time}')
+    if args.json_output != "":
+        with open(args.json_output, "w") as json_out_file:
+            print(f'storing json results into: {args.json_output}')
+            # Dump the stats collected for all run groups.
+            json.dump(results_dicts, json_out_file)
diff --git a/requirements.txt b/requirements.txt
index 99b97aa38..2885ef7f9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,4 +7,8 @@ pyyaml==5.4
 psutil==5.6.6
 scipy==1.3.3
 scikit-learn==0.22.2
-jinja2==2.10
+jinja2==3.1.2
+pymilvus==2.0.2
+pinecone-client==2.0.11
+redis==4.3.2
+elasticsearch==8.3.1
diff --git a/run.py b/run.py
index 8ca27bc2e..c0a094238 100644
--- a/run.py
+++ b/run.py
@@ -1,6 +1,12 @@
+import os
+import pathlib
+
 from ann_benchmarks.main import main
 from multiprocessing import freeze_support

 if __name__ == "__main__":
+    workdir = pathlib.Path(__file__).parent.absolute()
+    print("Changing the workdir to {}".format(workdir))
+    os.chdir(workdir)
     freeze_support()
     main()
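
# A minimal end-to-end sketch of the intended flow (values illustrative; the
# hybrid dataset name follows the f'{data_set}-hybrid-{bucket}' pattern used
# by create_hybrid_dataset.py, and this assumes the generated .hdf5 is
# visible to run.py as a dataset):
#
#   python create_hybrid_dataset.py --data_set glove-100-angular --percentile 10
#   python3 run.py --local --algorithm redisearch-hnsw -k 10 \
#       --dataset glove-100-angular-hybrid-10 --host localhost --port 6379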