ETL Example with LogBus

How I use LogBus to curate my Elasticsearch indices.

TL;DR:

I use LogBus to process the logs on my servers and store the results in my Elasticsearch cluster. That by itself isn't so interesting, but there might be a few details of interest:

Idempotency FTW

I do not let Elasticsearch create the document id for me. Instead, I define it so that system logs can be re-processed without creating duplicates. The id combines the host, event kind, timestamp, and enough of the message to avoid collisions:

doc._id = [doc.host.name, doc.event.kind, ts.format('x'), doc.message.replace(/\W/ug, '').slice(0, 33)].join(':')

Yields documents like this:

http :9200/logbus.journal-2025.09/_doc/nue.tfks.net:alert:1756763450671:INPUTDROPPEDINenp0s31f6OUTMAC901b
{
    "_id": "nue.tfks.net:alert:1756763450671:INPUTDROPPEDINenp0s31f6OUTMAC901b",
    "_index": "logbus.journal-2025.09",
    "_primary_term": 1,
    "_seq_no": 6430,
    "_source": {
        "ecs": {
            "version": "1.1.0"
        },
        "event": {
            "action": "dropped",
            "category": [
                "network"
            ],
            "dataset": "iptables",
            "id": "nue.tfks.net:alert:1756763450671:INPUTDROPPEDINenp0s31f6OUTMAC901b",
            "kind": "alert",
            "outcome": "success",
            "provider": "kernel",
            "severity": 4,
            "type": [
                "connection",
                "dropped"
            ]
        },
        ...
    },
    "_version": 1,
    "found": true
}
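
To make the idempotency concrete, here is a minimal sketch rather than actual LogBus code. `makeDocId` and `ToyIndex` are hypothetical names. A plain `Date` stands in for `ts`, which in the one-liner above looks like a moment-style object whose `format('x')` returns epoch milliseconds. `ToyIndex` is an in-memory `Map` that only mimics one behavior: indexing an id that already exists overwrites the document and bumps its version instead of adding a second copy. The message is a shortened copy of the iptables line from the document above.

```javascript
// Hypothetical helper mirroring the id recipe above.
// Assumption: `date` is a plain Date; the original uses ts.format('x').
function makeDocId(doc, date) {
  const millis = String(date.getTime())
  // Strip non-word characters and keep a short prefix of the message.
  const digest = doc.message.replace(/\W/ug, '').slice(0, 33)
  return [doc.host.name, doc.event.kind, millis, digest].join(':')
}

// Toy stand-in for an index, keyed by document id (not the Elasticsearch client).
class ToyIndex {
  constructor() {
    this.docs = new Map()
  }

  // Index by explicit id: overwrite any existing document and bump its version.
  index(id, source) {
    const previous = this.docs.get(id)
    const version = previous ? previous._version + 1 : 1
    this.docs.set(id, { _id: id, _version: version, _source: source })
    return version
  }
}

const logLine = {
  host: { name: 'nue.tfks.net' },
  event: { kind: 'alert' },
  message: 'INPUT DROPPED: IN=enp0s31f6 OUT= MAC=90:1b:0e:c4:3e:10:40:71:83:a5:e9:d3:08:00',
}
const logTime = new Date('2025-09-01T21:50:50.671Z')

// Process the same log line twice, as a re-run of the shipper would.
const toyIndex = new ToyIndex()
const firstId = makeDocId(logLine, logTime)
const firstVersion = toyIndex.index(firstId, logLine)
const secondId = makeDocId(logLine, logTime)
const secondVersion = toyIndex.index(secondId, logLine)

console.log(firstId)
console.log(toyIndex.docs.size, firstVersion, secondVersion)
```

Because the id is derived only from the log line itself, the second pass lands on the same key and the toy index still holds a single document.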

Easy ETL

There isn't much transformation going on here: the transform stage just strips the day portion from the index name so that all of the daily data gets written to a monthly index. The index-elasticsearch plugin uses the event's _index field when it is defined and otherwise falls back to the pipeline's index. Once all dailies have been loaded into a single, more performant monthly index, I can delete the dailies to free up storage & memory.

pipeline:

  extract:
    module: query-elasticsearch
    config:
      index: logbus.journal-2025.08.*
      scroll: 1m
      search:
        size: 333
      endpoint: http://localhost:9200

  transform:
    module: js
    inChannels:
      - extract
    config:
      function: !!js/function >-
        function(doc) {
          const event = doc._source
          event._id = doc._id
          event._index = doc._index.slice(0, -3)
          return event
        }

  load:
    module: index-elasticsearch
    inChannels:
      - transform
    outChannels: []
    config:
      bufferSize: 1000
      # index: logbus-etl-out
      endpoint: http://localhost:9200

  # to snoop on a subset of the data
  sample:
    inChannels:
      - transform
    config:
      nth: 1000

  log-errors:
    module: errors
    inChannels:
      - errors
    config:
      intervalSeconds: 5
      stackDepth: 6

  log-stats:
    module: stats
    inChannels:
      - stats
    config:
      intervalSeconds: 5

  log:
    inChannels:
      - load
      - sample
      - log-errors
      - log-stats
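
The transform function is small enough to try outside the pipeline. The sketch below lifts it out unchanged under the hypothetical name `toMonthly` and adds a hypothetical `resolveIndex` that imitates the fallback described earlier (the event's own `_index` first, then the configured index). How the index-elasticsearch plugin actually implements that fallback is an assumption here, and the sample hit is made up.

```javascript
// The transform from the pipeline, lifted out so it can be run by itself.
function toMonthly(doc) {
  const event = doc._source
  event._id = doc._id
  // Drop the trailing ".DD" so daily indices collapse into the monthly one.
  event._index = doc._index.slice(0, -3)
  return event
}

// Hypothetical stand-in for the plugin's fallback: prefer the event's own
// _index, else use the index configured on the stage.
function resolveIndex(event, configuredIndex) {
  return event._index || configuredIndex
}

// Made-up search hit shaped like what the extract stage would emit.
const hit = {
  _id: 'example-id',
  _index: 'logbus.journal-2025.08.15',
  _source: { message: 'example' },
}

const monthlyEvent = toMonthly(hit)
const routedIndex = resolveIndex(monthlyEvent, 'logbus-etl-out')
const fallbackIndex = resolveIndex({ message: 'no index set' }, 'logbus-etl-out')

console.log(routedIndex)
console.log(fallbackIndex)
```

Note that `slice(0, -3)` relies on the daily suffix always being a dot plus a two-digit day. An index name without that suffix would lose its last three characters instead.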

Simple Sampling

The sample plugin has proven useful for "tracing" an operation without flooding your senses, terminal, or file system. Unlike in other log shippers that I'm aware of, those samples can just as easily be directed to another sink instead of the log channel.
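
For intuition, every-nth sampling can be sketched in a few lines. This is not the plugin's source: `makeSampler` is a hypothetical name, and the real plugin may count or emit differently. The event count is borrowed from the stats line in the run shown further down.

```javascript
// Hypothetical every-nth sampler: passes through one event out of every `nth`.
function makeSampler(nth) {
  let seen = 0
  return function sample(event) {
    seen += 1
    // Emit only when the running count is a multiple of nth.
    return seen % nth === 0 ? event : undefined
  }
}

const sampler = makeSampler(1000)
const sampledSeqs = []
for (let seq = 1; seq <= 9803; seq++) {
  const out = sampler({ seq })
  if (out !== undefined) sampledSeqs.push(out.seq)
}

console.log(sampledSeqs.length, sampledSeqs[0])
```

Since the sampler simply returns the event or nothing, whatever it emits can be wired to any downstream stage, which is the property the paragraph above relies on.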

If you've made it this far and are curious how this looks in action, here is an example:

Thu Sep 25 16:22:41 erik@nue:~/logbus-journald
./logbus -v info vacuum-month.yml | bunyan -o short
16:22:58.833Z  INFO index.js: started (stage=extract)
    server: {
      "name": "master",
      "cluster_name": "tfks",
      "cluster_uuid": "7_L7iNzBTfacbFvIiK2KrA",
      "version": "8.6.1"
    }
16:22:58.833Z  INFO index.js: started (stage=transform)
16:22:58.835Z  INFO index.js: started (stage=load)
    server: {
      "name": "master",
      "cluster_name": "tfks",
      "cluster_uuid": "7_L7iNzBTfacbFvIiK2KrA",
      "version": "8.6.1"
    }
16:22:58.835Z  INFO index.js: started (stage=sample)
16:22:58.835Z  INFO index.js: started (stage=log-errors)
16:22:58.836Z  INFO index.js: started (stage=log-stats)
16:22:58.836Z  INFO index.js: started (stage=log)
16:22:58.836Z  INFO index.js: pipeline startup complete
16:22:59.384Z  INFO index.js: INPUT DROPPED: IN=enp0s31f6 OUT= MAC=90:1b:0e:c4:3e:10:40:71:83:a5:e9:d3:08:00 SRC=152.32.165.32 DST=88.99.60.246 LEN=44 TOS=0x00 PREC=0x00 TTL=39 ID=0 DF PROTO=TCP SPT=57724 DPT=16434 WINDOW=1024 RES=0x00 SYN URGP=0 (stage=log, labels={}, timestamp=2025-09-01T21:50:50.671Z, container={})
    process: {
      "name": "kernel"
    }
    --
    tags: [
      "sys",
      "parsed",
      "do"
    ]
    --
    source: {
      "ip": "152.32.165.32",
      "port": "57724",
      "mac": "90:1b:0e:c4:3e:10:40:71:83:a5:e9:d3:08:00",
      "geo": {
        "city_name": "Taipei",
        "continent_code": "AS",
        "country_iso_code": "TW",
        "country_name": "Taiwan",
        "location": [
          121.5324,
          25.0504
        ]
      }
    }
    --
    destination: {
      "ip": "88.99.60.246",
      "port": "16434"
    }
    --
    rule: {
      "name": "INPUT",
      "protocol": "TCP"
    }
    --
    event: {
      "action": "dropped",
      "dataset": "iptables",
      "id": "nue.tfks.net:alert:1756763450671:INPUTDROPPEDINenp0s31f6OUTMAC901b",
      "kind": "alert",
      "category": [
        "network"
      ],
      "type": [
        "connection",
        "dropped"
      ],
      "outcome": "success",
      "severity": 4,
      "provider": "kernel"
    }
    --
    ecs: {
      "version": "1.1.0"
    }
    --
    host: {
      "name": "nue.tfks.net"
    }
    --
    agent: {
      "type": "logbus",
      "version": "1.0.0",
      "name": "journald",
      "id": "logbus-journald.nue.tfks.net"
    }
16:22:59.422Z  INFO index.js: asked to stop: end of search (stage=extract)
16:22:59.430Z  INFO index.js: scroll deleted (stage=extract, succeeded=true, num_freed=1)
16:22:59.431Z  INFO index.js: shutting down { reason: 'end of search' }
16:22:59.431Z  INFO index.js: stopping via end of search (stage=extract)
16:22:59.431Z  INFO index.js: waiting to stop (stage=extract)
16:22:59.431Z  INFO index.js: stopping via end of search (stage=log-errors)
16:22:59.431Z  INFO index.js: stopping via end of search (stage=log-stats)
16:22:59.433Z  INFO index.js: errors=0 events[rx=9803,tx=0] mbytes[rx=0,tx=0] (stage=log, type=stats, ts=2025-09-25T16:22:59.432Z, heap=73050736, rss=132870144, errors=0, rxEvents=9803, txEvents=0, rxBytes=0, txBytes=0, rxLines=0, txLines=0)
16:22:59.530Z  INFO index.js: stopping via extract (stage=transform)
16:22:59.530Z  INFO index.js: stopping via transform (stage=load)
16:22:59.543Z  WARN index.js: waiting for bulk index requests (stage=load, inflight=10)
16:22:59.543Z  INFO index.js: stopping via transform (stage=sample)
16:22:59.547Z  INFO index.js: stopping via sample (stage=log)
16:23:00.433Z  INFO index.js: all stages shut down cleanly
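
Once a run like this shuts down cleanly, the dailies can be deleted. The following is a hedged sketch, not part of LogBus: `dailyIndexNames` and `deleteDailies` are hypothetical helpers, the daily name format is inferred from the `logbus.journal-2025.08.*` pattern in the pipeline, and `fetch` assumes Node 18 or newer. Names are listed explicitly because Elasticsearch 8 appears to reject wildcard deletes by default (`action.destructive_requires_name`). The destructive call is left commented out.

```javascript
// Build the daily index names for one month, e.g. logbus.journal-2025.08.01.
// Assumption: dailies are named <prefix>-YYYY.MM.DD.
function dailyIndexNames(prefix, year, month) {
  // Day 0 of the following month is the last day of `month` (1-based).
  const daysInMonth = new Date(Date.UTC(year, month, 0)).getUTCDate()
  const mm = String(month).padStart(2, '0')
  const names = []
  for (let day = 1; day <= daysInMonth; day++) {
    names.push(`${prefix}-${year}.${mm}.${String(day).padStart(2, '0')}`)
  }
  return names
}

// Delete each index by name via the Elasticsearch delete index API.
// Days that never had an index simply come back as 404.
async function deleteDailies(endpoint, names) {
  for (const name of names) {
    const res = await fetch(`${endpoint}/${encodeURIComponent(name)}`, { method: 'DELETE' })
    console.log(name, res.status)
  }
}

const augustDailies = dailyIndexNames('logbus.journal', 2025, 8)
console.log(augustDailies.length, augustDailies[0], augustDailies[30])

// Destructive: uncomment only after verifying the monthly index.
// deleteDailies('http://localhost:9200', augustDailies)
```

Comparing the document count of the monthly index against the `rx` count in the stats line is a cheap check to do before deleting anything.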