103 changes: 74 additions & 29 deletions elasticbackup/backup.py
@@ -2,110 +2,155 @@

from __future__ import print_function

import os
import argparse
import datetime
import logging
import json
import time

import elasticsearch

logging.basicConfig(format='%(asctime)s [%(name)s] [%(levelname)s] '
'%(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
from utils import log
from utils import log_es
from utils import log_levels
from utils import positive_int
from utils import nonnegative_float

log_levels = [logging.WARN, logging.INFO, logging.DEBUG]
log = logging.getLogger('elasticbackup')
log_es = logging.getLogger('elasticsearch')

today = datetime.datetime.utcnow().strftime("%Y%m%d")
mappings_filename = "%%(index_name)s-mappings-%s.json" % today
documents_filename = "%%(index_name)s-documents-%s.json" % today

parser = argparse.ArgumentParser(
'elasticbackup',
description='Back up data and mappings from an ElasticSearch index')
description='Back up settings, mappings and data from an ElasticSearch '
'index')
parser.add_argument('host',
help='elasticsearch host')

parser.add_argument('index',
help='elasticsearch index name')
parser.add_argument('-m', '--mappings-file',
help='mappings output filename',
default=mappings_filename)
parser.add_argument('-d', '--documents-file',
help='documents output filename',
default=documents_filename)

parser.add_argument('-d', '--backup-dir',
help='backup parent directory',
default="./")

parser.add_argument('-b', '--batch-size',
help='document download batch size',
type=int,
type=positive_int,
default=1000)

parser.add_argument('-q', '--query',
help='query to pass to elasticsearch')

parser.add_argument('--sleep-time',
help='sleep time between scrolls in seconds',
type=nonnegative_float,
default=1.0)

parser.add_argument('--scroll-time',
help='scroll time in seconds',
type=positive_int,
default=600)

parser.add_argument('-u', '--user',
help='HTTP auth (in format user:pass)')

parser.add_argument('-v', '--verbose',
help='increase output verbosity',
action='count',
default=0)


def write_mappings(es, index, f):
log.info("Write mappings")
mapping = es.indices.get_mapping(index)
json.dump(mapping[index], f)
log.info("Write mappings complete")


def write_settings(es, index, f):
log.info("Write settings")
settings = es.indices.get_settings(index)[index]["settings"]["index"]

def write_documents(es, index, f, batch_size=1000, query=None):
def _write_hits(results):
hits = results['hits']['hits']
del settings["uuid"]
del settings["version"]
del settings["creation_date"]
del settings["number_of_shards"]
del settings["number_of_replicas"]

json.dump({"settings": settings}, f)
log.info("Write settings complete")


def write_documents(es, index, f, sleep, scroll, batch_size=1000, query=None):
def _write_hits(r):
hits = r['hits']['hits']
if hits:
for hit in hits:
hit.pop('_index', None)
hit.pop('_score', None)
f.write("%s\n" % json.dumps(hit))
return results['_scroll_id'], len(hits)
else:
return None, 0
return r['_scroll_id'], len(hits)
return None, 0

if query is None:
query = {"query": {"match_all": {}}}

status = "got batch of %s (total: %s)"

results = es.search(index=index, body=query, scroll="10m", size=batch_size)
results = es.search(index=index, body=query, scroll=scroll,
size=batch_size)
scroll_id, num = _write_hits(results)
total = num
log.info(status, num, total)

while scroll_id is not None:
time.sleep(1)
results = es.scroll(scroll_id=scroll_id, scroll='10m')
time.sleep(sleep)
results = es.scroll(scroll_id=scroll_id, scroll=scroll)
scroll_id, num = _write_hits(results)
total += num
log.info(status, num, total)


def main():
args = parser.parse_args()

verbose = min(args.verbose, 2)
log.setLevel(log_levels[verbose])
log_es.setLevel(log_levels[verbose])

backup_dir = os.path.join(args.backup_dir,
"%s-%s" % (args.index, today))

mappings_path = os.path.join(backup_dir, "mappings.json")
settings_path = os.path.join(backup_dir, "settings.json")
documents_path = os.path.join(backup_dir, "documents.json")

conn_kwargs = {}
if args.user:
conn_kwargs['http_auth'] = args.user
es = elasticsearch.Elasticsearch([args.host], **conn_kwargs)

with open(args.mappings_file % {'index_name': args.index}, 'w') as f:
if os.path.exists(backup_dir):
return log.warn("Dir %s already exists. Resolve it's, please" %
backup_dir)

os.mkdir(backup_dir)

with open(settings_path, 'w+') as f:
write_settings(es, args.index, f)

with open(mappings_path, 'w+') as f:
write_mappings(es, args.index, f)

with open(args.documents_file % {'index_name': args.index}, 'w') as f:
with open(documents_path, 'w+') as f:
write_documents(es,
args.index,
f,
sleep=args.sleep_time,
scroll="%ds" % args.scroll_time,
batch_size=args.batch_size,
query=args.query)

log.info("Backup can be found at %s" % backup_dir)

if __name__ == '__main__':
main()
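Both scripts now pull their logging setup and argparse validators from a shared utils module that is not part of this diff. A minimal sketch of what that module presumably contains, reconstructed from the inline definitions removed above; the positive_int and nonnegative_float validators are assumptions based only on how they are used as argparse types:

# utils.py -- hypothetical sketch, not included in this change
import argparse
import logging

# Logging configuration previously defined inline in backup.py/restore.py.
logging.basicConfig(format='%(asctime)s [%(name)s] [%(levelname)s] '
                           '%(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')

# Verbosity levels indexed by the -v/--verbose count (0, 1 or 2).
log_levels = [logging.WARN, logging.INFO, logging.DEBUG]
log = logging.getLogger('elasticbackup')
log_es = logging.getLogger('elasticsearch')


def positive_int(value):
    """argparse type: an integer strictly greater than zero."""
    ivalue = int(value)
    if ivalue <= 0:
        raise argparse.ArgumentTypeError("%r is not a positive integer" % value)
    return ivalue


def nonnegative_float(value):
    """argparse type: a float greater than or equal to zero."""
    fvalue = float(value)
    if fvalue < 0:
        raise argparse.ArgumentTypeError("%r is not a non-negative float" % value)
    return fvalue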
130 changes: 104 additions & 26 deletions elasticbackup/restore.py
@@ -2,41 +2,41 @@

from __future__ import print_function

import argparse
import logging
import os
import time
import json

import argparse
import elasticsearch

logging.basicConfig(format='%(asctime)s [%(name)s] [%(levelname)s] '
'%(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
from utils import log
from utils import log_es
from utils import log_levels
from utils import positive_int

log_levels = [logging.WARN, logging.INFO, logging.DEBUG]
log = logging.getLogger('elasticbackup')
log_es = logging.getLogger('elasticsearch')

parser = argparse.ArgumentParser(
'elasticrestore',
description='Restore data and mappings to an ElasticSearch index')

parser.add_argument('host',
help='elasticsearch host')

parser.add_argument('index',
help='elasticsearch index name')
parser.add_argument('-m', '--mappings-file',
help='mappings output filename',
required=True)
parser.add_argument('-d', '--documents-file',
help='documents output filename',
required=True)

parser.add_argument('-d', '--backup-dir',
                    help='backup path',
                    required=True)

parser.add_argument('-b', '--batch-size',
help='document upload batch size',
type=int,
type=positive_int,
default=1000)

parser.add_argument('-v', '--verbose',
help='increase output verbosity',
action='count',
default=0)

parser.add_argument('-u', '--user',
help='HTTP auth (in format user:pass)')

@@ -46,33 +46,53 @@ def create_index(es, index, f):
es.indices.create(index=index, body=mappings)


def create_documents(es, index, f, batch_size=1000):
def create_documents(es, index, f, pipelines=None, batch_size=1000):
total = 0

for size, batch in document_batches(f, batch_size):
es.bulk(index=index, body=batch)
for size, batch, doc_type in document_batches(f, batch_size):
pipeline = doc_type + "-pipeline"
        # Fall back to no pipeline when none was provided for this doc type.
        if not pipelines or pipeline not in pipelines:
            pipeline = None

response = es.bulk(index=index, body=batch, pipeline=pipeline)
if response["errors"]:
print(response)
exit(-1)
total += size
log.info("uploaded %s (total: %s)", size, total)


def document_batches(fp, batch_size):
i = 0
batch = []
doc_type = None

for line in fp:
obj = json.loads(line)

if not doc_type:
doc_type = obj["_type"]

if doc_type != obj["_type"]:
yield i, batch, doc_type
i = 0
batch = []
            # Start the new batch with the current document's type so the
            # final yield never reports doc_type as None.
            doc_type = obj["_type"]

src = obj.pop('_source')
batch.append(json.dumps({"create": obj}))
batch.append(json.dumps(src))
i += 1

if i >= batch_size:
yield i, batch
yield i, batch, doc_type
i = 0
batch = []

if batch:
yield i, batch
yield i, batch, doc_type


def main():
Expand All @@ -82,16 +102,74 @@ def main():
log.setLevel(log_levels[verbose])
log_es.setLevel(log_levels[verbose])

backup_dir = args.backup_dir
if not os.path.exists(backup_dir):
return log.warn("Backup path %s does not exists" % backup_dir)

mappings_path = os.path.join(backup_dir, "mappings.json")
if not os.path.exists(mappings_path):
return log.warn("mappings path %s does not exists" % mappings_path)

settings_path = os.path.join(backup_dir, "settings.json")
if not os.path.exists(settings_path):
return log.warn("settings path %s does not exists" % settings_path)

documents_path = os.path.join(backup_dir, "documents.json")
if not os.path.exists(documents_path):
return log.warn("documents path %s does not exists" % documents_path)

ingest_path = os.path.join(backup_dir, "ingest.json")
ingest_pipelines = {}
if not os.path.exists(ingest_path):
log.debug("No ingest file has been found")
else:
with open(ingest_path) as f:
ingest_pipelines = json.load(f)

conn_kwargs = {}
if args.user:
conn_kwargs['http_auth'] = args.user
es = elasticsearch.Elasticsearch([args.host], **conn_kwargs)

with open(args.mappings_file) as f:
create_index(es, args.index, f)
conn_kwargs["timeout"] = 300
conn_kwargs["retry_on_timeout"] = True

es = elasticsearch.Elasticsearch([args.host], **conn_kwargs)
if es.indices.exists(index=args.index):
        return log.warn(
            "Index %s already exists. To delete it, run:\n"
            "curl -XDELETE %s:9200/%s" % (args.index, args.host, args.index))

es.indices.create(index=args.index)
time.sleep(1)
es.indices.close(index=args.index)
time.sleep(1)

with open(settings_path) as f:
settings = json.load(f)
es.indices.put_settings(index=args.index, body=settings)

es.indices.open(index=args.index)
time.sleep(1)

with open(mappings_path) as f:
mappings = json.load(f)
for doc_type, doc_mapping in mappings["mappings"].items():
es.indices.put_mapping(doc_type, index=args.index,
body=doc_mapping)
time.sleep(1)

for pipeline_id, pipeline_body in ingest_pipelines.items():
try:
            es.ingest.get_pipeline(pipeline_id)
            log.debug("ingest pipeline %s already exists", pipeline_id)
except elasticsearch.exceptions.NotFoundError:
es.ingest.put_pipeline(pipeline_id, pipeline_body)

with open(documents_path) as f:
create_documents(es, args.index, f,
pipelines=ingest_pipelines,
batch_size=args.batch_size)

with open(args.documents_file) as f:
create_documents(es, args.index, f, batch_size=args.batch_size)

if __name__ == '__main__':
main()
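restore.py also looks for an optional ingest.json alongside settings.json and mappings.json, and treats it as a mapping from pipeline ids named "<doc_type>-pipeline" to Elasticsearch ingest pipeline bodies, creating any pipeline that does not already exist before the bulk upload. A hypothetical sketch of how such a file could be generated; backup.py does not produce this file in this change, and the processor contents are purely illustrative:

# make_ingest.py -- hypothetical helper, not part of this diff
import json

# One entry per document _type that should be routed through a pipeline
# during restore; each key must match "<doc_type>-pipeline".
ingest_pipelines = {
    "article-pipeline": {
        "description": "illustrative pipeline applied to _type 'article'",
        "processors": [
            {"set": {"field": "restored_at", "value": "{{_ingest.timestamp}}"}},
        ],
    },
}

with open("ingest.json", "w") as f:
    json.dump(ingest_pipelines, f, indent=2)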