Ingesting data with BigQuery

Learn how to index and search Google BigQuery data in Elasticsearch using Python.

BigQuery is a Google platform that lets you centralize data from different sources and services into one repository. It also lets you analyze that data and use GenAI and ML tools.

Indexing data from all of these sources into Elasticsearch allows you to centralize your data sources for a better observability experience.

In this article, you'll learn how to index data from BigQuery into Elasticsearch using Python, enabling you to unify data from different systems for search and analysis.

You can use the example from this article in this Google Colab notebook.

Steps

  1. Prepare BigQuery
  2. Configure the BigQuery Python client
  3. Index data to Elasticsearch
  4. Search data

Prepare BigQuery

To use BigQuery, you need to access Google Cloud Console and create a project. Once done, you'll be redirected to this view:

BigQuery allows you to transfer data from Google Drive and Google Cloud Storage, and to upload local files. To upload data to BigQuery, you must first create a dataset. Create one and name it "server_logs" (dataset names cannot contain hyphens) so we can upload some files.
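
A quick way to check a dataset name before creating it is sketched below. It assumes the naming rule is "letters, digits, and underscores only, up to 1,024 characters"; the helper name is hypothetical and not part of any Google library.

```python
import re

# Assumed BigQuery naming rule: letters, digits, and underscores, 1 to 1,024 characters.
_DATASET_NAME = re.compile(r"[A-Za-z0-9_]{1,1024}")


def is_valid_dataset_name(name):
    """Return True if the whole name matches the assumed dataset naming rule."""
    return _DATASET_NAME.fullmatch(name) is not None


for candidate in ("server_logs", "server-logs"):
    print(candidate, is_valid_dataset_name(candidate))
```

Under that rule, a name with underscores passes the check and a hyphenated one does not.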

For this article, we'll upload a local file that contains server log entries. Check BigQuery’s official documentation to learn how to upload local files.

Dataset

The file we will upload to BigQuery contains server log data with HTTP responses and their descriptions in NDJSON format (one JSON object per line). The file includes these fields: ip_address, _timestamp, http_method, endpoint, status_code, response_time, and status_code_description.

BigQuery will extract data from this file. Then, we'll consolidate it with Python and index it to Elasticsearch.

Create a file named logs.ndjson and populate it with the following:

{"ip_address": "192.168.1.3", "_timestamp": "2024-12-03T12:00:03Z", "http_method": "GET", "endpoint": "/about", "status_code": "404", "response_time": 89, "status_code_description": "The requested contact page does not exist or was removed."}
{"ip_address": "192.168.1.3", "_timestamp": "2024-12-03T12:00:07Z", "http_method": "GET", "endpoint": "/contact", "status_code": "404", "response_time": 76, "status_code_description": "The requested contact page does not exist or was removed."}
{"ip_address": "192.168.1.1", "_timestamp": "2024-12-03T12:00:01Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 123, "status_code_description": "OK"}
{"ip_address": "192.168.1.1", "_timestamp": "2024-12-03T12:00:04Z", "http_method": "GET", "endpoint": "/products", "status_code": "200", "response_time": 156, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:05Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 101, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:08Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 98, "status_code_description": "OK"}
{"ip_address": "192.168.1.6", "_timestamp": "2024-12-03T12:00:10Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 105, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:02Z", "http_method": "POST", "endpoint": "/login", "status_code": "500", "response_time": 340, "status_code_description": "Internal error while processing the payment gateway."}
{"ip_address": "192.168.1.5", "_timestamp": "2024-12-03T12:00:09Z", "http_method": "POST", "endpoint": "/payment", "status_code": "500", "response_time": 512, "status_code_description": "Internal error while processing the payment gateway."}
{"ip_address": "192.168.1.4", "_timestamp": "2024-12-03T12:00:06Z", "http_method": "POST", "endpoint": "/checkout", "status_code": "503", "response_time": 450, "status_code_description": "Service temporarily unavailable during the checkout process."}
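
Before uploading, it can help to confirm that every line parses as JSON and carries the expected fields. The following is a minimal sketch of such a check; the function name and the two malformed sample lines are hypothetical, and the field list comes from the description above.

```python
import json

# Field names taken from the description of logs.ndjson.
REQUIRED_FIELDS = {
    "ip_address", "_timestamp", "http_method", "endpoint",
    "status_code", "response_time", "status_code_description",
}


def validate_ndjson(text):
    """Parse NDJSON text and return (records, problems)."""
    records, problems = [], []
    for line_number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # Ignore blank lines
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append(f"line {line_number}: invalid JSON ({exc.msg})")
            continue
        if not isinstance(record, dict):
            problems.append(f"line {line_number}: not a JSON object")
            continue
        missing = REQUIRED_FIELDS - record.keys()
        if missing:
            problems.append(f"line {line_number}: missing {sorted(missing)}")
            continue
        records.append(record)
    return records, problems


# One real line from the dataset plus two hypothetical broken lines.
sample = "\n".join([
    '{"ip_address": "192.168.1.1", "_timestamp": "2024-12-03T12:00:01Z", '
    '"http_method": "GET", "endpoint": "/home", "status_code": "200", '
    '"response_time": 123, "status_code_description": "OK"}',
    '{"ip_address": "192.168.1.9"}',
    "not json",
])
records, problems = validate_ndjson(sample)
print(len(records), "valid record(s)")
for problem in problems:
    print(problem)
```

To check the real file, you could pass `open("logs.ndjson").read()` to the same function.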

Upload this file to the dataset you've just created (shown as "server_logs") and use "logs" as the table name (shown as "table id").

Once you're done, your files should look like this:

Configure the BigQuery Python client

Below, we'll learn how to use the BigQuery Python client and Google Colab to build an app.

1. Dependencies

First, we must install the following dependencies:

!pip install google-cloud-bigquery elasticsearch==8.16

The google-cloud-bigquery package provides the tools to read BigQuery data, and elasticsearch lets us connect to Elasticsearch and index that data. getpass, which is part of the Python standard library, lets us enter sensitive values without exposing them in the code. Let's import everything we need:

from elasticsearch import Elasticsearch, exceptions
from google.cloud import bigquery
from google.colab import auth
from getpass import getpass
from datetime import datetime
import json

We also need to declare other variables and initialize the Elasticsearch client for Python:

ELASTICSEARCH_ENDPOINT = getpass("Elasticsearch endpoint: ")
ELASTIC_API_KEY = getpass("Elastic API key: ")
# Google Cloud project name and BigQuery dataset name
PROJECT_ID = "elasticsearch-bigquery"
# dataset_id in format <your-project-name>.<your-dataset-name>
DATASET_ID = f"{PROJECT_ID}.server_logs"
# Elasticsearch client
es_client = Elasticsearch(
    ELASTICSEARCH_ENDPOINT,
    api_key=ELASTIC_API_KEY,
)

2. Authentication

To get the credentials needed to use BigQuery, we'll use the auth module from google.colab. Run the line below and choose the same account you used to create the Google Cloud project:

auth.authenticate_user()

Now, let's see the data in BigQuery:

client = bigquery.Client(project=PROJECT_ID)
# Getting tables from dataset
tables = client.list_tables(DATASET_ID)
data = {}
for table in tables:
    # Table id must be in format <project_name>.<dataset_name>.<table_name>
    table_id = f"{DATASET_ID}.{table.table_id}"
    print(f"Processing table: {table.table_id}")
    # Query to retrieve BigQuery tables data
    query = f"""
        SELECT *
        FROM `{table_id}`
    """
    query_job = client.query(query)
    results = query_job.result()
    print(f"Results for table: {table.table_id}:")
    data[table.table_id] = []
    for row in results:
        # Saving data with key=table_id
        data[table.table_id].append(dict(row))
        print(row)
# variable with data
logs_data = data['logs']

This should be the result you see:

Processing table: logs
Results for table: logs:
Row(('The requested contact page does not exist or was removed.', 404, 'GET', '/about', datetime.datetime(2024, 12, 3, 12, 0, 3, tzinfo=datetime.timezone.utc), 89, '192.168.1.3'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('The requested contact page does not exist or was removed.', 404, 'GET', '/contact', datetime.datetime(2024, 12, 3, 12, 0, 7, tzinfo=datetime.timezone.utc), 76, '192.168.1.3'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 1, tzinfo=datetime.timezone.utc), 123, '192.168.1.1'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/products', datetime.datetime(2024, 12, 3, 12, 0, 4, tzinfo=datetime.timezone.utc), 156, '192.168.1.1'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 5, tzinfo=datetime.timezone.utc), 101, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 8, tzinfo=datetime.timezone.utc), 98, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 10, tzinfo=datetime.timezone.utc), 105, '192.168.1.6'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Internal error while processing the payment gateway.', 500, 'POST', '/login', datetime.datetime(2024, 12, 3, 12, 0, 2, tzinfo=datetime.timezone.utc), 340, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Internal error while processing the payment gateway.', 500, 'POST', '/payment', datetime.datetime(2024, 12, 3, 12, 0, 9, tzinfo=datetime.timezone.utc), 512, '192.168.1.5'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Service temporarily unavailable during the checkout process.', 503, 'POST', '/checkout', datetime.datetime(2024, 12, 3, 12, 0, 6, tzinfo=datetime.timezone.utc), 450, '192.168.1.4'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})

With this simple code, we've extracted the data from BigQuery. We've stored it in the logs_data variable and can now use it with Elasticsearch.
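
Note that the _timestamp values come back as Python datetime objects, which are not JSON-serializable as-is. A minimal sketch of converting them to ISO 8601 strings follows; the helper name is hypothetical and the sample row only mimics the shape of dict(row) shown above.

```python
from datetime import datetime, timezone
import json


def to_json_safe(row):
    """Return a copy of a row dict with datetime values as ISO 8601 strings."""
    return {
        key: value.isoformat() if isinstance(value, datetime) else value
        for key, value in row.items()
    }


# Hypothetical row shaped like one entry of logs_data.
sample_row = {
    "ip_address": "192.168.1.3",
    "_timestamp": datetime(2024, 12, 3, 12, 0, 3, tzinfo=timezone.utc),
    "status_code": 404,
}
safe_row = to_json_safe(sample_row)
print(json.dumps(safe_row))
```

The resulting string keeps the UTC offset, a format that an Elasticsearch date field with default settings should accept.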

Index data to Elasticsearch

We'll begin by defining the index mappings with the Python client (the same mappings can also be created from the Kibana Dev Tools console):

es_client.indices.create(
    index="bigquery-logs",
    body={
        "mappings": {
            "properties": {
                "status_code_description": {"type": "match_only_text"},
                "status_code": {"type": "keyword"},
                "@timestamp": {"type": "date"},
                "ip_address": {"type": "ip"},
                "http_method": {"type": "keyword"},
                "endpoint": {"type": "keyword"},
                "response_time": {"type": "integer"},
            }
        }
    }
)

The match_only_text field type is a variant of the text field type that saves disk space by not storing the metadata needed to calculate scores. We use it because logs are usually time-centric, i.e., the date is more important than the match quality in the text field. Queries written for a text field are compatible with a match_only_text field.

We'll index the documents using the Elasticsearch _bulk API:

bulk_data = []
for log_entry in logs_data:
    # Convert timestamp to ISO 8601 string
    timestamp_iso8601 = log_entry["_timestamp"].isoformat()
    # Prepare action metadata
    action_metadata = {
        "index": {
            "_index": "bigquery-logs",
            "_id": f"{log_entry['ip_address']}-{timestamp_iso8601}"
        }
    }
    # Prepare document
    document = {
        "ip_address": log_entry["ip_address"],
        "status_code": log_entry["status_code"],
        "@timestamp": timestamp_iso8601,
        "http_method": log_entry["http_method"],
        "endpoint": log_entry["endpoint"],
        "response_time": log_entry["response_time"],
        "status_code_description": log_entry["status_code_description"]
    }
    # Append to bulk data
    bulk_data.append(action_metadata)
    bulk_data.append(document)
print(bulk_data)
# Indexing data
response = es_client.bulk(body=bulk_data)
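
A bulk request can succeed as a whole while individual documents fail, so it is worth inspecting the response. The sketch below relies on the standard bulk response fields ("errors" and "items"); the helper name and the sample response are hypothetical and only illustrate the shape.

```python
def failed_bulk_items(response):
    """Return (doc_id, status, reason) for each failed action in a bulk response."""
    if not response["errors"]:
        return []
    failures = []
    for item in response["items"]:
        # Each item has one key naming the action type, e.g. "index".
        for result in item.values():
            error = result.get("error")
            if error:
                failures.append(
                    (result.get("_id"), result.get("status"), error.get("reason"))
                )
    return failures


# Hypothetical response, trimmed to the fields used above.
sample_response = {
    "errors": True,
    "items": [
        {"index": {"_id": "192.168.1.1-2024-12-03T12:00:01+00:00", "status": 201}},
        {"index": {
            "_id": "bad-doc",
            "status": 400,
            "error": {
                "type": "document_parsing_exception",
                "reason": "failed to parse field [ip_address]",
            },
        }},
    ],
}
failures = failed_bulk_items(sample_response)
for doc_id, status, reason in failures:
    print(f"{doc_id}: HTTP {status}: {reason}")
```

With the real client, you would call failed_bulk_items(response) on the value returned by es_client.bulk, assuming it supports key access like a dictionary.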

Search data

We can now run queries using the data from the bigquery-logs index.

For this example, we'll search the server's error descriptions in the status_code_description field. In addition, we'll sort the results by date and get the IP addresses associated with the errors:

es_client.search(
    index="bigquery-logs",
    body={
        "query": {"match": {"status_code_description": "error"}},
        "sort": [{"@timestamp": {"order": "desc"}}],
        "aggs": {"by_ip": {"terms": {"field": "ip_address", "size": 10}}},
    },
)

This is the result:

{
  ...
  "hits": {
    ...
    "hits": [
      {
        "_index": "bigquery-logs",
        "_id": "192.168.1.5-2024-12-03T12:00:09+00:00",
        "_score": null,
        "_source": {
          "ip_address": "192.168.1.5",
          "status_code": 500,
          "@timestamp": "2024-12-03T12:00:09+00:00",
          "http_method": "POST",
          "endpoint": "/payment",
          "response_time": 512,
          "status_code_description": "Internal error while processing the payment gateway."
        },
        "sort": [
          1733227209000
        ]
      },
      {
        "_index": "bigquery-logs",
        "_id": "192.168.1.2-2024-12-03T12:00:02+00:00",
        "_score": null,
        "_source": {
          "ip_address": "192.168.1.2",
          "status_code": 500,
          "@timestamp": "2024-12-03T12:00:02+00:00",
          "http_method": "POST",
          "endpoint": "/login",
          "response_time": 340,
          "status_code_description": "Internal error while processing the payment gateway."
        },
        "sort": [
          1733227202000
        ]
      }
    ]
  },
  "aggregations": {
    "by_ip": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "192.168.1.2",
          "doc_count": 1
        },
        {
          "key": "192.168.1.5",
          "doc_count": 1
        }
      ]
    }
  }
}
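
To work with this result in code, you can pull the hits and the aggregation buckets out of the response. The following sketch uses a trimmed copy of the result above as sample input; the helper name is hypothetical.

```python
def summarize_search(response):
    """Extract matching endpoints and per-IP error counts from a search response."""
    hits = response["hits"]["hits"]
    buckets = response["aggregations"]["by_ip"]["buckets"]
    return {
        "endpoints": [hit["_source"]["endpoint"] for hit in hits],
        "errors_by_ip": {bucket["key"]: bucket["doc_count"] for bucket in buckets},
    }


# Trimmed copy of the search result, keeping only the fields used above.
sample_response = {
    "hits": {
        "hits": [
            {"_source": {"ip_address": "192.168.1.5", "endpoint": "/payment"}},
            {"_source": {"ip_address": "192.168.1.2", "endpoint": "/login"}},
        ]
    },
    "aggregations": {
        "by_ip": {
            "buckets": [
                {"key": "192.168.1.2", "doc_count": 1},
                {"key": "192.168.1.5", "doc_count": 1},
            ]
        }
    },
}
summary = summarize_search(sample_response)
print(summary)
```

The hits keep the sort order of the query (newest first), while the buckets list each IP address with its number of matching log entries.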

Conclusion

Tools like BigQuery, which help to centralize information, are very useful for data management. In addition to search, using BigQuery with Elasticsearch allows you to leverage the power of ML and data analysis to detect or analyze issues in a simpler and faster way.
