-
Notifications
You must be signed in to change notification settings - Fork 40
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
package influxdb | ||
Check failure on line 1 in database/influxdb2/database.go GitHub Actions / lint (1.21)
|
||
|
||
import ( | ||
"context" | ||
|
||
influxdb "github.com/influxdata/influxdb-client-go/v2" | ||
influxdbAPI "github.com/influxdata/influxdb-client-go/v2/api" | ||
|
||
"github.com/FreifunkBremen/yanic/database" | ||
"github.com/bdlm/log" | ||
) | ||
|
||
const ( | ||
MeasurementLink = "link" // Measurement for per-link statistics | ||
MeasurementNode = "node" // Measurement for per-node statistics | ||
MeasurementDHCP = "dhcp" // Measurement for DHCP server statistics | ||
MeasurementGlobal = "global" // Measurement for summarized global statistics | ||
CounterMeasurementFirmware = "firmware" // Measurement for firmware statistics | ||
CounterMeasurementModel = "model" // Measurement for model statistics | ||
CounterMeasurementAutoupdater = "autoupdater" // Measurement for autoupdater | ||
batchMaxSize = 1000 | ||
) | ||
|
||
type Connection struct { | ||
database.Connection | ||
config Config | ||
client influxdb.Client | ||
writeAPI map[string]influxdbAPI.WriteAPI | ||
} | ||
|
||
type Config map[string]interface{} | ||
|
||
func (c Config) Address() string { | ||
return c["address"].(string) | ||
} | ||
func (c Config) Token() string { | ||
if d, ok := c["token"]; ok { | ||
return d.(string) | ||
} | ||
log.Panic("influxdb2 - no token given") | ||
return "" | ||
} | ||
func (c Config) Organization() string { | ||
if d, ok := c["organization_id"]; ok { | ||
return d.(string) | ||
} | ||
return "" | ||
} | ||
func (c Config) Bucket(measurement string) string { | ||
logger := log.WithFields(map[string]interface{}{ | ||
"organization_id": c.Organization(), | ||
"address": c.Address(), | ||
"measurement": measurement, | ||
}) | ||
if d, ok := c["buckets"]; ok { | ||
dMap := d.(map[string]interface{}) | ||
if d, ok := dMap[measurement]; ok { | ||
bucket := d.(string) | ||
logger.WithField("bucket", bucket).Info("get bucket for writeapi") | ||
return bucket | ||
} | ||
if d, ok := c["bucket_default"]; ok { | ||
bucket := d.(string) | ||
logger.WithField("bucket", bucket).Info("get bucket for writeapi") | ||
return bucket | ||
} | ||
} | ||
if d, ok := c["bucket_default"]; ok { | ||
bucket := d.(string) | ||
logger.WithField("bucket", bucket).Info("get bucket for writeapi") | ||
return bucket | ||
} | ||
logger.Panic("no bucket found for measurement") | ||
return "" | ||
} | ||
func (c Config) Tags() map[string]string { | ||
if c["tags"] != nil { | ||
tags := make(map[string]string) | ||
for k, v := range c["tags"].(map[string]interface{}) { | ||
tags[k] = v.(string) | ||
} | ||
return tags | ||
} | ||
return nil | ||
} | ||
|
||
func init() { | ||
database.RegisterAdapter("influxdb2", Connect) | ||
} | ||
func Connect(configuration map[string]interface{}) (database.Connection, error) { | ||
config := Config(configuration) | ||
|
||
// Make client | ||
client := influxdb.NewClientWithOptions(config.Address(), config.Token(), influxdb.DefaultOptions().SetBatchSize(batchMaxSize)) | ||
|
||
ok, err := client.Ping(context.Background()) | ||
if !ok || err != nil { | ||
return nil, err | ||
} | ||
|
||
writeAPI := map[string]influxdbAPI.WriteAPI{ | ||
MeasurementLink: client.WriteAPI(config.Organization(), config.Bucket(MeasurementLink)), | ||
MeasurementNode: client.WriteAPI(config.Organization(), config.Bucket(MeasurementNode)), | ||
MeasurementDHCP: client.WriteAPI(config.Organization(), config.Bucket(MeasurementDHCP)), | ||
MeasurementGlobal: client.WriteAPI(config.Organization(), config.Bucket(MeasurementGlobal)), | ||
CounterMeasurementFirmware: client.WriteAPI(config.Organization(), config.Bucket(CounterMeasurementFirmware)), | ||
CounterMeasurementModel: client.WriteAPI(config.Organization(), config.Bucket(CounterMeasurementModel)), | ||
CounterMeasurementAutoupdater: client.WriteAPI(config.Organization(), config.Bucket(CounterMeasurementAutoupdater)), | ||
} | ||
|
||
db := &Connection{ | ||
config: config, | ||
client: client, | ||
writeAPI: writeAPI, | ||
} | ||
|
||
return db, nil | ||
} | ||
|
||
// Close all connection and clean up | ||
func (conn *Connection) Close() { | ||
for _, api := range conn.writeAPI { | ||
api.Flush() | ||
} | ||
conn.client.Close() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package influxdb | ||
|
||
import ( | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestConnect(t *testing.T) { | ||
assert := assert.New(t) | ||
|
||
conn, err := Connect(map[string]interface{}{ | ||
"address": "", | ||
"token": "", | ||
"bucket_default": "all", | ||
}) | ||
assert.Nil(conn) | ||
assert.Error(err) | ||
|
||
conn, err = Connect(map[string]interface{}{ | ||
"address": "http://localhost", | ||
"token": "", | ||
"bucket_default": "all", | ||
}) | ||
assert.Nil(conn) | ||
assert.Error(err) | ||
|
||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
w.WriteHeader(http.StatusNoContent) | ||
})) | ||
defer srv.Close() | ||
|
||
conn, err = Connect(map[string]interface{}{ | ||
"address": srv.URL, | ||
"token": "atoken", | ||
"bucket_default": "all", | ||
}) | ||
|
||
assert.NotNil(conn) | ||
assert.NoError(err) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package influxdb | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/FreifunkBremen/yanic/runtime" | ||
|
||
"github.com/bdlm/log" | ||
influxdb "github.com/influxdata/influxdb-client-go/v2" | ||
) | ||
|
||
// InsertGlobals implementation of database | ||
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time, site string, domain string) { | ||
measurementGlobal := MeasurementGlobal | ||
counterMeasurementModel := CounterMeasurementModel | ||
counterMeasurementFirmware := CounterMeasurementFirmware | ||
counterMeasurementAutoupdater := CounterMeasurementAutoupdater | ||
|
||
if site != runtime.GLOBAL_SITE { | ||
measurementGlobal += "_site" | ||
counterMeasurementModel += "_site" | ||
counterMeasurementFirmware += "_site" | ||
counterMeasurementAutoupdater += "_site" | ||
} | ||
if domain != runtime.GLOBAL_DOMAIN { | ||
measurementGlobal += "_domain" | ||
counterMeasurementModel += "_domain" | ||
counterMeasurementFirmware += "_domain" | ||
counterMeasurementAutoupdater += "_domain" | ||
} | ||
p := influxdb.NewPoint(measurementGlobal, | ||
conn.config.Tags(), | ||
map[string]interface{}{ | ||
"nodes": stats.Nodes, | ||
"gateways": stats.Gateways, | ||
"clients.total": stats.Clients, | ||
"clients.wifi": stats.ClientsWifi, | ||
"clients.wifi24": stats.ClientsWifi24, | ||
"clients.wifi5": stats.ClientsWifi5, | ||
"clients.owe": stats.ClientsOwe, | ||
Check failure on line 40 in database/influxdb2/global.go GitHub Actions / test (1.21)
Check failure on line 40 in database/influxdb2/global.go GitHub Actions / lint (1.21)
Check failure on line 40 in database/influxdb2/global.go GitHub Actions / lint (1.21)
Check failure on line 40 in database/influxdb2/global.go GitHub Actions / lint (1.21)
Check failure on line 40 in database/influxdb2/global.go GitHub Actions / lint (1.21)
|
||
"clients.owe24": stats.ClientsOwe24, | ||
Check failure on line 41 in database/influxdb2/global.go GitHub Actions / test (1.21)
Check failure on line 41 in database/influxdb2/global.go GitHub Actions / lint (1.21)
Check failure on line 41 in database/influxdb2/global.go GitHub Actions / lint (1.21)
Check failure on line 41 in database/influxdb2/global.go GitHub Actions / lint (1.21)
Check failure on line 41 in database/influxdb2/global.go GitHub Actions / lint (1.21)
|
||
"clients.owe5": stats.ClientsOwe5, | ||
Check failure on line 42 in database/influxdb2/global.go GitHub Actions / test (1.21)
Check failure on line 42 in database/influxdb2/global.go GitHub Actions / lint (1.21)
Check failure on line 42 in database/influxdb2/global.go GitHub Actions / lint (1.21)
|
||
}, | ||
time) | ||
|
||
if site != runtime.GLOBAL_SITE { | ||
p = p.AddTag("site", site) | ||
} | ||
if domain != runtime.GLOBAL_DOMAIN { | ||
p = p.AddTag("domain", domain) | ||
} | ||
conn.writeAPI[MeasurementGlobal].WritePoint(p) | ||
|
||
conn.addCounterMap(CounterMeasurementModel, counterMeasurementModel, stats.Models, time, site, domain) | ||
conn.addCounterMap(CounterMeasurementFirmware, counterMeasurementFirmware, stats.Firmwares, time, site, domain) | ||
conn.addCounterMap(CounterMeasurementAutoupdater, counterMeasurementAutoupdater, stats.Autoupdater, time, site, domain) | ||
} | ||
|
||
// Saves the values of a CounterMap in the database. | ||
// The key are used as 'value' tag. | ||
// The value is used as 'counter' field. | ||
func (conn *Connection) addCounterMap(writeName, name string, m runtime.CounterMap, t time.Time, site string, domain string) { | ||
writeAPI, ok := conn.writeAPI[writeName] | ||
if !ok { | ||
log.WithField("writeName", writeName).Panic("no writeAPI found for countermap") | ||
} | ||
for key, count := range m { | ||
p := influxdb.NewPoint("stat", | ||
conn.config.Tags(), | ||
map[string]interface{}{ | ||
"count": count, | ||
}, | ||
t). | ||
AddTag("site", site). | ||
AddTag("domain", domain). | ||
AddTag("value", key) | ||
writeAPI.WritePoint(p) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package influxdb | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/FreifunkBremen/yanic/runtime" | ||
|
||
influxdb "github.com/influxdata/influxdb-client-go/v2" | ||
) | ||
|
||
// InsertLink adds a link data point | ||
func (conn *Connection) InsertLink(link *runtime.Link, t time.Time) { | ||
p := influxdb.NewPoint(MeasurementLink, | ||
conn.config.Tags(), | ||
map[string]interface{}{ | ||
"tq": link.TQ * 100, | ||
}, | ||
t). | ||
AddTag("source.id", link.SourceID). | ||
AddTag("source.addr", link.SourceAddress). | ||
AddTag("target.id", link.TargetID). | ||
AddTag("target.addr", link.TargetAddress) | ||
if link.SourceHostname != "" { | ||
p.AddTag("source.hostname", link.SourceHostname) | ||
} | ||
if link.TargetHostname != "" { | ||
p.AddTag("target.hostname", link.TargetHostname) | ||
} | ||
conn.writeAPI[MeasurementLink].WritePoint(p) | ||
} |