Extracting and Loading Data From Elasticsearch With Python (Updated)
Executive Summary
This article is an update to the previous one from 2018. This version uses the opensearch-py
library instead, as it is more generic, and it compresses the JSON output using gzip.
helpers.bulk
At a high level the steps are:

* Import the required packages
* Set up some environment variables
* Create the scan iterator
* Write all the data from the iterator to disk
## Install/Load in Libraries
!pip3 install opensearch-py compress_json
from opensearchpy import OpenSearch as Elasticsearch
from opensearchpy import helpers
import json
import compress_json
## Set variables
elasticProtocol = 'https'
elastichost = 'hostDetails'
elasticPrefix = 'elasticsearchURLPrefix'
elasticport = '443'
elasticUser = 'username'
elasticPassword = 'password'
elasticIndex = 'index'
actions = []              # buffer of records waiting to be written to disk
fileRecordCount = 100000  # number of records per output file
fileCounter = 0           # counter used to name the output files
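Hard-coding credentials works for a quick extract, but they can also be read from the environment. This is a minimal sketch and not part of the original walkthrough; the `ES_USER` and `ES_PASSWORD` variable names are just examples.

```python
import os

# Optional: pull credentials from environment variables instead of hard-coding them.
# The ES_USER/ES_PASSWORD names are arbitrary examples.
elasticUser = os.environ.get('ES_USER', elasticUser)
elasticPassword = os.environ.get('ES_PASSWORD', elasticPassword)
```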
## Generate RFC-1738 formatted URL
elasticURL = '%s://%s:%s@%s:%s/%s' % (elasticProtocol, elasticUser, elasticPassword, elastichost, elasticport, elasticPrefix)
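One thing the snippet above glosses over: if the username or password contains reserved characters such as `@` or `:`, the raw string interpolation produces a broken URL. A hedged fix, assuming the standard library `urllib.parse.quote_plus` is acceptable, is to percent-encode the credentials first.

```python
from urllib.parse import quote_plus

# Percent-encode the credentials so reserved characters (e.g. '@', ':', '/')
# do not break the user:password@host portion of the URL
elasticURL = '%s://%s:%s@%s:%s/%s' % (elasticProtocol,
                                      quote_plus(elasticUser),
                                      quote_plus(elasticPassword),
                                      elastichost, elasticport, elasticPrefix)
```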
## Create Connection to Elasticsearch
es = Elasticsearch([elasticURL],verify_certs=True)
output = helpers.scan(es,
                      index=elasticIndex,
                      size=10000,  # batch size per scroll request - obviously this can be increased
                      query={"query": {"match_all": {}}},
                      )
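Before starting the scan it can be worth a couple of sanity checks. This is optional and not part of the original steps; it just confirms the cluster is reachable and gives a rough idea of how many output files to expect.

```python
# Confirm the cluster is reachable and estimate the volume of data to pull
print(es.info()['version'])
totalDocs = es.count(index=elasticIndex)['count']
print('documents to extract:', totalDocs)
print('approximate output files:', (totalDocs // fileRecordCount) + 1)
```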
## Write Everything Out to Disk
for record in output:
    actions.append(record['_source'])
    if len(actions) >= fileRecordCount:
        compress_json.dump(actions,
                           elasticIndex + '-extract-' + str(fileCounter) + '.json.gz',
                           {"compresslevel": 9},
                           {"ensure_ascii": False, "indent": 4, "sort_keys": True})
        actions = []
        print('file ' + str(fileCounter) + ' written')
        fileCounter = fileCounter + 1

# Write out whatever is left over after the final full file
if len(actions) > 0:
    compress_json.dump(actions,
                       elasticIndex + '-extract-' + str(fileCounter) + '.json.gz',
                       {"compresslevel": 9},
                       {"ensure_ascii": False, "indent": 4, "sort_keys": True})
    print('file ' + str(fileCounter) + ' written')
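As a quick check that the round trip works, one of the extract files can be read straight back with `compress_json.load`. This assumes at least the first file (`...-extract-0.json.gz`) was written.

```python
# Optional check: load the first extract file back and confirm the record count
check = compress_json.load(elasticIndex + '-extract-0.json.gz')
print('records in first file:', len(check))
```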
Now the data can be written back into a different index using helpers.bulk.
import glob

outputIndex = 'index-backup'
saveSize = 1000
files_to_load = glob.glob("./*.json.gz")  # match the .json.gz files written above

for file in files_to_load:
    file_data = compress_json.load(file)  # load and decompress a gzip file
    actions = []
    for record in file_data:
        action = {
            "_index": outputIndex,
            "_op_type": "index",
            "_id": record['id'],  # assumes each document carries an 'id' field in its source
            "_source": record
        }
        actions.append(action)
        if len(actions) >= saveSize:
            helpers.bulk(es, actions)
            actions = []
    if len(actions) > 0:
        helpers.bulk(es, actions)
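A simple way to confirm the reload worked, which is not covered in the original article, is to refresh the backup index and compare document counts between the two indices.

```python
# Make the newly indexed documents visible to search, then compare counts
es.indices.refresh(index=outputIndex)
sourceCount = es.count(index=elasticIndex)['count']
backupCount = es.count(index=outputIndex)['count']
print('source index:', sourceCount, 'backup index:', backupCount)
```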