ETL Example with LogBus
How I use LogBus to curate my Elasticsearch indices.
TL;DR:
I use LogBus to process the logs on my servers and store the results in my Elasticsearch cluster. That by itself isn't so interesting, but a few of the details might be worth sharing:
- Define IDs such that logs can be replayed without creating a mess of duplicate data.
- You don't need big data tooling to handle many ETL tasks.
- The LogBus sample plugin is simple but useful.
Idempotency FTW
I do not let Elasticsearch create the document ID for me. Instead, I define it in a way that lets system logs be re-processed without creating duplicates. I use the host, the timestamp, and enough of the message to avoid collisions:
doc._id = [doc.host.name, doc.event.kind, ts.format('x'), doc.message.replace(/\W/ug, '').slice(0, 33)].join(':')
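Spelled out, one component per line (the same expression; ts here is assumed to be the event's timestamp as a moment-style object, per the format('x') call):

// Same id, broken apart with comments. Assumes `ts` is a moment-style
// timestamp of the event (format('x') = millisecond epoch).
doc._id = [
  doc.host.name,                                 // which host emitted the log
  doc.event.kind,                                // e.g. "alert"
  ts.format('x'),                                // millisecond epoch of the event
  doc.message.replace(/\W/ug, '').slice(0, 33),  // first 33 word characters of the message
].join(':')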
That yields documents like this:
http :9200/logbus.journal-2025.09/_doc/nue.tfks.net:alert:1756763450671:INPUTDROPPEDINenp0s31f6OUTMAC901b
{
  "_id": "nue.tfks.net:alert:1756763450671:INPUTDROPPEDINenp0s31f6OUTMAC901b",
  "_index": "logbus.journal-2025.09",
  "_primary_term": 1,
  "_seq_no": 6430,
  "_source": {
    "ecs": {
      "version": "1.1.0"
    },
    "event": {
      "action": "dropped",
      "category": [
        "network"
      ],
      "dataset": "iptables",
      "id": "nue.tfks.net:alert:1756763450671:INPUTDROPPEDINenp0s31f6OUTMAC901b",
      "kind": "alert",
      "outcome": "success",
      "provider": "kernel",
      "severity": 4,
      "type": [
        "connection",
        "dropped"
      ]
    },
    ...
  },
  "_version": 1,
  "found": true
}
Easy ETL
There isn't much transformation going on here, just removing the day portion from the index name so that all of the daily data gets written to a monthly index. The index-elasticsearch plugin will use the event's index field when defined, defaulting to the pipeline's index. Once all dailies have been loaded into a single, more performant monthly index, I can remove them to free up storage & memory (a sketch of that cleanup follows the pipeline).
pipeline:
  extract:
    module: query-elasticsearch
    config:
      index: logbus.journal-2025.08.*
      scroll: 1m
      search:
        size: 333
      endpoint: http://localhost:9200
  transform:
    module: js
    inChannels:
      - extract
    config:
      function: !!js/function >-
        function(doc) {
          const event = doc._source
          event._id = doc._id
          event._index = doc._index.slice(0, -3)
          return event
        }
  load:
    module: index-elasticsearch
    inChannels:
      - transform
    outChannels: []
    config:
      bufferSize: 1000
      # index: logbus-etl-out
      endpoint: http://localhost:9200
  # to snoop on a subset of the data
  sample:
    inChannels:
      - transform
    config:
      nth: 1000
  log-errors:
    module: errors
    inChannels:
      - errors
    config:
      intervalSeconds: 5
      stackDepth: 6
  log-stats:
    module: stats
    inChannels:
      - stats
    config:
      intervalSeconds: 5
  log:
    inChannels:
      - load
      - sample
      - log-errors
      - log-stats
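Once the monthly index checks out, the dailies can be dropped. A hypothetical cleanup, assuming the cluster permits destructive wildcard actions (otherwise, list the daily indices by name):

http DELETE ':9200/logbus.journal-2025.08.*'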
Simple Sampling
The sample plugin has proven useful to "trace" an operation without flooding your senses, terminal, or file system. Unlike with other log shippers I'm aware of, those samples could easily be directed to another sink instead of the log channel.
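For example, a stage like the following, added under pipeline:, would bulk-index every sampled event into a second cluster rather than printing it. This is only a sketch: the endpoint is made up, and sample would be dropped from the log stage's inChannels if the samples shouldn't also be logged.

# Hypothetical: ship sampled events to another cluster instead of the log channel.
samples:
  module: index-elasticsearch
  inChannels:
    - sample
  config:
    endpoint: http://snoop.example.net:9200   # made-up endpoint for a scratch cluster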
If you've made it this far and are curious how this looks in action, here is an example:
Thu Sep 24 16:22:41 erik@nue:~/logbus-journald
./logbus -v info vacuum-month.yml | bunyan -o short
16:22:58.833Z INFO index.js: started (stage=extract)
    server: {
      "name": "master",
      "cluster_name": "tfks",
      "cluster_uuid": "7_L7iNzBTfacbFvIiK2KrA",
      "version": "8.6.1"
    }
16:22:58.833Z INFO index.js: started (stage=transform)
16:22:58.835Z INFO index.js: started (stage=load)
    server: {
      "name": "master",
      "cluster_name": "tfks",
      "cluster_uuid": "7_L7iNzBTfacbFvIiK2KrA",
      "version": "8.6.1"
    }
16:22:58.835Z INFO index.js: started (stage=sample)
16:22:58.835Z INFO index.js: started (stage=log-errors)
16:22:58.836Z INFO index.js: started (stage=log-stats)
16:22:58.836Z INFO index.js: started (stage=log)
16:22:58.836Z INFO index.js: pipeline startup complete
16:22:59.384Z INFO index.js: INPUT DROPPED: IN=enp0s31f6 OUT= MAC=90:1b:0e:c4:3e:10:40:71:83:a5:e9:d3:08:00 SRC=152.32.165.32 DST=88.99.60.246 LEN=44 TOS=0x00 PREC=0x00 TTL=39 ID=0 DF PROTO=TCP SPT=57724 DPT=16434 WINDOW=1024 RES=0x00 SYN URGP=0 (stage=log, labels={}, timestamp=2025-09-01T21:50:50.671Z, container={})
    process: {
      "name": "kernel"
    }
    --
    tags: [
      "sys",
      "parsed",
      "do"
    ]
    --
    source: {
      "ip": "152.32.165.32",
      "port": "57724",
      "mac": "90:1b:0e:c4:3e:10:40:71:83:a5:e9:d3:08:00",
      "geo": {
        "city_name": "Taipei",
        "continent_code": "AS",
        "country_iso_code": "TW",
        "country_name": "Taiwan",
        "location": [
          121.5324,
          25.0504
        ]
      }
    }
    --
    destination: {
      "ip": "88.99.60.246",
      "port": "16434"
    }
    --
    rule: {
      "name": "INPUT",
      "protocol": "TCP"
    }
    --
    event: {
      "action": "dropped",
      "dataset": "iptables",
      "id": "nue.tfks.net:alert:1756763450671:INPUTDROPPEDINenp0s31f6OUTMAC901b",
      "kind": "alert",
      "category": [
        "network"
      ],
      "type": [
        "connection",
        "dropped"
      ],
      "outcome": "success",
      "severity": 4,
      "provider": "kernel"
    }
    --
    ecs: {
      "version": "1.1.0"
    }
    --
    host: {
      "name": "nue.tfks.net"
    }
    --
    agent: {
      "type": "logbus",
      "version": "1.0.0",
      "name": "journald",
      "id": "logbus-journald.nue.tfks.net"
    }
16:22:59.422Z INFO index.js: asked to stop: end of search (stage=extract)
16:22:59.430Z INFO index.js: scroll deleted (stage=extract, succeeded=true, num_freed=1)
16:22:59.431Z INFO index.js: shutting down { reason: 'end of search' }
16:22:59.431Z INFO index.js: stopping via end of search (stage=extract)
16:22:59.431Z INFO index.js: waiting to stop (stage=extract)
16:22:59.431Z INFO index.js: stopping via end of search (stage=log-errors)
16:22:59.431Z INFO index.js: stopping via end of search (stage=log-stats)
16:22:59.433Z INFO index.js: errors=0 events[rx=9803,tx=0] mbytes[rx=0,tx=0] (stage=log, type=stats, ts=2025-09-25T16:22:59.432Z, heap=73050736, rss=132870144, errors=0, rxEvents=9803, txEvents=0, rxBytes=0, txBytes=0, rxLines=0, txLines=0)
16:22:59.530Z INFO index.js: stopping via extract (stage=transform)
16:22:59.530Z INFO index.js: stopping via transform (stage=load)
16:22:59.543Z WARN index.js: waiting for bulk index requests (stage=load, inflight=10)
16:22:59.543Z INFO index.js: stopping via transform (stage=sample)
16:22:59.547Z INFO index.js: stopping via sample (stage=log)
16:23:00.433Z INFO index.js: all stages shut down cleanly