Background
I needed to extract a large volume of access logs from Elastic Cloud for analysis covering a period of several months. While researching the task, I found that hundreds of millions of log records were generated per month, which far exceeds what Kibana's built-in tools can handle. To work around this, I implemented a Python script that uses the Elasticsearch API and the search_after mechanism to retrieve the logs iteratively.
Challenges
- High volume of logs: querying several months of logs meant dealing with hundreds of millions of records.
- Kibana limitations: the Kibana UI and standard API cannot search such large datasets in a single query.
- Elasticsearch query limits: a single search request returns at most 10,000 records.
- Performance considerations: retrieval had to avoid putting a high load on the Elasticsearch cluster while still running smoothly.
API Token Creation
To authenticate API requests, I created an API key with the following permissions:
{
"superuser": {
"cluster": ["all"],
"indices": [
{
"names": ["*"],
"privileges": ["all"],
"allow_restricted_indices": false
},
{
"names": ["*"],
"privileges": ["monitor", "read", "view_index_metadata", "read_cross_cluster", "manage"],
"allow_restricted_indices": true
}
]
}
}
Notes:
- The API key must be Base64-encoded before it is used in requests (see the sketch after this list).
- This is not the most secure setup: for simplicity, this example grants all privileges on all indices across the whole cluster. In practice, limit the indices and privileges to what the task actually requires.
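If the console only gives you the key id and secret rather than a ready-made Base64 string, you can encode the pair yourself. A minimal sketch (the id and secret below are placeholders):
import base64
# Placeholder values: use the id and api_key returned when the key was created
api_key_id = "your_api_key_id"
api_key_secret = "your_api_key_value"
# Elasticsearch expects Base64("id:api_key") for API key authentication
API_KEY = base64.b64encode(f"{api_key_id}:{api_key_secret}".encode()).decode()
print(API_KEY)  # this value goes into the API_KEY setting in the script below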
Elasticsearch query configuration
I used Kibana’s development tools to build a query before implementing it in Python. The query included:
- A wildcard filter to search for logs matching specific user agents.
- A time range filter to limit logs within the required period.
- A sort on _doc to improve performance when paginating with search_after.
Example query (the empty pit.id is a placeholder; it is filled in with the id returned when a point in time is opened, as the script below does):
{
"query": {
"bool": {
"filter": [
{"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}},
{"range": {"@timestamp": {"gte": "now-2h", "lte": "now"}}}
]
}
},
"size": 10000,
"sort": [{"_doc": "desc"}],
"pit": {
"id": "",
"keep_alive": "60m"
},
"fields": [
"json.EdgeRequestHost","json.EdgeRequestPath", "json.ClientRequestUserAgent", "json.ClientRequestStatusCode", "json.ClientRequestReferer", "json.EdgeStartTimestamp"
]
}
Implementation using Python
Before you begin, install the elasticsearch package in any convenient way, for example:
pip install elasticsearch
I implemented a Python script using the elasticsearch library:
import json
import time
from datetime import datetime, timezone, timedelta
from elasticsearch import Elasticsearch
# Elasticsearch connection settings
ES_URL = ""
API_KEY = ""
# Query parameters
BATCH_SIZE = 10000 # Max 10000
OUTPUT_FILE = "logs.json"
INDEX = "EXAMPLE_INDEX"
KEEP_ALIVE = "60m"
TIME_WINDOW = 60 # Minutes
# Initialize Elasticsearch client
es = Elasticsearch(ES_URL, api_key=API_KEY, request_timeout=60, verify_certs=True)
def create_pit():
"""Create Point in Time"""
return es.open_point_in_time(index=INDEX, keep_alive=KEEP_ALIVE)["id"]
def close_pit(pit_id):
"""Close Point in Time"""
es.close_point_in_time(body={"id": pit_id})
def get_query(pit_id, search_after=None):
"""Generate search query for last TIME_WINDOW minutes"""
now = datetime.now(timezone.utc)
start_time = now - timedelta(minutes=TIME_WINDOW)
query = {
"pit": {"id": pit_id, "keep_alive": KEEP_ALIVE},
"size": BATCH_SIZE,
"sort": [{"_doc": "desc"}],
"query": {
"bool": {
"filter": [
{"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}},
{"range": {"@timestamp": {"gte": start_time.isoformat(), "lte": now.isoformat(),
"format": "strict_date_optional_time"}}}
]
}
},
"fields": [
"json.EdgeRequestHost", "json.EdgeRequestPath", "json.ClientRequestUserAgent", "json.ClientRequestStatusCode", "json.ClientRequestReferer", "json.EdgeStartTimestamp"
]
}
if search_after:
query["search_after"] = search_after
return query
def transform_hit(hit):
"""Transform record into required format"""
fields = hit.get("fields", {})
return {
"remote_ip": fields.get("source.ip", ["-"])[0] if "source.ip" in fields else "-",
"remote_log": "-",
"user": "-",
"timestamp": fields.get("json.EdgeStartTimestamp", ["-"])[0] if "json.EdgeStartTimestamp" in fields else "-",
"request-path": fields.get('url.path', ['-'])[0] if "url.path" in fields else "-",
"request-host": fields.get('json.EdgeRequestHost', ['-'])[0] if "json.EdgeRequestHost" in fields else "-",
"status": "-",
"response-bytes": "-",
"time-take": "-",
"referer": fields.get("json.ClientRequestReferer", ["-"])[0] if "json.ClientRequestReferer" in fields else "-",
"ua": fields.get("json.ClientRequestUserAgent", ["-"])[0] if "json.ClientRequestUserAgent" in fields else "-"
}
def fetch_logs():
"""Fetch logs from Elasticsearch"""
pit_id = None
start_time = time.time()
try:
pit_id = create_pit()
print("PIT opened")
total_records = 0
search_after = None
print("Starting logs extraction from Elasticsearch...")
with open(OUTPUT_FILE, 'w', encoding='utf-8') as outfile:
while True:
response = es.search(body=get_query(pit_id, search_after))
hits = response.get("hits", {}).get("hits", [])
if not hits:
break
for hit in hits:
outfile.write(json.dumps(transform_hit(hit)) + "\n")
total_records += 1
if total_records % BATCH_SIZE == 0:
elapsed_time = time.time() - start_time
elapsed_str = str(timedelta(seconds=elapsed_time))
print(f"Processed {total_records} records. Time elapsed: {elapsed_str}...")
search_after = hits[-1].get("sort")
time.sleep(0.1)
elapsed_time = time.time() - start_time
elapsed_str = str(timedelta(seconds=elapsed_time))
print(f"\nTotal processed records: {total_records}. Saved to {OUTPUT_FILE}. Time taken: {elapsed_str}")
except Exception as e:
print(f"Error occurred: {e}")
finally:
if pit_id:
try:
close_pit(pit_id)
print("PIT closed")
except Exception as e:
print(f"Error closing PIT: {e}")
if __name__ == "__main__":
fetch_logs()
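Once the run finishes, it can be worth checking that nothing was missed. Below is a minimal sketch of my own (assuming a recent 8.x elasticsearch client and the same connection settings as above) that compares the number of lines written to logs.json with the Count API over the same filters; the comparison is approximate, since "now" keeps moving between the two runs.
from elasticsearch import Elasticsearch
ES_URL = ""          # same values as in the extraction script
API_KEY = ""
INDEX = "EXAMPLE_INDEX"
OUTPUT_FILE = "logs.json"
es = Elasticsearch(ES_URL, api_key=API_KEY, request_timeout=60, verify_certs=True)
# Count documents matching the same filters used for extraction
expected = es.count(
    index=INDEX,
    query={
        "bool": {
            "filter": [
                {"wildcard": {"json.ClientRequestUserAgent": {"value": "*oogle*"}}},
                {"range": {"@timestamp": {"gte": "now-60m", "lte": "now"}}}
            ]
        }
    }
)["count"]
# Count lines actually written to the output file
with open(OUTPUT_FILE, encoding="utf-8") as f:
    written = sum(1 for _ in f)
print(f"Count API: {expected}, lines in {OUTPUT_FILE}: {written}")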
Details
Elastic Cloud Traffic Filters
To allow access from any machine, you may need to configure Elastic Cloud traffic filters.
- Filters are created in the Elastic Cloud console.
- Official documentation: Elastic Cloud Traffic Filtering.
- After creating a filter, don’t forget to apply it:
- Go to Cloud.elastic.co/Deployments → Select your deployment → Security → Traffic Filters → Apply traffic filter.
Query development in Kibana Dev Tools
Before implementing queries in the script, I built and tested them in Kibana Dev Tools.
Documentation: Kibana Console.
Creating an API Key in Elastic Cloud
To authenticate requests, create an API key in Elastic Cloud.
- The API key must be Base64-encoded for use in the script.
- Generate it in the Kibana Security section or in the Elastic Cloud console.
- Documentation: Create API key.
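If you prefer to create the key programmatically instead of through the UI, the Python client's security API can do it. A minimal sketch (my own addition, not part of the original setup), assuming an 8.x client and that the placeholder URL and basic-auth credentials belong to a user allowed to manage API keys:
from elasticsearch import Elasticsearch
# Placeholder endpoint and credentials
es = Elasticsearch(
    "https://your-deployment.es.example.cloud:443",
    basic_auth=("elastic", "your_password"),
)
# Same role descriptor as shown in the "API Token Creation" section
role_descriptors = {
    "superuser": {
        "cluster": ["all"],
        "indices": [
            {"names": ["*"], "privileges": ["all"], "allow_restricted_indices": False},
            {
                "names": ["*"],
                "privileges": ["monitor", "read", "view_index_metadata", "read_cross_cluster", "manage"],
                "allow_restricted_indices": True,
            },
        ],
    }
}
resp = es.security.create_api_key(name="log-extraction", role_descriptors=role_descriptors)
# The response includes "id" and "api_key"; recent versions also return "encoded",
# the ready-to-use Base64 value for the script's API_KEY setting.
print(resp.get("encoded"))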
Conclusion
Using search_after together with a point in time (PIT) allowed us to efficiently fetch large sets of log data from Elastic Cloud.