Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(database/influxdb2): Add database support of influxdb2 client #203

Merged
merged 2 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ jobs:
with:
version: latest
skip-cache: true
args: --timeout=5m
44 changes: 43 additions & 1 deletion config_example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ delete_interval = "1h"
#enable = true

# Save collected data to InfluxDB.
# There are the following measurments:
# There are the following measurements:
# node: store node specific data i.e. clients memory, airtime
# link: store link tq between two interfaces of two different nodes
# global: store global data, i.e. count of clients and nodes
Expand All @@ -219,6 +219,48 @@ password = ""
#system = "productive"
#site = "ffhb"

# Save collected data to InfluxDB2.
# There are the following measurements:
# node: store node specific data i.e. clients memory, airtime
# link: store link tq between two interfaces of two different nodes with i.e. nodeid, address, hostname
# global: store global data, i.e. count of clients and nodes
# firmware: store the count of nodes tagged with firmware
# model: store the count of nodes tagged with hardware model
# autoupdater: store the count of autoupdate branch
[[database.connection.influxdb2]]
enable = false
address = "http://localhost:8086"
token = ""
organization_id = ""
# fallback of no specific bucket for measurement is setup
bucket_default = "yanic"

# Specify bucket per measurement (of not set fallback bucket_default is used or panic)
#
# WARNING:
# yanic does NOT prune node's data (so please set up data retention in InfluxDB2 setup).
#
# We highly recommend to setup e.g. Data retention in your InfluxDB2 server per measurements.
# https://docs.influxdata.com/influxdb/v2/reference/internals/data-retention/
#
[database.connection.influxdb2.buckets]
#link = "yanic-temp"
#node = "yanic-temp"
#dhcp = "yanic-temp"
global = "yanic-persistent"
#firmware = "yanic-temp"
#model = "yanic-temp"
#autoupdater = "yanic-temp"

# Tagging of the data (optional)
[database.connection.influxdb2.tags]
# Tags used by Yanic would override the tags from this config
# nodeid, hostname, owner, model, firmware_base, firmware_release,frequency11g and frequency11a are tags which are already used
#tagname1 = "tagvalue 1"
# some useful e.g.:
#system = "productive"
#site = "ffhb"

# Graphite settings
[[database.connection.graphite]]
enable = false
Expand Down
1 change: 1 addition & 0 deletions database/all/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package all
import (
_ "github.com/FreifunkBremen/yanic/database/graphite"
_ "github.com/FreifunkBremen/yanic/database/influxdb"
_ "github.com/FreifunkBremen/yanic/database/influxdb2"
_ "github.com/FreifunkBremen/yanic/database/logging"
_ "github.com/FreifunkBremen/yanic/database/respondd"
)
126 changes: 126 additions & 0 deletions database/influxdb2/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package influxdb

import (
"context"

influxdb "github.com/influxdata/influxdb-client-go/v2"
influxdbAPI "github.com/influxdata/influxdb-client-go/v2/api"

"github.com/FreifunkBremen/yanic/database"
"github.com/bdlm/log"
)

const (
MeasurementLink = "link" // Measurement for per-link statistics
MeasurementNode = "node" // Measurement for per-node statistics
MeasurementDHCP = "dhcp" // Measurement for DHCP server statistics
MeasurementGlobal = "global" // Measurement for summarized global statistics
CounterMeasurementFirmware = "firmware" // Measurement for firmware statistics
CounterMeasurementModel = "model" // Measurement for model statistics
CounterMeasurementAutoupdater = "autoupdater" // Measurement for autoupdater
batchMaxSize = 1000
)

type Connection struct {
database.Connection
config Config
client influxdb.Client
writeAPI map[string]influxdbAPI.WriteAPI
}

type Config map[string]interface{}

func (c Config) Address() string {
return c["address"].(string)
}
func (c Config) Token() string {
if d, ok := c["token"]; ok {
return d.(string)
}
log.Panic("influxdb2 - no token given")
return ""
}
func (c Config) Organization() string {
if d, ok := c["organization_id"]; ok {
return d.(string)
}
return ""
}
func (c Config) Bucket(measurement string) string {
logger := log.WithFields(map[string]interface{}{
"organization_id": c.Organization(),
"address": c.Address(),
"measurement": measurement,
})
if d, ok := c["buckets"]; ok {
dMap := d.(map[string]interface{})
if d, ok := dMap[measurement]; ok {
bucket := d.(string)
logger.WithField("bucket", bucket).Info("get bucket for writeapi")
return bucket
}
if d, ok := c["bucket_default"]; ok {
bucket := d.(string)
logger.WithField("bucket", bucket).Info("get bucket for writeapi")
return bucket
}
}
if d, ok := c["bucket_default"]; ok {
bucket := d.(string)
logger.WithField("bucket", bucket).Info("get bucket for writeapi")
return bucket
}
logger.Panic("no bucket found for measurement")
return ""
}
func (c Config) Tags() map[string]string {
if c["tags"] != nil {
tags := make(map[string]string)
for k, v := range c["tags"].(map[string]interface{}) {
tags[k] = v.(string)
}
return tags
}
return nil
}

func init() {
database.RegisterAdapter("influxdb2", Connect)
}
func Connect(configuration map[string]interface{}) (database.Connection, error) {
config := Config(configuration)

// Make client
client := influxdb.NewClientWithOptions(config.Address(), config.Token(), influxdb.DefaultOptions().SetBatchSize(batchMaxSize))

ok, err := client.Ping(context.Background())
if !ok || err != nil {
return nil, err
}

writeAPI := map[string]influxdbAPI.WriteAPI{
MeasurementLink: client.WriteAPI(config.Organization(), config.Bucket(MeasurementLink)),
MeasurementNode: client.WriteAPI(config.Organization(), config.Bucket(MeasurementNode)),
MeasurementDHCP: client.WriteAPI(config.Organization(), config.Bucket(MeasurementDHCP)),
MeasurementGlobal: client.WriteAPI(config.Organization(), config.Bucket(MeasurementGlobal)),
CounterMeasurementFirmware: client.WriteAPI(config.Organization(), config.Bucket(CounterMeasurementFirmware)),
CounterMeasurementModel: client.WriteAPI(config.Organization(), config.Bucket(CounterMeasurementModel)),
CounterMeasurementAutoupdater: client.WriteAPI(config.Organization(), config.Bucket(CounterMeasurementAutoupdater)),
}

db := &Connection{
config: config,
client: client,
writeAPI: writeAPI,
}

return db, nil
}

// Close all connection and clean up
func (conn *Connection) Close() {
for _, api := range conn.writeAPI {
api.Flush()
}
conn.client.Close()
}
43 changes: 43 additions & 0 deletions database/influxdb2/database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package influxdb

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
)

func TestConnect(t *testing.T) {
assert := assert.New(t)

conn, err := Connect(map[string]interface{}{
"address": "",
"token": "",
"bucket_default": "all",
})
assert.Nil(conn)
assert.Error(err)

conn, err = Connect(map[string]interface{}{
"address": "http://localhost",
"token": "",
"bucket_default": "all",
})
assert.Nil(conn)
assert.Error(err)

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer srv.Close()

conn, err = Connect(map[string]interface{}{
"address": srv.URL,
"token": "atoken",
"bucket_default": "all",
})

assert.NotNil(conn)
assert.NoError(err)
}
79 changes: 79 additions & 0 deletions database/influxdb2/global.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package influxdb

import (
"time"

"github.com/FreifunkBremen/yanic/runtime"

"github.com/bdlm/log"
influxdb "github.com/influxdata/influxdb-client-go/v2"
)

// InsertGlobals implementation of database
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time, site string, domain string) {
measurementGlobal := MeasurementGlobal
counterMeasurementModel := CounterMeasurementModel
counterMeasurementFirmware := CounterMeasurementFirmware
counterMeasurementAutoupdater := CounterMeasurementAutoupdater

if site != runtime.GLOBAL_SITE {
measurementGlobal += "_site"
counterMeasurementModel += "_site"
counterMeasurementFirmware += "_site"
counterMeasurementAutoupdater += "_site"
}
if domain != runtime.GLOBAL_DOMAIN {
measurementGlobal += "_domain"
counterMeasurementModel += "_domain"
counterMeasurementFirmware += "_domain"
counterMeasurementAutoupdater += "_domain"
}
p := influxdb.NewPoint(measurementGlobal,
conn.config.Tags(),
map[string]interface{}{
"nodes": stats.Nodes,
"gateways": stats.Gateways,
"clients.total": stats.Clients,
"clients.wifi": stats.ClientsWifi,
"clients.wifi24": stats.ClientsWifi24,
"clients.wifi5": stats.ClientsWifi5,
"clients.owe": stats.ClientsOWE,
"clients.owe24": stats.ClientsOWE24,
"clients.owe5": stats.ClientsOWE5,
},
time)

if site != runtime.GLOBAL_SITE {
p = p.AddTag("site", site)
}
if domain != runtime.GLOBAL_DOMAIN {
p = p.AddTag("domain", domain)
}
conn.writeAPI[MeasurementGlobal].WritePoint(p)

conn.addCounterMap(CounterMeasurementModel, counterMeasurementModel, stats.Models, time, site, domain)
conn.addCounterMap(CounterMeasurementFirmware, counterMeasurementFirmware, stats.Firmwares, time, site, domain)
conn.addCounterMap(CounterMeasurementAutoupdater, counterMeasurementAutoupdater, stats.Autoupdater, time, site, domain)
}

// Saves the values of a CounterMap in the database.
// The key are used as 'value' tag.
// The value is used as 'counter' field.
func (conn *Connection) addCounterMap(writeName, name string, m runtime.CounterMap, t time.Time, site string, domain string) {
writeAPI, ok := conn.writeAPI[writeName]
if !ok {
log.WithField("writeName", writeName).Panic("no writeAPI found for countermap")
}
for key, count := range m {
p := influxdb.NewPoint("stat",
conn.config.Tags(),
map[string]interface{}{
"count": count,
},
t).
AddTag("site", site).
AddTag("domain", domain).
AddTag("value", key)
writeAPI.WritePoint(p)
}
}
30 changes: 30 additions & 0 deletions database/influxdb2/link.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package influxdb

import (
"time"

"github.com/FreifunkBremen/yanic/runtime"

influxdb "github.com/influxdata/influxdb-client-go/v2"
)

// InsertLink adds a link data point
func (conn *Connection) InsertLink(link *runtime.Link, t time.Time) {
p := influxdb.NewPoint(MeasurementLink,
conn.config.Tags(),
map[string]interface{}{
"tq": link.TQ * 100,
},
t).
AddTag("source.id", link.SourceID).
AddTag("source.addr", link.SourceAddress).
AddTag("target.id", link.TargetID).
AddTag("target.addr", link.TargetAddress)
if link.SourceHostname != "" {
p.AddTag("source.hostname", link.SourceHostname)
}
if link.TargetHostname != "" {
p.AddTag("target.hostname", link.TargetHostname)
}
conn.writeAPI[MeasurementLink].WritePoint(p)
}
Loading
Loading