I needed to extract a large volume of access logs from Elastic Cloud for analysis spanning several months. While researching, I discovered that hundreds of millions of log records were generated per month, far exceeding what Kibana's built-in export tools can handle. To work around this, I implemented a Python script that uses the Elasticsearch API with search_after pagination to retrieve the logs iteratively.
To authenticate API requests, I created an API key with the following permissions:
{
  "superuser": {
    "cluster": ["all"],
    "indices": [
      {
        "names": ["*"],
        "privileges": ["all"],
        "allow_restricted_indices": false
      },
      {
        "names": ["*"],
        "privileges": ["monitor", "read", "view_index_metadata", "read_cross_cluster", "manage"],
        "allow_restricted_indices": true
      }
    ]
  }
}
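For reference, such a key can also be created programmatically through the security API. Below is a minimal sketch using the Python client; the endpoint URL, admin credentials, key name, and expiration are placeholder assumptions, not values from my setup.

from elasticsearch import Elasticsearch

# Hypothetical values: connect with an account that is allowed to manage API keys
admin_es = Elasticsearch("<ES_URL>", basic_auth=("<ADMIN_USER>", "<ADMIN_PASSWORD>"))

resp = admin_es.security.create_api_key(
    name="log-export",  # hypothetical key name
    expiration="30d",   # optional TTL; omit for a non-expiring key
    role_descriptors={
        "superuser": {
            "cluster": ["all"],
            "indices": [
                {"names": ["*"], "privileges": ["all"], "allow_restricted_indices": False},
                {
                    "names": ["*"],
                    "privileges": ["monitor", "read", "view_index_metadata", "read_cross_cluster", "manage"],
                    "allow_restricted_indices": True,
                },
            ],
        }
    },
)
print(resp["encoded"])  # base64-encoded value to pass as api_key=... in the client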
I used Kibana's Dev Tools to construct the query before implementing it in Python. The query filters by user agent and a time range, and sorts by _doc, which improves performance when paginating with search_after. The empty pit.id is a placeholder for the ID returned when a point in time is opened (POST /<index>/_pit?keep_alive=60m). Example query:
{
  "query": {
    "bool": {
      "filter": [
        {"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}},
        {"range": {"@timestamp": {"gte": "now-2h", "lte": "now"}}}
      ]
    }
  },
  "size": 10000,
  "sort": [{"_doc": "desc"}],
  "pit": {
    "id": "",
    "keep_alive": "60m"
  },
  "fields": [
    "json.EdgeRequestHost",
    "json.EdgeRequestPath",
    "json.ClientRequestUserAgent",
    "json.ClientRequestStatusCode",
    "json.ClientRequestReferer",
    "json.EdgeStartTimestamp"
  ]
}
Before starting, install the elasticsearch package in any convenient way, e.g., pip install elasticsearch
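Before wiring everything into a script, the query above can be sanity-checked with a short round trip: open a PIT, run one search against it, and close the PIT. A minimal sketch (the URL, key, and index name are placeholders), mirroring what the full script below automates:

from elasticsearch import Elasticsearch

es = Elasticsearch("<ES_URL>", api_key="<API_KEY>")

# Open a PIT on the target index and run the query once against it
pit_id = es.open_point_in_time(index="<INDEX>", keep_alive="60m")["id"]
response = es.search(
    query={"bool": {"filter": [{"range": {"@timestamp": {"gte": "now-2h", "lte": "now"}}}]}},
    size=10,
    sort=[{"_doc": "desc"}],
    pit={"id": pit_id, "keep_alive": "60m"},
)
print(len(response["hits"]["hits"]))  # should print up to 10 matching hits
es.close_point_in_time(id=pit_id)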
I implemented a Python script using the elasticsearch library:
import json
import time
from datetime import datetime, timezone, timedelta

from elasticsearch import Elasticsearch

# Elasticsearch connection settings
ES_URL = ""
API_KEY = ""

# Query parameters
BATCH_SIZE = 10000  # Max 10000
OUTPUT_FILE = "logs.json"
INDEX = "EXAMPLE_INDEX"
KEEP_ALIVE = "60m"
TIME_WINDOW = 60  # Minutes

# Initialize Elasticsearch client
es = Elasticsearch(ES_URL, api_key=API_KEY, request_timeout=60, verify_certs=True)
def create_pit():
    """Create a point in time (PIT) on the target index."""
    return es.open_point_in_time(index=INDEX, keep_alive=KEEP_ALIVE)["id"]

def close_pit(pit_id):
    """Close the point in time."""
    es.close_point_in_time(id=pit_id)
def get_query(pit_id, search_after=None):
    """Build the search request for the last TIME_WINDOW minutes."""
    now = datetime.now(timezone.utc)
    start_time = now - timedelta(minutes=TIME_WINDOW)
    query = {
        "pit": {"id": pit_id, "keep_alive": KEEP_ALIVE},
        "size": BATCH_SIZE,
        "sort": [{"_doc": "desc"}],
        "query": {
            "bool": {
                "filter": [
                    {"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}},
                    {"range": {"@timestamp": {
                        "gte": start_time.isoformat(),
                        "lte": now.isoformat(),
                        "format": "strict_date_optional_time",
                    }}},
                ]
            }
        },
        "fields": [
            "json.EdgeRequestHost",
            "json.EdgeRequestPath",
            "json.ClientRequestUserAgent",
            "json.ClientRequestStatusCode",
            "json.ClientRequestReferer",
            "json.EdgeStartTimestamp",
        ],
    }
    if search_after:
        query["search_after"] = search_after
    return query
def transform_hit(hit):
    """Transform a hit into the required output format."""
    # "fields" values come back as arrays; fields missing from the
    # response (or not requested in the query) fall back to "-".
    fields = hit.get("fields", {})
    return {
        "remote_ip": fields.get("source.ip", ["-"])[0],
        "remote_log": "-",
        "user": "-",
        "timestamp": fields.get("json.EdgeStartTimestamp", ["-"])[0],
        "request-path": fields.get("url.path", ["-"])[0],
        "request-host": fields.get("json.EdgeRequestHost", ["-"])[0],
        "status": "-",
        "response-bytes": "-",
        "time-take": "-",
        "referer": fields.get("json.ClientRequestReferer", ["-"])[0],
        "ua": fields.get("json.ClientRequestUserAgent", ["-"])[0],
    }
def fetch_logs():
    """Fetch logs from Elasticsearch."""
    pit_id = None
    start_time = time.time()
    try:
        pit_id = create_pit()
        print("PIT opened")
        total_records = 0
        search_after = None
        print("Starting logs extraction from Elasticsearch...")
        with open(OUTPUT_FILE, "w", encoding="utf-8") as outfile:
            while True:
                response = es.search(body=get_query(pit_id, search_after))
                # The PIT id can change between requests; always use the
                # most recently returned one for the next page.
                pit_id = response.get("pit_id", pit_id)
                hits = response.get("hits", {}).get("hits", [])
                if not hits:
                    break
                for hit in hits:
                    outfile.write(json.dumps(transform_hit(hit)) + "\n")
                    total_records += 1
                    if total_records % BATCH_SIZE == 0:
                        elapsed_str = str(timedelta(seconds=time.time() - start_time))
                        print(f"Processed {total_records} records. Time elapsed: {elapsed_str}...")
                # Use the sort values of the last hit as the cursor for the next page
                search_after = hits[-1].get("sort")
                time.sleep(0.1)
        elapsed_str = str(timedelta(seconds=time.time() - start_time))
        print(f"\nTotal processed records: {total_records}. Saved to {OUTPUT_FILE}. Time taken: {elapsed_str}")
    except Exception as e:
        print(f"Error occurred: {e}")
    finally:
        if pit_id:
            try:
                close_pit(pit_id)
                print("PIT closed")
            except Exception as e:
                print(f"Error closing PIT: {e}")

if __name__ == "__main__":
    fetch_logs()
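Since the goal was analysis, here is a quick way to load the resulting file: each line of logs.json is one JSON object, so it can be read as NDJSON. A minimal sketch assuming pandas is installed (pip install pandas); the column names match the keys produced by transform_hit:

import pandas as pd

# Read the newline-delimited JSON written by the script into a DataFrame
df = pd.read_json("logs.json", lines=True)

# Example: the ten most frequent request hosts
print(df["request-host"].value_counts().head(10))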
Notes:
- To allow access from any machine, you may need to configure Traffic filters in Elastic Cloud.
- Before implementing queries in the script, I first built and tested them in Kibana Dev Tools. Documentation: Kibana Console.
- To authenticate requests, create an API key in Elastic Cloud.

Using search_after together with a point in time (PIT) allowed me to efficiently fetch large log datasets from Elastic Cloud.