Working with InfluxDB in Go

InfluxDB is one of the most popular time series databases on the market. In this post you will learn its key concepts, as well as how to interact with InfluxDB from Go, using a complete example.

InfluxDB

InfluxDB's design is conceptually reminiscent of a relational database. Don't get me wrong, it is not a relational database in any sense. It doesn't impose a schema on your data and doesn't implement SQL. But... it has equivalent concepts and is operated via a DSL that looks quite similar to SQL.

Key concepts

If you are new to InfluxDB, have a look at the key concepts with examples in the official documentation. If you need a quick brush-up, read on.

The largest entity in InfluxDB is a database. You can have multiple databases in one InfluxDB instance.

A database usually has multiple measurements. A measurement is roughly equivalent to a relational table, but it doesn't enforce any structure on the data it stores.

Just like relational tables contain records, measurements contain points. Every point has one or more mandatory fields and a timestamp. A point may also have optional tags.

A timestamp specifies when its point was created. It is the primary index of a measurement.

A field is a key=value pair, which is similar to a column with its value in a relational table.

A tag is also a key=value pair, but it differs from a field in that measurements are always indexed on tags and never on fields. In that way tags are equivalent to indexed columns in a relational table.

InfluxDB also has the concept of a series, which is a combination of a measurement and a set of tags. A series is similar to a compound index in a relational table.

There are official schema design recommendations. Make sure you read them before creating your first measurement.

Line Protocol

Every point is encoded using Line Protocol, which has a simple structure:

[key] [fields] [timestamp]

A key consists of a measurement and optional tags, separated by commas.

It's easier to understand Line Protocol with an example of one point:

cpu,host=server02,region=uswest value=3 1434055562000010000  

This point belongs to a measurement named cpu. The point specifies two tags, host and region. It also has one field with key value and value 3. The timestamp at the end tells us when the point was created.
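To make the encoding concrete, here is a small, dependency-free sketch that builds the line above from a measurement, tags, and fields. The encodeLine helper is made up for illustration; it is not part of the official client (which handles quoting, escaping, and field types for you):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// encodeLine builds a Line Protocol string from a measurement,
// tags, fields (already rendered as strings) and a timestamp
// in nanoseconds. Tags are written in sorted key order.
func encodeLine(measurement string, tags, fields map[string]string, ts int64) string {
	var sb strings.Builder
	sb.WriteString(measurement)

	tagKeys := make([]string, 0, len(tags))
	for k := range tags {
		tagKeys = append(tagKeys, k)
	}
	sort.Strings(tagKeys)
	for _, k := range tagKeys {
		sb.WriteString(fmt.Sprintf(",%s=%s", k, tags[k]))
	}

	fieldKeys := make([]string, 0, len(fields))
	for k := range fields {
		fieldKeys = append(fieldKeys, k)
	}
	sort.Strings(fieldKeys)
	parts := make([]string, 0, len(fields))
	for _, k := range fieldKeys {
		parts = append(parts, fmt.Sprintf("%s=%s", k, fields[k]))
	}

	sb.WriteString(" " + strings.Join(parts, ",") + " ")
	sb.WriteString(fmt.Sprintf("%d", ts))
	return sb.String()
}

func main() {
	line := encodeLine(
		"cpu",
		map[string]string{"host": "server02", "region": "uswest"},
		map[string]string{"value": "3"},
		1434055562000010000,
	)
	fmt.Println(line) // cpu,host=server02,region=uswest value=3 1434055562000010000
}
```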

InfluxDB Go client

The InfluxDB team develops and supports a Go client. The package provides convenience functions to read and write time series data. Under the hood it uses the InfluxDB HTTP API.

Connecting to InfluxDB

To connect to InfluxDB, you need to create a new HTTP client, e.g.:

httpClient, err := client.NewHTTPClient(client.HTTPConfig{
    Addr:     "http://localhost:8086",
    Username: "username",
    Password: "password",
})
if err != nil {
    log.Fatalln("Error: ", err)
}
Writing data

When writing points to InfluxDB, you should write them in batches to improve performance, e.g.:

bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ // error handling omitted for brevity
    Database:  "nodes",
    Precision: "us",
})

point, err := client.NewPoint(
    "cpu_usage",
    tags,
    fields,
    time.Now(),
)
bp.AddPoint(point)

// ...

err = httpClient.Write(bp)
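Why batching matters: each write is one HTTP request, so sending points one by one multiplies round-trip overhead. The pattern generalises beyond InfluxDB; here is a minimal, dependency-free sketch of a size-based batcher (the batcher type and its flush callback are hypothetical, not part of the client API):

```go
package main

import "fmt"

// batcher accumulates items and invokes flush once the batch
// reaches the configured size, mirroring how BatchPoints lets
// many points travel in a single HTTP request.
type batcher struct {
	size  int
	buf   []string
	flush func([]string)
}

func (b *batcher) add(item string) {
	b.buf = append(b.buf, item)
	if len(b.buf) >= b.size {
		b.flush(b.buf)
		b.buf = nil
	}
}

func main() {
	flushes := 0
	b := &batcher{size: 3, flush: func(items []string) {
		flushes++
		fmt.Printf("flushed %d points\n", len(items))
	}}
	for i := 0; i < 7; i++ {
		b.add(fmt.Sprintf("point-%d", i))
	}
	// Two full batches were flushed; one point is still buffered
	// and would need a final flush on shutdown.
	fmt.Println("flushes:", flushes)
}
```

A real application would also flush any remaining buffered points before exiting, and typically on a timer as well as on size.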
Reading data

To read data from InfluxDB, you need to create and execute a Query, e.g.:

q := client.Query{
    Command:  fmt.Sprintf("select mean(cpu_usage) from node_status where cluster = '%s'", cluster),
    Database: "nodes",
}

resp, err := httpClient.Query(q)

// ...

res, err := resp.Results[0].Series[0].Values[0][1].(json.Number).Float64()

Example

First you need to install InfluxDB if you haven't done so already.

Creating user and database
influx

> create user monitor with password 'secret'
> grant all privileges to monitor

> create database nodes

The example below connects to InfluxDB, generates random metrics for two clusters over a 20-second window, and calculates the mean cpu usage for both clusters. The source code of the example is on GitHub.

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/influxdata/influxdb/client/v2"
)

const (  
    database = "nodes"
    username = "monitor"
    password = "secret"
)

var clusters = []string{"public", "private"}

func main() {  
    c := influxDBClient()
    createMetrics(c)
    for _, cluster := range clusters {
        log.Printf("Mean values: cluster='%s', cpu_usage='%f'", cluster, meanCpuUsage(c, cluster))
    }
}

func influxDBClient() client.Client {  
    c, err := client.NewHTTPClient(client.HTTPConfig{
        Addr:     "http://localhost:8086",
        Username: username,
        Password: password,
    })
    if err != nil {
        log.Fatalln("Error: ", err)
    }
    return c
}

func createMetrics(c client.Client) {  
    bp, err := client.NewBatchPoints(client.BatchPointsConfig{
        Database:  database,
        Precision: "s",
    })

    if err != nil {
        log.Fatalln("Error: ", err)
    }

    eventTime := time.Now().Add(time.Second * -20)

    var t time.Duration
    for t = 0; t < 20; t++ {
        for i := 0; i < 100; i++ {
            clusterIndex := rand.Intn(len(clusters))
            tags := map[string]string{
                "cluster": clusters[clusterIndex],
                "host":    fmt.Sprintf("192.168.%d.%d", clusterIndex, rand.Intn(100)),
            }

            fields := map[string]interface{}{
                "cpu_usage":  rand.Float64() * 100.0,
                "disk_usage": rand.Float64() * 100.0,
            }

            point, err := client.NewPoint(
                "node_status",
                tags,
                fields,
                eventTime.Add(time.Second*t), // spread points across the 20-second window
            )
            if err != nil {
                log.Fatalln("Error: ", err)
            }

            bp.AddPoint(point)
        }
    }

    err = c.Write(bp)
    if err != nil {
        log.Fatal(err)
    }
}

func meanCpuUsage(c client.Client, cluster string) float64 {  
    q := client.Query{
        Command:  fmt.Sprintf("select mean(cpu_usage) from node_status where cluster = '%s'", cluster),
        Database: database,
    }

    resp, err := c.Query(q)
    if err != nil {
        log.Fatalln("Error: ", err)
    }
    if resp.Error() != nil {
        log.Fatalln("Error: ", resp.Error())
    }

    res, err := resp.Results[0].Series[0].Values[0][1].(json.Number).Float64()
    if err != nil {
        log.Fatalln("Error: ", err)
    }

    return res
}

Sample output:

2016/04/18 19:04:24 Mean values: cluster='public', cpu_usage='47.198448'  
2016/04/18 19:04:24 Mean values: cluster='private', cpu_usage='45.478159'  

In summary

InfluxDB is a time series database optimised for writes and series searches. Its data manipulation DSL is designed to be very similar to SQL to simplify adoption. The InfluxDB team also provides a Go client, which simplifies writing and reading data.
