Let's create a Logstash event pipeline to update config in your main pipeline
If you're using Elasticsearch to ingest a lot of data, you've probably got a Logstash pipeline in action to help you streamline this. If your Logstash pipeline depends on some custom config, and you want to be able to update that config without restarting Logstash, then this method is a neat way of doing it.
Logstash introduced support for multiple pipelines in Logstash 6, and in the docs you can see some of the use cases and patterns for multiple pipelines.
My suggestion is a new pattern: create a config pipeline that checks for config updates on a set schedule and feeds events into the ingestion pipelines to update them. This keeps your ingestion pipeline nice and clean while the config pipeline polls another service for config updates.
An alternative option would be to send these config updates straight to Logstash from another service in your system. What you choose will depend on your supporting architecture. I found keeping it all in Logstash made sense for what I was doing and kept all the concerns nicely together.
If you do decide just to make Logstash respond to events pushed from elsewhere, then you can ignore the config updater pipeline and just use the event responder part of the ingestion pipeline.
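For example, a minimal sketch of that push-based alternative could be an extra http input in the main pipeline that accepts UpdateEvent messages posted as JSON. The port here is just a placeholder, and this isn't the approach the rest of this post uses:

input {
  # Hypothetical: another service POSTs {"Kind": "UpdateEvent", ...} to this endpoint
  http {
    port => 8085
    codec => json
  }
}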
Setting up multiple pipelines
It's very simple to define and name multiple pipelines in pipelines.yml
/pipelines.yml
- pipeline.id: "updater-pipeline"
  path.config: "/etc/logstash/conf.d/updater-pipeline.conf"
- pipeline.id: "main-pipeline"
  path.config: "/etc/logstash/conf.d/main-pipeline.conf"
Setting up the Config Updater Pipeline
The updater pipeline uses the exec input to run a simple command every 300 seconds that echoes a small JSON snippet marking the start of an UpdateEvent message; the json codec parses it onto the event. I mainly wanted the interval feature of exec to run an update job at a set frequency. Most of the functionality is done by a ruby script in the filter section.
/updater-pipeline.conf
input {
  exec {
    command => 'echo "{\"Kind\": \"UpdateEvent\"}"'
    # interval is in seconds - this runs every 5 minutes
    interval => 300
    # Parse the echoed JSON so Kind becomes a field on the event
    codec => json
  }
}

filter {
  ruby {
    init => "
      require '/opt/logstash/pipeline/ruby_filters/updater-pipeline.rb'
    "
    code => "
      process_get_config(event)
    "
  }
}

output {
  # Send event to main ingestion pipeline.
  pipeline {
    send_to => ["main-ingest-pipeline"]
  }
}
The filter section loads in a ruby file and calls the method process_get_config, passing it the event with the starter message.
Here is process_get_config; it gets some config from an API and adds values from that config to the UpdateEvent message. If no config is returned it cancels the event, so the UpdateEvent message is not sent.
def process_get_config(event)
  config_load_time = Time.now
  logger.info("Get config from API at #{config_load_time}")
  config = get_config_from_api()
  # If the API returned nothing, cancel the event so no UpdateEvent is sent
  if config.nil? || config.empty?
    event.cancel
    return
  end
  event.set('[ConfigValue1]', config['Value1'])
  event.set('[ConfigValue2]', config['Value2'])
  event.set('[LoadTime]', config_load_time)
end
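The get_config_from_api helper is whatever suits your setup. As a rough sketch, assuming a hypothetical endpoint that returns the config as JSON (the URL and field names below are placeholders), it might look like this:

require 'net/http'
require 'json'

# Hypothetical helper - fetch config from an API and return it as a hash
def get_config_from_api()
  uri = URI('http://config-service.local/api/logstash-config') # placeholder URL
  response = Net::HTTP.get_response(uri)
  return {} unless response.is_a?(Net::HTTPSuccess)
  JSON.parse(response.body) # e.g. {"Value1" => "...", "Value2" => "..."}
rescue StandardError => e
  logger.warn("Failed to get config from API: #{e.message}")
  {}
end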
Make the main pipeline respond to Update Events
The main pipeline needs to keep whatever inputs it had before and gain a new pipeline input with an address property that matches the send_to address specified in the output of the updater pipeline. So the output above sends to "main-ingest-pipeline" and the input below sets that name as its address.
/main-pipeline.conf : input
input {
  # Main inputs for pipeline
  # ...

  # Input for config event updates from updater-pipeline
  pipeline {
    address => "main-ingest-pipeline"
  }
}
Respond to incoming event message - Config Update
The filter section of the main pipeline needs to check for UpdateEvents before it gets on with the normal business it was created to do.
First it needs to do a bit of setup. It does this by calling main_init from the ruby filter's init block, which runs when the filter is created rather than for every event.
Then, for each event that comes through, it checks the message kind. If the message is an UpdateEvent it calls the ruby method process_update_event and then drops the event so Logstash does not try to save it as a document in Elasticsearch.
If the message is not an UpdateEvent, the pipeline carries on with its normal business of saving stuff in Elasticsearch.
/main-pipeline.conf : filter
filter {
  # Runs at startup to set things up
  ruby {
    init => "
      require '/opt/logstash/pipeline/ruby_filters/main-pipeline.rb'
      main_init()
    "
    # No per-event code needed here; init runs once at startup
    code => ""
  }

  # First - check if the message is an UpdateEvent
  if [Kind] == "UpdateEvent" {
    ruby {
      code => "
        process_update_event(event)
      "
    }

    # Drop - stop processing and don't try to save the event message
    drop {}
  }

  # Normal logstash business - get on with what you were doing before
}
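For completeness, the rest of the main pipeline stays as it was. If it writes to Elasticsearch, the output section might look something like this; the hosts and index values are placeholders for whatever you already have:

/main-pipeline.conf : output
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "my-index-%{+YYYY.MM.dd}"
  }
}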
Storing and setting up our config
We need the config values to persist so they can be used while processing any event in the pipeline. Variables whose names start with @@, like @@a_nice_variable, are class-level variables in ruby and keep their values between filter runs for as long as Logstash is running.
They need to be declared outside of any methods, so add this to the top of your script file.
@@config = {}
@@config_load_time = nil
The method main_init is called when the pipeline is first set up. We can't guarantee it will only ever run once, so anything it does should be safe to repeat.
I'm using it to make sure that when the pipeline starts up we have some valid config. This could be hardcoded defaults, or it could also call the API.
def main_init()
  logger.info("main_init - Getting initial config")
  # Only load if the config hasn't been populated yet
  if @@config.empty?
    # @@variables in ruby are class level and persist between filter runs
    @@config = get_config()
    @@config_load_time = Time.now
    logger.info("main_init - Loaded #{@@config.size} config values: #{@@config}")
  else
    logger.info("main_init - Config already initialized: #{@@config}")
  end
end
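get_config is up to you. A minimal sketch, assuming hardcoded defaults (the keys and values below are just placeholders), could be as simple as:

# Hypothetical defaults - replace with values that make sense for your pipeline,
# or call the API here instead
def get_config()
  {
    'value1' => 'default-value-1',
    'value2' => 'default-value-2'
  }
end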
Respond to UpdateEvent message and update our cached config
The process_update_event method is the one that does the actual work of updating the cached config.
def process_update_event(event)
  config_value1 = event.get('[ConfigValue1]').to_s.strip
  config_value2 = event.get('[ConfigValue2]').to_s.strip
  # Ignore update events that don't carry any config values
  if config_value1.empty? && config_value2.empty?
    event.cancel
    return
  end
  # @@variables in ruby are class level and persist between filter runs
  @@config['value1'] = config_value1
  @@config['value2'] = config_value2
  @@config_load_time = Time.now
  logger.info("Updated config at #{@@config_load_time} - #{@@config}")
end
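To actually make use of the cached config during normal processing, any other method in the script can simply read the class variables. As a hypothetical example (the method and field names here are made up):

# Hypothetical example of normal event processing that reads the cached config
def process_main_event(event)
  event.set('[EnrichedWith]', @@config['value1'])
  event.set('[ConfigLoadedAt]', @@config_load_time.to_s)
end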
And that is everything you need to make your main pipeline respond to events and store config that can be dynamically changed. I've found this method to work really well. These event messages can come from any input source, but using this multi-pipeline method is a nice way of keeping all the Logstash-related functionality together.