Let's create a Logstash event pipeline to update config in your main pipeline

If you're using Elasticsearch to ingest a lot of data, then you've probably got a Logstash pipeline in action to help you streamline this. If your Logstash pipeline depends on some custom config, and you want to be able to update that config without restarting Logstash, then this method is a neat way of doing it.

Logstash introduced support for multiple pipelines in version 6, and the docs cover some of the use cases and patterns for running them, including pipeline-to-pipeline communication.

My suggestion is a new pattern: a config pipeline that checks for config updates on a set schedule and then feeds events into the ingestion pipelines to apply them. This keeps your ingestion pipeline nice and clean while the config pipeline does the work of polling another service for updates.

An alternative option would be to send these config updates straight to Logstash from another service in your system. What you choose will depend on your supporting architecture. I found keeping it all in Logstash made sense for what I was doing, and it kept all the related concerns nicely together.

If you do decide just to make Logstash respond to events sent from elsewhere, then you can ignore the config updater pipeline and just use the event responder part of the ingestion pipeline, along the lines of the sketch below.
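As a rough sketch of that alternative (not part of the setup described in the rest of this post), the main pipeline could accept update events over HTTP using the standard http input plugin; the port here is made up for illustration:

input {
    # Hypothetical alternative: another service POSTs UpdateEvent JSON straight to Logstash
    http {
        port => 8081
        codec => "json"
    }
}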

Setting up multiple pipelines

It's very simple to define and name multiple pipelines in pipelines.yml

/pipelines.yml

- pipeline.id: "updater-pipeline"
  path.config: "/etc/logstash/conf.d/updater-pipeline.conf"
- pipeline.id: "main-pipeline"
  path.config: "/etc/logstash/conf.d/main-pipeline.conf"

Setting up the Config Updater Pipeline

The updater pipeline uses the exec input to run a simple command every 300 seconds that echoes the skeleton of an UpdateEvent message; the json codec parses it so Kind becomes a field on the event. I mainly wanted the interval feature of exec to run an update job at a set frequency. Most of the functionality is done by a ruby script in the filter section.

/updater-pipeline.conf

input {
  exec {
    # Emit a minimal UpdateEvent message on a fixed schedule; the json codec
    # parses it so Kind becomes a field on the event.
    command => 'echo "{\"Kind\": \"UpdateEvent\"}"'
    interval => 300
    codec => "json"
  }
}

filter {
    ruby {
      init => "
        require '/opt/logstash/pipeline/ruby_filters/updater-pipeline.rb'
      "
      code => "
        process_get_config(event)
      "
    }
}

output {
    # Send event to main ingestion pipeline.
    pipeline {
        send_to => "main-ingest-pipeline"
    }
}

The filter section loads in a ruby file and calls the method process_get_config, passing it the event containing the starter message.

Here is process_get_config; it gets some config from an API and adds values from that config to the UpdateEvent message. If no config is returned it cancels the event so the UpdateEvent message is not sent.

def process_get_config(event)
    config_load_time = Time.now

    logger.info("Get config from API at #{config_load_time}")
    config = get_config_from_api()
    if config.empty?
        event.cancel
        return
    end

    event.set('[ConfigValue1]', config['Value1'])
    event.set('[ConfigValue2]', config['Value2'])
    event.set('[LoadTime]', config_load_time)
end
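get_config_from_api isn't shown here because it will depend on where your config lives. As a minimal sketch, assuming a hypothetical HTTP endpoint that returns the config as JSON (the URL and keys below are placeholders), it could look something like this:

require 'net/http'
require 'json'
require 'uri'

# Minimal sketch - fetch config as a Hash from a hypothetical HTTP endpoint.
# Swap the URL and keys for whatever your config service actually exposes.
def get_config_from_api()
    uri = URI('http://config-service.local/api/pipeline-config')
    response = Net::HTTP.get_response(uri)
    return {} unless response.is_a?(Net::HTTPSuccess)

    JSON.parse(response.body)   # e.g. { "Value1" => "...", "Value2" => "..." }
rescue StandardError => e
    logger.warn("Failed to get config from API: #{e.message}")
    {}
end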

Make the main pipeline respond to Update Events

The main pipeline keeps whatever inputs it already had and gains a new pipeline input whose address property matches the send_to address specified in the output of the updater pipeline. So the output above sends to "main-ingest-pipeline" and the input below sets that name as its address.

/main-pipeline.conf : input

input {
    # Main inputs for pipeline
    # ...

    # Input for config event updates from updater-pipeline
    pipeline {
        address => "main-ingest-pipeline"
    }
}

Respond to incoming event message - Config Update

The filter section of the main pipeline needs to check for UpdateEvents before it gets on with the normal business it was created to do.

First it needs to do a bit of setup. It does this by calling main_init from the ruby filter's init block, which runs when the pipeline starts rather than once per event.

Then, for every event that passes through the pipeline, it checks the message kind. If the message is an UpdateEvent, it calls the ruby method process_update_event and then drops the event so Logstash does not try to save it as a document in Elasticsearch.

If the message is not an UpdateEvent, then the pipeline carries on with its normal business of saving things in Elasticsearch.

/main-pipeline.conf : filter

filter {
    # init runs at pipeline startup to set things up
    ruby {
        init => "
            require '/opt/logstash/pipeline/ruby_filters/main-pipeline.rb'
            main_init()
        "
        # The ruby filter also needs a code block; there is nothing to do per event here
        code => "# per-event work happens in the filters below"
    }

    # First - Check if the message is an UpdateEvent
    if [Kind] == "UpdateEvent" {
        ruby {
            code => "
                process_update_event(event)
            "
        }
        # Drop - Stop processing and don't try to save the event message
        drop {}
    }

    # Normal Logstash business - Get on with what you were doing before
}
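Once the config is cached it can be used by the pipeline's normal filter logic. As a sketch (not part of the original pipeline), you could add a small accessor to main-pipeline.rb and call it from an ordinary ruby filter to enrich documents; the helper name and target field are just illustrative:

# In main-pipeline.rb - hypothetical accessor for the cached config
def current_config()
    @@config
end

# In the normal filter section of main-pipeline.conf
ruby {
    code => "
        cfg = current_config()
        event.set('[ConfigValue1]', cfg['Value1']) unless cfg.empty?
    "
}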

Storing and setting up our config

We need the config values to persist, so they can be used when processing any event that passes through the pipeline. Variables whose names start with @@, like @@a_nice_variable, are class-level variables in ruby and persist between pipeline runs.

They need to be declared at the top level of the script, not inside any methods. So add this to the top of your script file:

@@config = {}

The method main_init is called when the pipeline first starts. We can't guarantee it will run only once, so anything it does should be thread safe.

I'm using it to make sure that when the pipeline starts up we have some valid config. This could be hardcoded defaults, or it could call the API (there's a sketch of get_config after the code below).

def main_init()
    logger.info("main_init - Getting initial config")

    if @@config.empty?

        # @@ variables in ruby are class level and persist between pipeline runs

        @@config = get_config()
        @@config_load_time = Time.now
        logger.info("main_init - Loaded #{@@config.size} config values: #{@@config}")

    else

        logger.info("main_init - Config already initialized: #{@@config}")

    end

end
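get_config isn't shown above; as a sketch it could return some hardcoded defaults, or reuse the same API helper as the updater pipeline if you make it available in main-pipeline.rb. The values here are placeholders:

# Hypothetical startup defaults - replace with your own values,
# or call the same API helper the updater pipeline uses.
def get_config()
    {
        'Value1' => 'default-value-1',
        'Value2' => 'default-value-2'
    }
end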

Respond to UpdateEvent message and update our cached config

The process_update_event method is the one that does the actual work:

def process_update_event(event)

    config_value1 = event.get('[ConfigValue1]').to_s.strip
    config_value2 = event.get('[ConfigValue2]').to_s.strip

    if config_value1.empty? && config_value2.empty?
        event.cancel
        return
    end

    # @@ variables in ruby are class level and persist between pipeline runs

    @@config['Value1'] = config_value1
    @@config['Value2'] = config_value2

    @@config_load_time = Time.now

    logger.info("Updated config at #{@@config_load_time} - #{@@config}")

end

And that is everything you need to make your main pipeline respond to events and store config that can be changed dynamically. I've found this method works really well. These event messages can come from any input source, but using this multi-pipeline approach is a nice way of keeping all the Logstash-related functionality together.