Sunday, November 29, 2020

Constructing a simple alerting system with well-known open source projects


Some time ago, I have been experimenting with all kinds of monitoring and alerting technologies. For example, with the following technologies, I can develop a simple alerting system with relative ease:

  • Telegraf is an agent that can be used to gather measurements and transfer the corresponding data to all kinds of storage solutions.
  • InfluxDB is a time series database platform that can store, manage and analyze timestamped data.
  • Kapacitor is a real-time streaming data process engine, that can be used for a variety of purposes. I can use Kapacitor to analyze measurements and see if a threshold has been exceeded so that an alert can be triggered.
  • Alerta is a monitoring system that can store, de-duplicate alerts, and arrange black outs.
  • Grafana is a multi-platform open source analytics and interactive visualization web application.

These technologies appear to be quite straight forward to use. However, as I was learning more about them, I discovered a number of oddities, that may have big implications.

Furthermore, testing and making incremental changes also turns out to be much more challenging than expected, making it very hard to diagnose and fix problems.

In this blog post, I will describe how I built a simple monitoring and alerting system, and elaborate about my learning experiences.

Building the alerting system


As described in the introduction, I can combine several technologies to create an alerting system. I will explain them more in detail in the upcoming sections.

Telegraf


Telegraf is a pluggable agent that gathers measurements from a variety of inputs (such as system metrics, platform metrics, database metrics etc.) and sends them to a variety of outputs, typically storage solutions (database management systems such as InfluxDB, PostgreSQL or MongoDB). Telegraf has a large plugin eco-system that provides all kinds integrations.

In this blog post, I will use InfluxDB as an output storage backend. For the inputs, I will restrict myself to capturing a sub set of system metrics only.

With the following telegraf.conf configuration file, I can capture a variety of system metrics every 10 seconds:

[agent]
  interval = "10s"

[[outputs.influxdb]]
  urls = [ "http://test1:8086" ]
  database = "sysmetricsdb"
  username = "sysmetricsdb"
  password = "sysmetricsdb"

[[inputs.system]]
  # no configuration

[[inputs.cpu]]
  ## Whether to report per-cpu stats or not
  percpu = true
  ## Whether to report total system cpu stats or not
  totalcpu = true
  ## If true, collect raw CPU time metrics.
  collect_cpu_time = false
  ## If true, compute and report the sum of all non-idle CPU states.
  report_active = true

[[inputs.mem]]
  # no configuration

With the above configuration file, I can collect the following metrics:
  • System metrics, such as the hostname and system load.
  • CPU metrics, such as how much the CPU cores on a machine are utilized, including the total CPU activity.
  • Memory (RAM) metrics.

The data will be stored in an InfluxDB database name: sysmetricsdb hosted on a remote machine with host name: test1.

InfluxDB


As explained earlier, InfluxDB is a timeseries platform that can store, manage and analyze timestamped data. In many ways, InfluxDB resembles relational databases, but there are also some notable differences.

The query language that InfluxDB uses is called InfluxQL (that shares many similarities with SQL).

For example, with the following query I can retrieve the first three data points from the cpu measurement, that contains the CPU-related measurements collected by Telegraf:

> precision rfc3339
> select * from "cpu" limit 3

providing me the following result set:

name: cpu
time                 cpu       host  usage_active       usage_guest usage_guest_nice usage_idle        usage_iowait        usage_irq usage_nice usage_softirq       usage_steal usage_system      usage_user
----                 ---       ----  ------------       ----------- ---------------- ----------        ------------        --------- ---------- -------------       ----------- ------------      ----------
2020-11-16T15:36:00Z cpu-total test2 10.665258711721098 0           0                89.3347412882789  0.10559662090813073 0         0          0.10559662090813073 0           8.658922914466714 1.79514255543822
2020-11-16T15:36:00Z cpu0      test2 10.665258711721098 0           0                89.3347412882789  0.10559662090813073 0         0          0.10559662090813073 0           8.658922914466714 1.79514255543822
2020-11-16T15:36:10Z cpu-total test2 0.1055966209080346 0           0                99.89440337909197 0                   0         0          0.10559662090813073 0           0                 0

As you may probably notice by looking at the output above, every data point has a timestamp and a number of fields capturing CPU metrics:

  • cpu identifies the CPU core.
  • host contains the host name of the machine.
  • The remainder of the fields contain all kinds of CPU metrics, e.g. how much CPU time is consumed by the system (usage_system), the user (usage_user), by waiting for IO (usage_iowait) etc.
  • The usage_active field contains the total CPU activity percentage, which is going to be useful to develop an alert that will warn us if there is too much CPU activity for a long period of time.

Aside from the fact that all data is timestamp based, data in InfluxDB has another notable difference compared to relational databases: an InfluxDB database is schemaless. You can add an arbitrary number of fields and tags to a data point without having to adjust the database structure (and migrating existing data to the new database structure).

Fields and tags can contain arbitrary data, such as numeric values or strings. Tags are also indexed so that you can search for these values more efficiently. Furthermore, tags can be used to group data.

For example, the cpu measurement collection has the following tags:

> SHOW TAG KEYS ON "sysmetricsdb" FROM "cpu";
name: cpu
tagKey
------
cpu
host

As shown in the above output, the cpu and host fields are tags in the cpu measurement.

We can use these tags to search for all data points related to a CPU core and/or host machine. Moreover, we can use these tags for grouping allowing us to compute aggregate values, sch as the mean value per CPU core and host.

Beyond storing and retrieving data, InfluxDB has many useful additional features:

  • You can also automatically sample data and run continuous queries that generate and store sampled data in the background.
  • Configure retention policies so that data is no longer stored for an indefinite amount of time. For example, you can configure a retention policy to drop raw data after a certain amount of time, but retain the corresponding sampled data.

InfluxDB has a "open core" development model. The free and open source edition (FOSS) of InfluxDB server (that is MIT licensed) allows you to host multiple databases on a multiple servers.

However, if you also want horizontal scalability and/or high assurance, then you need to switch to the hosted InfluxDB versions -- data in InfluxDB is partitioned into so-called shards of a fixed size (the default shard size is 168 hours).

These shards can be distributed over multiple InfluxDB servers. It is also possible to deploy multiple read replicas of the same shard to multiple InfluxDB servers improving read speed.

Kapacitor


Kapacitor is a real-time streaming data process engine developed by InfluxData -- the same company that also develops InfluxDB and Telegraf.

It can be used for all kinds of purposes. In my example cases, I will only use it to determine whether some threshold has been exceeded and an alert needs to be triggered.

Kapacitor works with customly implemented tasks that are written in a domain-specific language called the TICK script language. There are two kinds of tasks: stream and batch tasks. Both task types have advantages and disadvantages.

We can easily develop an alert that gets triggered if the CPU activity level is high for a relatively long period of time (more than 75% on average over 1 minute).

To implement this alert as a stream job, we can write the following TICK script:

dbrp "sysmetricsdb"."autogen"

stream
    |from()
        .measurement('cpu')
        .groupBy('host', 'cpu')
        .where(lambda: "cpu" != 'cpu-total')
    |window()
        .period(1m)
        .every(1m)
    |mean('usage_active')
    |alert()
        .message('Host: {{ index .Tags "host" }} has high cpu usage: {{ index .Fields "mean" }}')
        .warn(lambda: "mean" > 75.0)
        .crit(lambda: "mean" > 85.0)
        .alerta()
            .resource('{{ index .Tags "host" }}/{{ index .Tags "cpu" }}')
            .event('cpu overload')
            .value('{{ index .Fields "mean" }}')

A stream job is built around the following principles:

  • A stream task does not execute queries on an InfluxDB server. Instead, it creates a subscription to InfluxDB -- whenever a data point gets inserted into InfluxDB, the data points gets forwarded to Kapacitor as well.

    To make subscriptions work, both InfluxDB and Kapacitor need to be able to connect to each other with a public IP address.
  • A stream task defines a pipeline consisting of a number of nodes (connected with the | operator). Each node can consume data points, filter, transform, aggregate, or execute arbitrary operations (such as calling an external service), and produce new data points that can be propagated to the next node in the pipeline.
  • Every node also has property methods (such as .measurement('cpu')) making it possible to configure parameters.

The TICK script example shown above does the following:

  • The from node consumes cpu data points from the InfluxDB subscription, groups them by host and cpu and filters out data points with the the cpu-total label, because we are only interested in the CPU consumption per core, not the total amount.
  • The window node states that we should aggregate data points over the last 1 minute and pass the resulting (aggregated) data points to the next node after one minute in time has elapsed. To aggregate data, Kapacitor will buffer data points in memory.
  • The mean node computes the mean value for usage_active for the aggregated data points.
  • The alert node is used to trigger an alert of a specific severity level (WARNING if the mean activity percentage is bigger than 75%) and (CRITICAL if the mean activity percentage is bigger than 85%). In the remainder of the case, the status is considered OK. The alert is sent to Alerta.

It is also possible to write a similar kind of alerting script as a batch task:

dbrp "sysmetricsdb"."autogen"

batch
    |query('''
        SELECT mean("usage_active")
        FROM "sysmetricsdb"."autogen"."cpu"
        WHERE "cpu" != 'cpu-total'
    ''')
        .period(1m)
        .every(1m)
        .groupBy('host', 'cpu')
    |alert()
        .message('Host: {{ index .Tags "host" }} has high cpu usage: {{ index .Fields "mean" }}')
        .warn(lambda: "mean" > 75.0)
        .crit(lambda: "mean" > 85.0)
        .alerta()
            .resource('{{ index .Tags "host" }}/{{ index .Tags "cpu" }}')
            .event('cpu overload')
            .value('{{ index .Fields "mean" }}')

The above TICK script looks similar to the stream task shown earlier, but instead of using a subscription, the script queries the InfluxDB database (with an InfluxQL query) for data points over the last minute with a query node.

Which approach for writing a CPU alert is best, you may wonder? Each of these two approaches have their pros and cons:

  • Stream tasks offer low latency responses -- when a data point appears, a stream task can immediately respond, whereas a batch task needs to query every minute all the data points to compute the mean percentage over the last minute.
  • Stream tasks maintain a buffer for aggregating the data points making it possible to only send incremental updates to Alerta. Batch tasks are stateless. As a result, they need to update the status of all hosts and CPUs every minute.
  • Processing data points is done synchronously and in sequential order -- if an update round to Alerta takes too long (which is more likely to happen with a batch task), then the next processing run may overlap with the previous, causing all kinds of unpredictable results.

    It may also cause Kapacitor to eventually crash due to growing resource consumption.
  • Batch tasks may also miss data points -- while querying data over a certain time window, it may happen that a new data point gets inserted in that time window (that is being queried). This new data point will not be picked up by Kapacitor.

    A subscription made by a stream task, however, will never miss any data points.
  • Stream tasks can only work with data points that appear from the moment Kapacitor is started -- it cannot work with data points in the past.

    For example, if Kapacitor is restarted and some important event is triggered in the restart time window, Kapacitor will not notice that event, causing the alert to remain in its previous state.

    To work effectively with stream tasks, a continuous data stream is required that frequently reports on the status of a resource. Batch tasks, on the other hand, can work with historical data.
  • The fact that nodes maintain a buffer may also cause the RAM consumption of Kapacitor to grow considerably, if the data volumes are big.

    A batch task on the other hand, does not buffer any data and is more memory efficient.

    Another compelling advantage of batch tasks over stream tasks is that InfluxDB does all the work. The hosted version of InfluxDB can also horizontally scale.
  • Batch tasks can also aggregate data more efficiently (e.g. computing the mean value or sum of values over a certain time period).

I consider neither of these script types the optimal solution. However, for implementing the alerts I tend to have a slight preference for stream jobs, because of its low latency, and incremental update properties.

Alerta


As explained in the introduction, Alerta is a monitoring system that can store and de-duplicate alerts, and arrange black outs.

The Alerta server provides a REST API that can be used to query and modify alerting data and uses MongoDB or PostgreSQL as a storage database.

There are also a variety of Alerta clients: there is the alerta-cli allows you to control the service from the command-line. There is also a web user interface that I will show later in this blog post.

Running experiments


With all the components described above in place, we can start running experiments to see if the CPU alert will work as expected. To gain better insights in the process, I can install Grafana that allows me to visualize the measurements that are stored in InfluxDB.

Configuring a dashboard and panel for visualizing the CPU activity rate was straight forward. I configured a new dashboard, with the following variables:


The above variables allow me to select for each machine in the network, which CPU core's activity percentage I want to visualize.

I have configured the CPU panel as follows:


In the above configuration, I query the usage_activity from the cpu measurement collection, using the dashboard variables: cpu and host to filter for the right target machine and CPU core.

I have also configured the field unit to be a percentage value (between 0 and 100).

When running the following command-line instruction on a test machine that runs Telegraf (test2), I can deliberately hog the CPU:

$ dd if=/dev/zero of=/dev/null

The above command reads zero bytes (one-by-one) and discards them by sending them to /dev/null, causing the CPU to remain utilized at a high level:


In the graph shown above, it is clearly visible that CPU core 0 on the test2 machine remains utilized at 100% for several minutes.

(As a sidenote, we can also hog both the CPU and consume RAM at the same time with a simple command line instruction).

If we keep hogging the CPU and wait for at least a minute, the Alerta web interface dashboard will show a CRITICAL alert:


If we stop the dd command, then the TICK script should eventually notice that the mean percentage drops below the WARNING threshold causing the alert to go back into the OK state and disappearing from the Alerta dashboard.

Developing test cases


Being able to trigger an alert with a simple command-line instruction is useful, but not always convenient or effective -- one of the inconveniences is that we always have to wait at least one minute to get feedback.

Moreover, when an alert does not work, it is not always easy to find the root cause. I have encountered the following problems that contribute to a failing alert:

  • Telegraf may not be running and, as a result, not capturing the data points that need to be analyzed by the TICK script.
  • A subscription cannot be established between InfluxDB and Kapacitor. This may happen when Kapacitor cannot be reached through a public IP address.
  • There are data points collected, but only the wrong kinds of measurements.
  • The TICK script is functionally incorrect.

Fortunately, for stream tasks it is relatively easy to quickly find out whether an alert is functionally correct or not -- we can generate test cases that almost instantly trigger each possible outcome with a minimal amount of data points.

An interesting property of stream tasks is that they have no notion of time -- the .window(1m) property may suggest that Kapacitor computes the mean value of the data points every minute, but that is not what it actually does. Instead, Kapacitor only looks at the timestamps of the data points that it receives.

When Kapacitor sees that the timestamps of the data points fit in the 1 minute time window, then it keeps buffering. As soon as a data point appears that is outside this time window, the window node relays an aggregated data point to the next node (that computes the mean value, than in turn is consumed by the alert node deciding whether an alert needs to be raised or not).

We can exploit that knowledge, to create a very minimal bash test script that triggers every possible outcome: OK, WARNING and CRITICAL:

influxCmd="influx -database sysmetricsdb -host test1"

export ALERTA_ENDPOINT="http://test1"

### Trigger CRITICAL alert

# Force the average CPU consumption to be 100%
$influxCmd -execute "INSERT cpu,cpu=cpu0,host=test2 usage_active=100   0000000000"
$influxCmd -execute "INSERT cpu,cpu=cpu0,host=test2 usage_active=100  60000000000"
# This data point triggers the alert
$influxCmd -execute "INSERT cpu,cpu=cpu0,host=test2 usage_active=100 120000000000"

sleep 1
actualSeverity=$(alerta --output json query | jq '.[0].severity')

if [ "$actualSeverity" != "critical" ]
then
     echo "Expected severity: critical, but we got: $actualSeverity" >&2
     false
fi
      
### Trigger WARNING alert

# Force the average CPU consumption to be 80%
$influxCmd -execute "INSERT cpu,cpu=cpu0,host=test2 usage_active=80 180000000000"
$influxCmd -execute "INSERT cpu,cpu=cpu0,host=test2 usage_active=80 240000000000"
# This data point triggers the alert
$influxCmd -execute "INSERT cpu,cpu=cpu0,host=test2 usage_active=80 300000000000"

sleep 1
actualSeverity=$(alerta --output json query | jq '.[0].severity')

if [ "$actualSeverity" != "warning" ]
then
     echo "Expected severity: warning, but we got: $actualSeverity" >&2
     false
fi

### Trigger OK alert

# Force the average CPU consumption to be 0%
$influxCmd -execute "INSERT cpu,cpu=cpu0,host=test2 usage_active=0 300000000000"
$influxCmd -execute "INSERT cpu,cpu=cpu0,host=test2 usage_active=0 360000000000"
# This data point triggers the alert
$influxCmd -execute "INSERT cpu,cpu=cpu0,host=test2 usage_active=0 420000000000"

sleep 1
actualSeverity=$(alerta --output json query | jq '.[0].severity')

if [ "$actualSeverity" != "ok" ]
then
     echo "Expected severity: ok, but we got: $actualSeverity" >&2
     false
fi

The shell script shown above automatically triggers all three possible outcomes of the CPU alert:

  • CRITICAL is triggered by generating data points that force a mean activity percentage of 100%.
  • WARNING is triggered by a mean activity percentage of 80%.
  • OK is triggered by a mean activity percentage of 0%.

It uses the Alerta CLI to connect to the Alerta server to check whether the alert's severity level has the expected value.

We need three data points to trigger each alert type -- the first two data points are on the boundaries of the 1 minute window (0 seconds and 60 seconds), forcing the mean value to become the specified CPU activity percentage.

The third data point is deliberately outside the time window (of 1 minute), forcing the alert node to be triggered with a mean value over the previous two data points.

Although the above test strategy works to quickly validate all possible outcomes, one impractical aspect is that the timestamps in the above example start with 0 (meaning 0 seconds after the epoch: January 1st 1970 00:00 UTC).

If we also want to observe the data points generated by the above script in Grafana, we need to configure the panel to go back in time 50 years.

Fortunately, I can also easily adjust the script to start with a base timestamp, that is 1 hour in the past:

offset="$(($(date +%s) - 3600))"

With this tiny adjustment, we should see the following CPU graph (displaying data points from the last hour) after running the test script:


As you may notice, we can see that the CPU activity level quickly goes from 100%, to 80%, to 0%, using only 9 data points.

Although testing stream tasks (from a functional perspective) is quick and convenient, testing batch tasks in a similar way is difficult. Contrary to the stream task implementation, the query node in the batch task does have a notion of time (because of the WHERE clause that includes the now() expression).

Moreover, the embedded InfluxQL query evaluates the mean values every minute, but the test script does not exactly know when this event triggers.

The only way I could think of to (somewhat reliably) validate the outcomes is by creating a test script that continuously inserts data points for at least double the time window size (2 minutes) until Alerta reports the right alert status (if it does not after a while, I can conclude that the alert is incorrectly implemented).

Automating the deployment


As you may probably have already guessed, to be able to conveniently experiment with all these services, and to reliably run tests in isolation, some form of deployment automation is an absolute must-have.

Most people who do not know anything about my deployment technology preferences, will probably go for Docker or docker-compose, but I have decided to use a variety of solutions from the Nix project.

NixOps is used to automatically deploy a network of NixOS machines -- I have created a logical and physical NixOps configuration that deploys two VirtualBox virtual machines.

With the following command I can create and deploy the virtual machines:

$ nixops create network.nix network-virtualbox.nix -d test
$ nixops deploy -d test

The first machine: test1 is responsible for hosting the entire monitoring infrastructure (InfluxDB, Kapacitor, Alerta, Grafana), and the second machine (test2) runs Telegraf and the load tests.

Disnix (my own deployment tool) is responsible for deploying all services, such as InfluxDB, Kapacitor, Alarta, and the database storage backends. Contrary to docker-compose, Disnix does not work with containers (or other Docker objects, such as networks or volumes), but with arbitrary deployment units that are managed with a plugin system called Dysnomia.

Moreover, Disnix can also be used for distributed deployment in a network of machines.

I have packaged all the services and captured them in a Disnix services model that specifies all deployable services, their types, and their inter-dependencies.

If I combine the services model with the NixOps network models, and a distribution model (that maps Telegraf and the test scripts to the test2 machine and the remainder of the services to the first: test1), I can deploy the entire system:

$ export NIXOPS_DEPLOYMENT=test
$ export NIXOPS_USE_NIXOPS=1

$ disnixos-env -s services.nix \
  -n network.nix \
  -n network-virtualbox.nix \
  -d distribution.nix

The following diagram shows a possible deployment scenario of the system:


The above diagram describes the following properties:

  • The light-grey colored boxes denote machines. In the above diagram, we have two of them: test1 and test2 that correspond to the VirtualBox machines deployed by NixOps.
  • The dark-grey colored boxes denote containers in a Disnix-context (not to be confused with Linux or Docker containers). These are environments that manage other services.

    For example, a container service could be the PostgreSQL DBMS managing a number of PostgreSQL databases or the Apache HTTP server managing web applications.
  • The ovals denote services that could be any kind of deployment unit. In the above example, we have services that are running processes (managed by systemd), databases and web applications.
  • The arrows denote inter-dependencies between services. When a service has an inter-dependency on another service (i.e. the arrow points from the former to the latter), then the latter service needs to be activated first. Moreover, the former service also needs to know how the latter can be reached.
  • Services can also be container providers (as denoted by the arrows in the labels), stating that other services can be embedded inside this service.

    As already explained, the PostgreSQL DBMS is an example of such a service, because it can host multiple PostgreSQL databases.

Although the process components in the diagram above can also be conveniently deployed with Docker-based solutions (i.e. as I have explained in an earlier blog post, containers are somewhat confined and restricted processes), the non-process integrations need to be managed by other means, such as writing extra shell instructions in Dockerfiles.

In addition to deploying the system to machines managed by NixOps, it is also possible to use the NixOS test driver -- the NixOS test driver automatically generates QEMU virtual machines with a shared Nix store, so that no disk images need to be created, making it possible to quickly spawn networks of virtual machines, with very small storage footprints.

I can also create a minimal distribution model that only deploys the services required to run the test scripts -- Telegraf, Grafana and the front-end applications are not required, resulting in a much smaller deployment:


As can be seen in the above diagram, there are far fewer components required.

In this virtual network that runs a minimal system, we can run automated tests for rapid feedback. For example, the following test driver script (implemented in Python) will run my test shell script shown earlier:

test2.succeed("test-cpu-alerts")

With the following command I can automatically run the tests on the terminal:

$ nix-build release.nix -A tests

Availability


The deployment recipes, test scripts and documentation describing the configuration steps are stored in the monitoring playground repository that can be obtained from my GitHub page.

Besides the CPU activity alert described in this blog post, I have also developed a memory alert that triggers if too much RAM is consumed for a longer period of time.

In addition to virtual machines and services, there is also deployment automation in place allowing you also easily deploy Kapacitor TICK scripts and Grafana dashboards.

To deploy the system, you need to use the very latest version of Disnix (version 0.10) that was released very recently.

Acknowledgements


I would like to thank my employer: Mendix for writing this blog post. Mendix allows developers to work two days per month on research projects, making projects like these possible.

Presentation


I have given a presentation about this subject at Mendix. For convienence, I have embedded the slides:

No comments:

Post a Comment