From da89cad964c8f0518c29d52b6583307191290155 Mon Sep 17 00:00:00 2001
From: Yuriy Mednikov
Date: Thu, 13 Oct 2016 15:27:38 +0300
Subject: [PATCH 1/2] Add elasticsearch index settings backup/restore

Add sleep-time client argument for sleeping between scrolls
Add scroll-time client argument so it can be configured from the shell
Use a backup directory instead of separate files for mappings and docs
---
 elasticbackup/backup.py  | 103 ++++++++++++++++++++++++++++-----------
 elasticbackup/restore.py |  78 +++++++++++++++++++++--------
 elasticbackup/utils.py   |  24 +++++++++
 3 files changed, 155 insertions(+), 50 deletions(-)
 create mode 100644 elasticbackup/utils.py

diff --git a/elasticbackup/backup.py b/elasticbackup/backup.py
index 641f7a5..ffbb4df 100755
--- a/elasticbackup/backup.py
+++ b/elasticbackup/backup.py
@@ -2,47 +2,58 @@
 
 from __future__ import print_function
 
+import os
 import argparse
 import datetime
-import logging
 import json
 import time
 
 import elasticsearch
 
-logging.basicConfig(format='%(asctime)s [%(name)s] [%(levelname)s] '
-                           '%(message)s',
-                    datefmt='%Y-%m-%d %H:%M:%S')
+from utils import log
+from utils import log_es
+from utils import log_levels
+from utils import positive_int
+from utils import nonnegative_float
 
-log_levels = [logging.WARN, logging.INFO, logging.DEBUG]
-log = logging.getLogger('elasticbackup')
-log_es = logging.getLogger('elasticsearch')
 
 today = datetime.datetime.utcnow().strftime("%Y%m%d")
-mappings_filename = "%%(index_name)s-mappings-%s.json" % today
-documents_filename = "%%(index_name)s-documents-%s.json" % today
 
 parser = argparse.ArgumentParser(
     'elasticbackup',
-    description='Back up data and mappings from an ElasticSearch index')
+    description='Back up settings, mappings and data from an ElasticSearch '
+                'index')
+
 parser.add_argument('host',
                     help='elasticsearch host')
+
 parser.add_argument('index',
                     help='elasticsearch index name')
-parser.add_argument('-m', '--mappings-file',
-                    help='mappings output filename',
-                    default=mappings_filename)
-parser.add_argument('-d', '--documents-file',
-                    help='documents output filename',
-                    default=documents_filename)
+
+parser.add_argument('-d', '--backup-dir',
+                    help='backup parent directory',
+                    default="./")
+
 parser.add_argument('-b', '--batch-size',
                     help='document download batch size',
-                    type=int,
+                    type=positive_int,
                     default=1000)
+
 parser.add_argument('-q', '--query',
                     help='query to pass to elasticsearch')
+
+parser.add_argument('--sleep-time',
+                    help='sleep time between scrolls in seconds',
+                    type=nonnegative_float,
+                    default=1.0)
+
+parser.add_argument('--scroll-time',
+                    help='scroll time in seconds',
+                    type=positive_int,
+                    default=600)
+
 parser.add_argument('-u', '--user',
                     help='HTTP auth (in format user:pass)')
+
 parser.add_argument('-v', '--verbose',
                     help='increase output verbosity',
                     action='count',
@@ -50,35 +61,51 @@
 
 
 def write_mappings(es, index, f):
+    log.info("Write mappings")
     mapping = es.indices.get_mapping(index)
     json.dump(mapping[index], f)
+    log.info("Write mappings complete")
+
+
+def write_settings(es, index, f):
+    log.info("Write settings")
+    settings = es.indices.get_settings(index)[index]["settings"]["index"]
 
-def write_documents(es, index, f, batch_size=1000, query=None):
-    def _write_hits(results):
-        hits = results['hits']['hits']
+    del settings["uuid"]
+    del settings["version"]
+    del settings["creation_date"]
+    del settings["number_of_shards"]
+    del settings["number_of_replicas"]
+
+    json.dump({"settings": settings}, f)
+    log.info("Write settings complete")
+
+
+def write_documents(es, index, f, sleep, scroll, batch_size=1000, query=None):
+    def _write_hits(r):
+        hits = r['hits']['hits']
         if hits:
             for hit in hits:
                 hit.pop('_index', None)
                 hit.pop('_score', None)
                 f.write("%s\n" % json.dumps(hit))
-            return results['_scroll_id'], len(hits)
-        else:
-            return None, 0
+            return r['_scroll_id'], len(hits)
+        return None, 0
 
     if query is None:
         query = {"query": {"match_all": {}}}
 
     status = "got batch of %s (total: %s)"
 
-    results = es.search(index=index, body=query, scroll="10m", size=batch_size)
+    results = es.search(index=index, body=query, scroll=scroll,
+                        size=batch_size)
     scroll_id, num = _write_hits(results)
     total = num
     log.info(status, num, total)
 
     while scroll_id is not None:
-        time.sleep(1)
-        results = es.scroll(scroll_id=scroll_id, scroll='10m')
+        time.sleep(sleep)
+        results = es.scroll(scroll_id=scroll_id, scroll=scroll)
         scroll_id, num = _write_hits(results)
         total += num
         log.info(status, num, total)
 
@@ -86,26 +113,44 @@ def _write_hits(results):
 
 def main():
     args = parser.parse_args()
-
     verbose = min(args.verbose, 2)
     log.setLevel(log_levels[verbose])
     log_es.setLevel(log_levels[verbose])
 
+    backup_dir = os.path.join(args.backup_dir,
+                              "%s-%s" % (args.index, today))
+
+    mappings_path = os.path.join(backup_dir, "mappings.json")
+    settings_path = os.path.join(backup_dir, "settings.json")
+    documents_path = os.path.join(backup_dir, "documents.json")
+
     conn_kwargs = {}
     if args.user:
         conn_kwargs['http_auth'] = args.user
 
     es = elasticsearch.Elasticsearch([args.host], **conn_kwargs)
 
-    with open(args.mappings_file % {'index_name': args.index}, 'w') as f:
+    if os.path.exists(backup_dir):
+        return log.warn("Backup dir %s already exists, please remove or "
+                        "rename it first" % backup_dir)
+
+    os.mkdir(backup_dir)
+
+    with open(settings_path, 'w+') as f:
+        write_settings(es, args.index, f)
+
+    with open(mappings_path, 'w+') as f:
         write_mappings(es, args.index, f)
 
-    with open(args.documents_file % {'index_name': args.index}, 'w') as f:
+    with open(documents_path, 'w+') as f:
         write_documents(es, args.index, f,
+                        sleep=args.sleep_time,
+                        scroll="%ds" % args.scroll_time,
                         batch_size=args.batch_size,
                         query=args.query)
 
+    log.info("Backup can be found at %s", backup_dir)
 
 if __name__ == '__main__':
     main()
diff --git a/elasticbackup/restore.py b/elasticbackup/restore.py
index 59a9647..d7c4807 100755
--- a/elasticbackup/restore.py
+++ b/elasticbackup/restore.py
@@ -2,41 +2,41 @@
 
 from __future__ import print_function
 
-import argparse
-import logging
+import os
+import time
 import json
-
+import argparse
 import elasticsearch
 
-logging.basicConfig(format='%(asctime)s [%(name)s] [%(levelname)s] '
-                           '%(message)s',
-                    datefmt='%Y-%m-%d %H:%M:%S')
+from utils import log
+from utils import log_es
+from utils import log_levels
+from utils import positive_int
 
-log_levels = [logging.WARN, logging.INFO, logging.DEBUG]
-log = logging.getLogger('elasticbackup')
-log_es = logging.getLogger('elasticsearch')
 
 parser = argparse.ArgumentParser(
     'elasticrestore',
     description='Restore data and mappings to an ElasticSearch index')
+
 parser.add_argument('host',
                     help='elasticsearch host')
+
 parser.add_argument('index',
                     help='elasticsearch index name')
-parser.add_argument('-m', '--mappings-file',
                    help='mappings output filename',
-                    required=True)
-parser.add_argument('-d', '--documents-file',
-                    help='documents output filename',
-                    required=True)
+
+parser.add_argument('-d', '--backup-dir',
+                    help='backup path', required=True)
+
 parser.add_argument('-b', '--batch-size',
                     help='document upload batch size',
-                    type=int,
+                    type=positive_int,
                     default=1000)
+
 parser.add_argument('-v', '--verbose',
                     help='increase output verbosity',
                     action='count',
                     default=0)
+
 parser.add_argument('-u', '--user',
                     help='HTTP auth (in format user:pass)')
@@ -82,15 +82,51 @@ def main():
     log.setLevel(log_levels[verbose])
     log_es.setLevel(log_levels[verbose])
 
+    backup_dir = args.backup_dir
+    if not os.path.exists(backup_dir):
+        return log.warn("Backup path %s does not exist" % backup_dir)
+
+    mappings_path = os.path.join(backup_dir, "mappings.json")
+    if not os.path.exists(mappings_path):
+        return log.warn("mappings path %s does not exist" % mappings_path)
+
+    settings_path = os.path.join(backup_dir, "settings.json")
+    if not os.path.exists(settings_path):
+        return log.warn("settings path %s does not exist" % settings_path)
+
+    documents_path = os.path.join(backup_dir, "documents.json")
+    if not os.path.exists(documents_path):
+        return log.warn("documents path %s does not exist" % documents_path)
+
     conn_kwargs = {}
     if args.user:
         conn_kwargs['http_auth'] = args.user
 
-    es = elasticsearch.Elasticsearch([args.host], **conn_kwargs)
-    with open(args.mappings_file) as f:
-        create_index(es, args.index, f)
-
-    with open(args.documents_file) as f:
+    es = elasticsearch.Elasticsearch([args.host], **conn_kwargs)
+    if es.indices.exists(index=args.index):
+        return log.warn(
+            "Index %s already exists. To delete it, run:\n"
+            "curl -XDELETE %s/%s" % (args.index, args.host, args.index))
+
+    es.indices.create(index=args.index)
+    time.sleep(1)
+    es.indices.close(index=args.index)
+    time.sleep(1)
+
+    with open(settings_path) as f:
+        settings = json.load(f)
+        es.indices.put_settings(index=args.index, body=settings)
+
+    es.indices.open(index=args.index)
+    time.sleep(1)
+
+    with open(mappings_path) as f:
+        mappings = json.load(f)
+        for doc_type, doc_mapping in mappings["mappings"].items():
+            es.indices.put_mapping(doc_type, index=args.index,
+                                   body=doc_mapping)
+
+    with open(documents_path) as f:
         create_documents(es, args.index, f, batch_size=args.batch_size)
 
 
 if __name__ == '__main__':
diff --git a/elasticbackup/utils.py b/elasticbackup/utils.py
new file mode 100644
index 0000000..d7bf1e6
--- /dev/null
+++ b/elasticbackup/utils.py
@@ -0,0 +1,24 @@
+# coding: utf-8
+import logging
+
+logging.basicConfig(format='%(asctime)s [%(name)s] [%(levelname)s] '
+                           '%(message)s',
+                    datefmt='%Y-%m-%d %H:%M:%S')
+
+log_levels = [logging.WARN, logging.INFO, logging.DEBUG]
+log = logging.getLogger('elasticbackup')
+log_es = logging.getLogger('elasticsearch')
+
+
+def positive_int(value):
+    intval = int(value)
+    if intval <= 0:
+        raise ValueError("%s is an invalid positive int value" % value)
+    return intval
+
+
+def nonnegative_float(value):
+    floatval = float(value)
+    if floatval < 0:
+        raise ValueError("%s is an invalid non-negative float value" % value)
+    return floatval
\ No newline at end of file

From 4d383c4e2d54a4f4c98b40cf65f85b91697dbfea Mon Sep 17 00:00:00 2001
From: Mednikov Yuriy
Date: Sun, 11 Dec 2016 03:44:00 +0300
Subject: [PATCH 2/2] Load ingest pipelines from file during restore

---
 elasticbackup/restore.py | 56 +++++++++++++++++++++++++++++++++++-----
 1 file changed, 49 insertions(+), 7 deletions(-)

diff --git a/elasticbackup/restore.py b/elasticbackup/restore.py
index d7c4807..8dc8e44 100755
--- a/elasticbackup/restore.py
+++ b/elasticbackup/restore.py
@@ -46,11 +46,20 @@ def create_index(es, index, f):
     es.indices.create(index=index, body=mappings)
 
 
-def create_documents(es, index, f, batch_size=1000):
+def create_documents(es, index, f, pipelines=None, batch_size=1000):
     total = 0
 
-    for size, batch in document_batches(f, batch_size):
-        es.bulk(index=index, body=batch)
+    for size, batch, doc_type in document_batches(f, batch_size):
+        # use the ingest pipeline named "<doc_type>-pipeline",
+        # falling back to no pipeline when none is configured for this type
+        pipeline = doc_type + "-pipeline"
+        if not pipelines or pipeline not in pipelines:
+            pipeline = None
+
+        response = es.bulk(index=index, body=batch, pipeline=pipeline)
+        if response["errors"]:
+            log.error("bulk upload returned errors: %s", response)
+            raise SystemExit(1)
 
         total += size
         log.info("uploaded %s (total: %s)", size, total)
 
@@ -58,21 +67,32 @@ def create_documents(es, index, f, batch_size=1000):
 def document_batches(fp, batch_size):
     i = 0
     batch = []
+    doc_type = None
 
     for line in fp:
         obj = json.loads(line)
+
+        if not doc_type:
+            doc_type = obj["_type"]
+
+        if doc_type != obj["_type"]:
+            yield i, batch, doc_type
+            i = 0
+            batch = []
+            doc_type = obj["_type"]
+
         src = obj.pop('_source')
         batch.append(json.dumps({"create": obj}))
         batch.append(json.dumps(src))
         i += 1
 
         if i >= batch_size:
-            yield i, batch
+            yield i, batch, doc_type
             i = 0
             batch = []
 
     if batch:
-        yield i, batch
+        yield i, batch, doc_type
 
 
 def main():
@@ -98,15 +118,26 @@ def main():
     if not os.path.exists(documents_path):
         return log.warn("documents path %s does not exist" % documents_path)
 
+    ingest_path = os.path.join(backup_dir, "ingest.json")
+    ingest_pipelines = {}
+    if not os.path.exists(ingest_path):
+        log.debug("No ingest file has been found")
+    else:
+        with open(ingest_path) as f:
+            ingest_pipelines = json.load(f)
+
     conn_kwargs = {}
     if args.user:
         conn_kwargs['http_auth'] = args.user
 
+    conn_kwargs["timeout"] = 300
+    conn_kwargs["retry_on_timeout"] = True
+
     es = elasticsearch.Elasticsearch([args.host], **conn_kwargs)
     if es.indices.exists(index=args.index):
         return log.warn(
             "Index %s already exists. To delete it, run:\n"
-            "curl -XDELETE %s/%s" % (args.index, args.host, args.index))
+            "curl -XDELETE %s:9200/%s" % (args.index, args.host, args.index))
 
     es.indices.create(index=args.index)
     time.sleep(1)
@@ -125,9 +156,20 @@ def main():
         for doc_type, doc_mapping in mappings["mappings"].items():
             es.indices.put_mapping(doc_type, index=args.index,
                                    body=doc_mapping)
+    time.sleep(1)
+
+    for pipeline_id, pipeline_body in ingest_pipelines.items():
+        try:
+            exists = es.ingest.get_pipeline(pipeline_id)
+            log.debug("pipeline %s already exists: %s", pipeline_id, exists)
+        except elasticsearch.exceptions.NotFoundError:
+            es.ingest.put_pipeline(pipeline_id, pipeline_body)
 
     with open(documents_path) as f:
-        create_documents(es, args.index, f, batch_size=args.batch_size)
+        create_documents(es, args.index, f,
+                         pipelines=ingest_pipelines,
+                         batch_size=args.batch_size)
+
 
 if __name__ == '__main__':
     main()
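For reference, the second patch reads an optional ingest.json from the backup directory, registers each entry with es.ingest.put_pipeline(), and then tags each bulk request with a pipeline named "<doc_type>-pipeline" when one is present. Below is a minimal sketch of how such a file could be produced; the "article" doc type and the set processor are hypothetical examples, not taken from the patch.

# Hypothetical helper, not part of the patch: writes an ingest.json that
# restore.py would load into ingest_pipelines. Pipeline ids follow the
# "<doc_type>-pipeline" convention used by create_documents().
import json

ingest_pipelines = {
    "article-pipeline": {
        "description": "example pipeline for the 'article' doc type",
        "processors": [
            {"set": {"field": "restored_at", "value": "{{_ingest.timestamp}}"}}
        ]
    }
}

with open("ingest.json", "w") as f:
    json.dump(ingest_pipelines, f, indent=2)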