mirror of
https://github.com/sascha-hemi/spaceDevices.git
synced 2026-03-21 05:06:36 +01:00
changes project layout
This commit is contained in:
204
internal/mqtt/deviceData.go
Normal file
204
internal/mqtt/deviceData.go
Normal file
@@ -0,0 +1,204 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/ktt-ol/spaceDevices/internal/conf"
|
||||
"github.com/ktt-ol/spaceDevices/internal/db"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"bytes"
|
||||
"sort"
|
||||
"strings"
|
||||
"github.com/ktt-ol/spaceDevices/pkg/structs"
|
||||
)
|
||||
|
||||
var ignoredVisibility = [...]db.Visibility{db.VisibilityCriticalInfrastructure, db.VisibilityImportantInfrastructure,
|
||||
db.VisibilityInfrastructure, db.VisibilityDeprecatedInfrastructure, db.VisibilityUserInfrastructure}
|
||||
|
||||
var ddLogger = logrus.WithField("where", "deviceData")
|
||||
|
||||
type devicesEntry struct {
|
||||
hideName bool
|
||||
showDevices bool
|
||||
devices []structs.Devices
|
||||
}
|
||||
|
||||
type DeviceData struct {
|
||||
locations []conf.Location
|
||||
mqttHandler *MqttHandler
|
||||
masterDb db.MasterDb
|
||||
userDb db.UserDb
|
||||
wifiSessionList []structs.WifiSession
|
||||
|
||||
lastSentHash []byte
|
||||
|
||||
// more to come, e.g. LanSessions
|
||||
}
|
||||
|
||||
func NewDeviceData(locations []conf.Location, mqttHandler *MqttHandler, masterDb db.MasterDb, userDb db.UserDb) *DeviceData {
|
||||
dd := DeviceData{locations: locations, mqttHandler: mqttHandler, masterDb: masterDb, userDb: userDb}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
data := <-mqttHandler.GetNewDataChannel()
|
||||
dd.newData(data)
|
||||
}
|
||||
}()
|
||||
|
||||
return &dd
|
||||
}
|
||||
|
||||
func (d *DeviceData) newData(data []byte) {
|
||||
sessionsList, peopleAndDevices, ok := d.parseWifiSessions(data)
|
||||
if ok {
|
||||
d.wifiSessionList = sessionsList
|
||||
if ddLogger.Logger.Level >= logrus.DebugLevel {
|
||||
peopleList := make([]string, 0, len(peopleAndDevices.People))
|
||||
for _, person := range peopleAndDevices.People {
|
||||
if len(person.Name) == 0 {
|
||||
continue
|
||||
}
|
||||
personStr := person.Name + " ["
|
||||
for _, device := range person.Devices {
|
||||
personStr += device.Name + ","
|
||||
}
|
||||
personStr += "]"
|
||||
peopleList = append(peopleList, personStr)
|
||||
}
|
||||
sort.Strings(peopleList)
|
||||
ddLogger.Debugf("PeopleCount: %d, DeviceCount: %d, UnknownDevicesCount: %d, Persons: %s",
|
||||
peopleAndDevices.PeopleCount, peopleAndDevices.DeviceCount, peopleAndDevices.UnknownDevicesCount, strings.Join(peopleList, "; "))
|
||||
}
|
||||
h := md5.New()
|
||||
s := fmt.Sprintf("%v", peopleAndDevices)
|
||||
hash := h.Sum([]byte(s))
|
||||
if bytes.Equal(hash, d.lastSentHash) {
|
||||
ddLogger.Debug("Nothing changed in people count, skipping mqtt")
|
||||
} else {
|
||||
d.mqttHandler.SendPeopleAndDevices(peopleAndDevices)
|
||||
d.lastSentHash = hash
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DeviceData) GetByIp(ip string) (structs.WifiSession, bool) {
|
||||
for _, v := range d.wifiSessionList {
|
||||
if v.Ip == ip {
|
||||
return v, true
|
||||
}
|
||||
}
|
||||
|
||||
return structs.WifiSession{}, false
|
||||
}
|
||||
|
||||
func (d *DeviceData) parseWifiSessions(rawData []byte) (sessionsList []structs.WifiSession, peopleAndDevices structs.PeopleAndDevices, success bool) {
|
||||
var sessionData map[string]structs.WifiSession
|
||||
if err := json.Unmarshal(rawData, &sessionData); err != nil {
|
||||
ddLogger.WithFields(logrus.Fields{
|
||||
"rawData": string(rawData),
|
||||
"error": err,
|
||||
}).Error("Unable to unmarshal wifi session json.")
|
||||
return
|
||||
}
|
||||
|
||||
username2DevicesMap := make(map[string]*devicesEntry)
|
||||
SESSION_LOOP:
|
||||
for _, wifiSession := range sessionData {
|
||||
sessionsList = append(sessionsList, wifiSession)
|
||||
|
||||
peopleAndDevices.DeviceCount++
|
||||
var userInfo db.UserDbEntry
|
||||
masterDbEntry, ok := d.masterDb.Get(wifiSession.Mac)
|
||||
if ok {
|
||||
userInfo = masterDbEntry.UserDbEntry
|
||||
} else {
|
||||
userInfo, ok = d.userDb.Get(wifiSession.Mac)
|
||||
if !ok {
|
||||
// nothing found for this mac
|
||||
peopleAndDevices.UnknownDevicesCount++
|
||||
continue
|
||||
}
|
||||
}
|
||||
for _, v := range ignoredVisibility {
|
||||
if v == userInfo.Visibility {
|
||||
continue SESSION_LOOP
|
||||
}
|
||||
}
|
||||
|
||||
entry, ok := username2DevicesMap[userInfo.Name]
|
||||
if !ok {
|
||||
entry = &devicesEntry{}
|
||||
username2DevicesMap[userInfo.Name] = entry
|
||||
}
|
||||
|
||||
device := structs.Devices{Name: userInfo.DeviceName, Location: d.findLocation(wifiSession.AP)}
|
||||
entry.devices = append(entry.devices, device)
|
||||
|
||||
if userInfo.Visibility == db.VisibilityIgnore {
|
||||
entry.hideName = true
|
||||
continue
|
||||
}
|
||||
|
||||
if len(entry.devices) == 1 {
|
||||
peopleAndDevices.PeopleCount++
|
||||
}
|
||||
|
||||
if userInfo.Visibility == db.VisibilityAnon {
|
||||
entry.hideName = true
|
||||
continue
|
||||
}
|
||||
|
||||
if userInfo.Visibility == db.VisibilityUser {
|
||||
entry.showDevices = false
|
||||
continue
|
||||
}
|
||||
|
||||
if userInfo.Visibility == db.VisibilityAll {
|
||||
entry.showDevices = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
peopleAndDevices.People = make([]structs.Person, 0, 10)
|
||||
for username, devicesEntry := range username2DevicesMap {
|
||||
if devicesEntry.hideName {
|
||||
continue
|
||||
}
|
||||
|
||||
var person structs.Person
|
||||
if devicesEntry.showDevices {
|
||||
person = structs.Person{Name: username, Devices: devicesEntry.devices}
|
||||
sort.Sort(structs.DevicesSorter(person.Devices))
|
||||
} else {
|
||||
person = structs.Person{Name: username}
|
||||
}
|
||||
peopleAndDevices.People = append(peopleAndDevices.People, person)
|
||||
}
|
||||
sort.Sort(structs.PersonSorter(peopleAndDevices.People))
|
||||
|
||||
success = true
|
||||
return
|
||||
}
|
||||
|
||||
func (d *DeviceData) findLocation(apID int) string {
|
||||
|
||||
for _, location := range d.locations {
|
||||
for _, id := range location.Ids {
|
||||
if id == apID {
|
||||
return location.Name
|
||||
}
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func logParseError(field string, data interface{}) {
|
||||
ddLogger.WithFields(logrus.Fields{
|
||||
"field": field,
|
||||
"data": data,
|
||||
}).Error("Parse error for field.")
|
||||
}
|
||||
280
internal/mqtt/deviceData_test.go
Normal file
280
internal/mqtt/deviceData_test.go
Normal file
@@ -0,0 +1,280 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/ktt-ol/spaceDevices/internal/conf"
|
||||
|
||||
"github.com/ktt-ol/spaceDevices/internal/db"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func Test_parseWifiSessions(t *testing.T) {
|
||||
const testData = `
|
||||
{
|
||||
"38126": {
|
||||
"last-auth": 1509210709,
|
||||
"vlan": "default",
|
||||
"stats": {
|
||||
"rx-multicast-pkts": 499,
|
||||
"rx-unicast-pkts": 1817,
|
||||
"tx-unicast-pkts": 734,
|
||||
"rx-unicast-bytes": 156208,
|
||||
"tx-unicast-bytes": 272461,
|
||||
"rx-multicast-bytes": 76808
|
||||
},
|
||||
"ssid": "mainframe",
|
||||
"ip": "192.168.2.127",
|
||||
"hostname": "-",
|
||||
"last-snr": 40,
|
||||
"last-rate-mbits": "24",
|
||||
"ap": 2,
|
||||
"mac": "2c:0e:3d:aa:aa:aa",
|
||||
"radio": 2,
|
||||
"userinfo": null,
|
||||
"session-start": 1509210709,
|
||||
"last-rssi-dbm": -55,
|
||||
"last-activity": 1509211581
|
||||
},
|
||||
"38134": {
|
||||
"last-auth": 1509211121,
|
||||
"vlan": "default",
|
||||
"stats": {
|
||||
"rx-multicast-pkts": 0,
|
||||
"rx-unicast-pkts": 292,
|
||||
"tx-unicast-pkts": 654,
|
||||
"rx-unicast-bytes": 20510,
|
||||
"tx-unicast-bytes": 278565,
|
||||
"rx-multicast-bytes": 0
|
||||
},
|
||||
"ssid": "mainframe",
|
||||
"ip": "192.168.2.179",
|
||||
"hostname": "-",
|
||||
"last-snr": 47,
|
||||
"last-rate-mbits": "6",
|
||||
"ap": 1,
|
||||
"mac": "10:68:3f:bb:bb:bb",
|
||||
"radio": 2,
|
||||
"userinfo": {
|
||||
"name": "Holger",
|
||||
"visibility": "show",
|
||||
"ts": 1427737817755
|
||||
},
|
||||
"session-start": 1509211121,
|
||||
"last-rssi-dbm": -48,
|
||||
"last-activity": 1509211584
|
||||
},
|
||||
"38135": {
|
||||
"last-auth": 1509211163,
|
||||
"vlan": "default",
|
||||
"stats": {
|
||||
"rx-multicast-pkts": 114,
|
||||
"rx-unicast-pkts": 8119,
|
||||
"tx-unicast-pkts": 12440,
|
||||
"rx-unicast-bytes": 1093407,
|
||||
"tx-unicast-bytes": 15083985,
|
||||
"rx-multicast-bytes": 20379
|
||||
},
|
||||
"ssid": "mainframe",
|
||||
"ip": "192.168.2.35",
|
||||
"hostname": "happle",
|
||||
"last-snr": 39,
|
||||
"last-rate-mbits": "24",
|
||||
"ap": 1,
|
||||
"mac": "20:c9:d0:cc:cc:cc",
|
||||
"radio": 2,
|
||||
"userinfo": {
|
||||
"name": "Holger",
|
||||
"visibility": "show",
|
||||
"ts": 1438474581580
|
||||
},
|
||||
"session-start": 1509211163,
|
||||
"last-rssi-dbm": -56,
|
||||
"last-activity": 1509211584
|
||||
},
|
||||
"38137": {
|
||||
"last-auth": 1509211199,
|
||||
"vlan": "FreiFunk",
|
||||
"stats": {
|
||||
"rx-multicast-pkts": 14,
|
||||
"rx-unicast-pkts": 931,
|
||||
"tx-unicast-pkts": 615,
|
||||
"rx-unicast-bytes": 70172,
|
||||
"tx-unicast-bytes": 265390,
|
||||
"rx-multicast-bytes": 1574
|
||||
},
|
||||
"ssid": "nordwest.freifunk.net",
|
||||
"ip": "10.18.159.6",
|
||||
"hostname": "iPhonevineSager",
|
||||
"last-snr": 13,
|
||||
"last-rate-mbits": "2",
|
||||
"ap": 1,
|
||||
"mac": "b8:53:ac:dd:dd:dd",
|
||||
"radio": 1,
|
||||
"userinfo": null,
|
||||
"session-start": 1509211199,
|
||||
"last-rssi-dbm": -82,
|
||||
"last-activity": 1509211584
|
||||
}
|
||||
}
|
||||
`
|
||||
assert := assert.New(t)
|
||||
|
||||
masterDb := &masterDbTest{}
|
||||
userDb := &userDbTest{}
|
||||
dd := DeviceData{masterDb: masterDb, userDb: userDb}
|
||||
|
||||
sessions, _, ok := dd.parseWifiSessions([]byte(testData))
|
||||
assert.Equal(len(sessions), 4)
|
||||
assert.True(ok)
|
||||
|
||||
mustContain := [4]bool{false, false, false, false}
|
||||
for _, v := range sessions {
|
||||
mustContain[0] = mustContain[0] || (v.Ip == "192.168.2.127" && v.Mac == "2c:0e:3d:aa:aa:aa" && v.Vlan == "default" && v.AP == 2)
|
||||
mustContain[1] = mustContain[1] || (v.Ip == "192.168.2.179" && v.Mac == "10:68:3f:bb:bb:bb" && v.Vlan == "default" && v.AP == 1)
|
||||
mustContain[2] = mustContain[2] || (v.Ip == "192.168.2.35" && v.Mac == "20:c9:d0:cc:cc:cc" && v.Vlan == "default" && v.AP == 1)
|
||||
mustContain[3] = mustContain[3] || (v.Ip == "10.18.159.6" && v.Mac == "b8:53:ac:dd:dd:dd" && v.Vlan == "FreiFunk" && v.AP == 1)
|
||||
}
|
||||
for _, v := range mustContain {
|
||||
assert.True(v)
|
||||
}
|
||||
|
||||
// don't fail for garbage
|
||||
sessions, _, ok = dd.parseWifiSessions([]byte("{ totally invalid json }"))
|
||||
assert.False(ok)
|
||||
assert.Equal(len(sessions), 0)
|
||||
}
|
||||
|
||||
func Test_peopleCalculation(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
masterMap := make(map[string]db.MasterDbEntry)
|
||||
masterDb := &masterDbTest{masterMap: masterMap}
|
||||
userMap := make(map[string]db.UserDbEntry)
|
||||
userDb := &userDbTest{userMap}
|
||||
locations := []conf.Location{conf.Location{Name: "Bar", Ids: []int{1, 3}}}
|
||||
dd := DeviceData{locations: locations, masterDb: masterDb, userDb: userDb}
|
||||
|
||||
testData := newSessionTestData(stt("1", "01"), stt("2", "02"), stt("3", "03"), stt("4", "04"), stt("5", "05"))
|
||||
_, peopleAndDevices, _ := dd.parseWifiSessions(testData)
|
||||
assertPeopleAndDevices(assert, 0, 0, 5, 5, peopleAndDevices)
|
||||
|
||||
userMap["00:00:00:00:00:01"] = db.UserDbEntry{Name: "holger", DeviceName: "handy", Visibility: db.VisibilityUser}
|
||||
_, peopleAndDevices, _ = dd.parseWifiSessions(testData)
|
||||
assertPeopleAndDevices(assert, 1, 1, 5, 4, peopleAndDevices)
|
||||
|
||||
userMap["00:00:00:00:00:02"] = db.UserDbEntry{Name: "hans", DeviceName: "", Visibility: db.VisibilityAnon}
|
||||
_, peopleAndDevices, _ = dd.parseWifiSessions(testData)
|
||||
assertPeopleAndDevices(assert, 1, 2, 5, 3, peopleAndDevices)
|
||||
|
||||
userMap["00:00:00:00:00:03"] = db.UserDbEntry{Name: "herman", DeviceName: "", Visibility: db.VisibilityIgnore}
|
||||
_, peopleAndDevices, _ = dd.parseWifiSessions(testData)
|
||||
assertPeopleAndDevices(assert, 1, 2, 5, 2, peopleAndDevices)
|
||||
|
||||
userMap["00:00:00:00:00:04"] = db.UserDbEntry{Name: "olaf", DeviceName: "iphone", Visibility: db.VisibilityAll}
|
||||
_, peopleAndDevices, _ = dd.parseWifiSessions(testData)
|
||||
assertPeopleAndDevices(assert, 2, 3, 5, 1, peopleAndDevices)
|
||||
for _, p := range peopleAndDevices.People {
|
||||
if p.Name == "olaf" {
|
||||
assert.Equal("iphone", p.Devices[0].Name)
|
||||
assert.Equal("Bar", p.Devices[0].Location)
|
||||
} else {
|
||||
assert.Equal(0, len(p.Devices))
|
||||
}
|
||||
}
|
||||
entry := db.MasterDbEntry{}
|
||||
entry.Name = "pc1"
|
||||
entry.Visibility = db.VisibilityCriticalInfrastructure
|
||||
masterMap["00:00:00:00:00:05"] = entry
|
||||
_, peopleAndDevices, _ = dd.parseWifiSessions(testData)
|
||||
assertPeopleAndDevices(assert, 2, 3, 5, 0, peopleAndDevices)
|
||||
|
||||
// add a second device for olaf
|
||||
testData = newSessionTestData(stt("1", "01"), stt("2", "02"), stt("3", "03"), stt("4", "04"), stt("5", "05"), stt("6", "06"))
|
||||
userMap["00:00:00:00:00:06"] = db.UserDbEntry{Name: "olaf", DeviceName: "mac", Visibility: db.VisibilityAll}
|
||||
_, peopleAndDevices, _ = dd.parseWifiSessions(testData)
|
||||
fmt.Printf("peopleAndDevices: %+v\n", peopleAndDevices)
|
||||
assertPeopleAndDevices(assert, 2, 3, 6, 0, peopleAndDevices)
|
||||
for _, p := range peopleAndDevices.People {
|
||||
if p.Name == "olaf" {
|
||||
assert.Equal(2, len(p.Devices))
|
||||
} else {
|
||||
assert.Equal(0, len(p.Devices))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func assertPeopleAndDevices(assert *assert.Assertions, peopleArrayCount int, peopleCount uint, deviceCount uint, unknownDevicesCount uint, test PeopleAndDevices) {
|
||||
assert.Equal(peopleArrayCount, len(test.People), "len(People)")
|
||||
assert.Equal(peopleCount, test.PeopleCount, "peopleCount")
|
||||
assert.Equal(deviceCount, test.DeviceCount, "deviceCount")
|
||||
assert.Equal(unknownDevicesCount, test.UnknownDevicesCount, "unknownDevicesCount")
|
||||
}
|
||||
|
||||
func Test_peopleNeverNil(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
dd := DeviceData{}
|
||||
|
||||
testData := newSessionTestData()
|
||||
_, peopleAndDevices, success := dd.parseWifiSessions(testData)
|
||||
assert.True(success)
|
||||
assert.NotNil(peopleAndDevices.People)
|
||||
}
|
||||
|
||||
/****************************************/
|
||||
/* helper to create test data and mocks */
|
||||
/****************************************/
|
||||
|
||||
type sessionTestType struct {
|
||||
Vlan string `json:"vlan"`
|
||||
IP string `json:"ip"`
|
||||
Ap float64 `json:"ap"`
|
||||
Mac string `json:"mac"`
|
||||
}
|
||||
|
||||
type userDbTest struct {
|
||||
userMap map[string]db.UserDbEntry
|
||||
}
|
||||
|
||||
func (db *userDbTest) Get(mac string) (db.UserDbEntry, bool) {
|
||||
value, ok := db.userMap[mac]
|
||||
return value, ok
|
||||
}
|
||||
|
||||
func (db *userDbTest) Set(mac string, info db.UserDbEntry) {
|
||||
db.userMap[mac] = info
|
||||
}
|
||||
|
||||
func (db *userDbTest) Delete(mac string) {
|
||||
delete(db.userMap, mac)
|
||||
}
|
||||
|
||||
type masterDbTest struct {
|
||||
masterMap map[string]db.MasterDbEntry
|
||||
}
|
||||
|
||||
func (db *masterDbTest) Get(mac string) (db.MasterDbEntry, bool) {
|
||||
value, ok := db.masterMap[mac]
|
||||
return value, ok
|
||||
}
|
||||
|
||||
func stt(lastIp string, lastMac string) sessionTestType {
|
||||
return sessionTestType{"vlan", "10.1.1." + lastIp, 1, "00:00:00:00:00:" + lastMac}
|
||||
}
|
||||
|
||||
func newSessionTestData(testData ...sessionTestType) []byte {
|
||||
sessionData := make(map[string]sessionTestType)
|
||||
|
||||
for index, val := range testData {
|
||||
sessionData[strconv.Itoa(index)] = val
|
||||
}
|
||||
|
||||
bytes, err := json.Marshal(sessionData)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return bytes
|
||||
}
|
||||
208
internal/mqtt/mqtt.go
Normal file
208
internal/mqtt/mqtt.go
Normal file
@@ -0,0 +1,208 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"time"
|
||||
|
||||
"github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/ktt-ol/spaceDevices/internal/conf"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/ktt-ol/spaceDevices/pkg/structs"
|
||||
)
|
||||
|
||||
const CLIENT_ID = "spaceDevicesGo"
|
||||
|
||||
var mqttLogger = log.WithField("where", "mqtt")
|
||||
|
||||
type MqttHandler struct {
|
||||
client mqtt.Client
|
||||
newDataChan chan []byte
|
||||
sessionTopic string
|
||||
devicesTopic string
|
||||
watchDog *watchDog
|
||||
}
|
||||
|
||||
//func init() {
|
||||
// mqtt.ERROR.SetOutput(copyOfStdLogger(log.ErrorLevel).Writer())
|
||||
// mqtt.CRITICAL.SetOutput(copyOfStdLogger(log.ErrorLevel).Writer())
|
||||
// mqtt.WARN.SetOutput(copyOfStdLogger(log.WarnLevel).Writer())
|
||||
// mqtt.DEBUG.SetOutput(copyOfStdLogger(log.DebugLevel).Writer())
|
||||
//}
|
||||
//func copyOfStdLogger(level log.Level) *log.Logger {
|
||||
// logger := log.New()
|
||||
// logger.Formatter = log.StandardLogger().Formatter
|
||||
// logger.Out = log.StandardLogger().Out
|
||||
// logger.SetLevel(level)
|
||||
// return logger
|
||||
//}
|
||||
|
||||
func EnableMqttDebugLogging() {
|
||||
stdLogWriter := log.StandardLogger().Writer()
|
||||
mqtt.ERROR.SetOutput(stdLogWriter)
|
||||
mqtt.CRITICAL.SetOutput(stdLogWriter)
|
||||
mqtt.WARN.SetOutput(stdLogWriter)
|
||||
mqtt.DEBUG.SetOutput(stdLogWriter)
|
||||
}
|
||||
|
||||
func NewMqttHandler(conf conf.MqttConf) *MqttHandler {
|
||||
opts := mqtt.NewClientOptions()
|
||||
|
||||
opts.AddBroker(conf.Url)
|
||||
|
||||
if conf.Username != "" {
|
||||
opts.SetUsername(conf.Username)
|
||||
}
|
||||
if conf.Password != "" {
|
||||
opts.SetPassword(conf.Password)
|
||||
}
|
||||
|
||||
certs := defaultCertPool(conf.CertFile)
|
||||
tlsConf := &tls.Config{
|
||||
RootCAs: certs,
|
||||
}
|
||||
opts.SetTLSConfig(tlsConf)
|
||||
|
||||
opts.SetClientID(CLIENT_ID + GenerateRandomString(4))
|
||||
opts.SetAutoReconnect(true)
|
||||
opts.SetKeepAlive(10 * time.Second)
|
||||
opts.SetMaxReconnectInterval(5 * time.Minute)
|
||||
opts.SetWill(conf.DevicesTopic, emptyPeopleAndDevices(), 0, true)
|
||||
|
||||
handler := MqttHandler{newDataChan: make(chan []byte), devicesTopic: conf.DevicesTopic, sessionTopic: conf.SessionTopic}
|
||||
opts.SetOnConnectHandler(handler.onConnect)
|
||||
opts.SetConnectionLostHandler(handler.onConnectionLost)
|
||||
|
||||
handler.client = mqtt.NewClient(opts)
|
||||
if tok := handler.client.Connect(); tok.WaitTimeout(5*time.Second) && tok.Error() != nil {
|
||||
mqttLogger.WithError(tok.Error()).Fatal("Could not connect to mqtt server.")
|
||||
}
|
||||
|
||||
if conf.WatchDogTimeoutInMinutes > 0 {
|
||||
mqttLogger.Println("Enable mqtt watch dog, timeout in minutes is", conf.WatchDogTimeoutInMinutes)
|
||||
handler.watchDog = NewWatchDog(time.Duration(conf.WatchDogTimeoutInMinutes) * time.Minute)
|
||||
}
|
||||
|
||||
return &handler
|
||||
}
|
||||
|
||||
func (h *MqttHandler) GetNewDataChannel() chan []byte {
|
||||
return h.newDataChan
|
||||
}
|
||||
|
||||
func (h *MqttHandler) SendPeopleAndDevices(data structs.PeopleAndDevices) {
|
||||
bytes, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
mqttLogger.Errorln("Invalid people json", err)
|
||||
return
|
||||
}
|
||||
|
||||
mqttLogger.Infof("Sending PeopleAndDevices: %d, %d, %d, %d",
|
||||
data.PeopleCount, data.DeviceCount, data.UnknownDevicesCount, len(data.People))
|
||||
|
||||
token := h.client.Publish(h.devicesTopic, 0, true, string(bytes))
|
||||
ok := token.WaitTimeout(time.Duration(time.Second * 10))
|
||||
if !ok {
|
||||
mqttLogger.WithError(token.Error()).WithField("topic", h.devicesTopic).Warn("Error sending devices.")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *MqttHandler) onConnect(client mqtt.Client) {
|
||||
mqttLogger.Info("connected")
|
||||
|
||||
err := subscribe(client, h.sessionTopic,
|
||||
func(client mqtt.Client, message mqtt.Message) {
|
||||
mqttLogger.Debug("new wifi sessions")
|
||||
if h.watchDog != nil {
|
||||
h.watchDog.Ping()
|
||||
}
|
||||
|
||||
/*
|
||||
mock := []byte(`{ "38134": {
|
||||
"last-auth": 1509211121,
|
||||
"vlan": "default",
|
||||
"stats": {
|
||||
"rx-multicast-pkts": 0,
|
||||
"rx-unicast-pkts": 292,
|
||||
"tx-unicast-pkts": 654,
|
||||
"rx-unicast-bytes": 20510,
|
||||
"tx-unicast-bytes": 278565,
|
||||
"rx-multicast-bytes": 0
|
||||
},
|
||||
"ssid": "mainframe",
|
||||
"ip": "::1",
|
||||
"hostname": "-",
|
||||
"last-snr": 47,
|
||||
"last-rate-mbits": "6",
|
||||
"ap": 1,
|
||||
"mac": "d4:38:9c:01:dd:03",
|
||||
"radio": 2,
|
||||
"userinfo": {
|
||||
"name": "Holger",
|
||||
"visibility": "show",
|
||||
"ts": 1427737817755
|
||||
},
|
||||
"session-start": 1509211121,
|
||||
"last-rssi-dbm": -48,
|
||||
"last-activity": 1509211584
|
||||
}}`)
|
||||
*/
|
||||
select {
|
||||
//case h.newDataChan <- mock:
|
||||
case h.newDataChan <- message.Payload():
|
||||
break
|
||||
default:
|
||||
mqttLogger.Println("No one receives the message.")
|
||||
}
|
||||
|
||||
})
|
||||
if err != nil {
|
||||
mqttLogger.WithField("topic", h.sessionTopic).WithError(err).Fatal("Could not subscribe.")
|
||||
}
|
||||
}
|
||||
|
||||
func (h *MqttHandler) onConnectionLost(client mqtt.Client, err error) {
|
||||
mqttLogger.WithError(err).Error("Connection lost.")
|
||||
}
|
||||
|
||||
func subscribe(client mqtt.Client, topic string, cb mqtt.MessageHandler) error {
|
||||
qos := 0
|
||||
tok := client.Subscribe(topic, byte(qos), cb)
|
||||
tok.WaitTimeout(5 * time.Second)
|
||||
return tok.Error()
|
||||
}
|
||||
|
||||
func defaultCertPool(certFile string) *x509.CertPool {
|
||||
if certFile == "" {
|
||||
mqttLogger.Debug("No certFile given, using system pool")
|
||||
pool, err := x509.SystemCertPool()
|
||||
if err != nil {
|
||||
mqttLogger.WithError(err).Fatal("Could not create system cert pool.")
|
||||
}
|
||||
return pool
|
||||
}
|
||||
|
||||
fileData, err := ioutil.ReadFile(certFile)
|
||||
if err != nil {
|
||||
mqttLogger.WithError(err).Fatal("Could not read given cert file.")
|
||||
}
|
||||
|
||||
certs := x509.NewCertPool()
|
||||
if !certs.AppendCertsFromPEM(fileData) {
|
||||
mqttLogger.Fatal("unable to add given certificate to CertPool")
|
||||
}
|
||||
|
||||
return certs
|
||||
}
|
||||
|
||||
func emptyPeopleAndDevices() string {
|
||||
pad := structs.PeopleAndDevices{People: []structs.Person{}}
|
||||
bytes, err := json.Marshal(pad)
|
||||
if err != nil {
|
||||
mqttLogger.WithError(err).Panic()
|
||||
}
|
||||
return string(bytes)
|
||||
}
|
||||
1
internal/mqtt/mqtt_test.go
Normal file
1
internal/mqtt/mqtt_test.go
Normal file
@@ -0,0 +1 @@
|
||||
package mqtt
|
||||
31
internal/mqtt/utils.go
Normal file
31
internal/mqtt/utils.go
Normal file
@@ -0,0 +1,31 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"github.com/sirupsen/logrus"
|
||||
"encoding/base64"
|
||||
)
|
||||
|
||||
// https://elithrar.github.io/article/generating-secure-random-numbers-crypto-rand/
|
||||
|
||||
// GenerateRandomBytes returns securely generated random bytes.
|
||||
// It will fail with a fatal log if the system's secure random
|
||||
// number generator fails to function correctly
|
||||
func GenerateRandomBytes(n int) []byte {
|
||||
b := make([]byte, n)
|
||||
if _, err := rand.Read(b); err != nil {
|
||||
// Note that err == nil only if we read len(b) bytes.
|
||||
logrus.Fatal("Could not read random bytes")
|
||||
}
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
// GenerateRandomString returns a URL-safe, base64 encoded
|
||||
// securely generated random string.
|
||||
// It will fail with a fatal log if the system's secure random
|
||||
// number generator fails to function correctly
|
||||
func GenerateRandomString(s int) string {
|
||||
b := GenerateRandomBytes(s)
|
||||
return base64.URLEncoding.EncodeToString(b)
|
||||
}
|
||||
60
internal/mqtt/watchDog.go
Normal file
60
internal/mqtt/watchDog.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"time"
|
||||
"github.com/sirupsen/logrus"
|
||||
"os"
|
||||
)
|
||||
|
||||
type watchDog struct {
|
||||
timeout time.Duration
|
||||
ping chan struct{}
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
var wdLogger = logrus.WithField("where", "watchDog")
|
||||
|
||||
// NewWatchDog creates a new watch dog
|
||||
// timeout - after this amount of time without a ping, the program will be killed and returns 3
|
||||
func NewWatchDog(timeout time.Duration) *watchDog {
|
||||
wd := watchDog{
|
||||
timeout: timeout,
|
||||
ping: make(chan struct{}),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
|
||||
go wd.loop()
|
||||
|
||||
return &wd
|
||||
}
|
||||
|
||||
// Ping updates the watch dog timeout
|
||||
func (wd *watchDog) Ping() {
|
||||
wd.ping <- struct{}{}
|
||||
}
|
||||
|
||||
func (wd *watchDog) Stop() {
|
||||
wdLogger.Println("Stopping watch dog")
|
||||
wd.stop <- struct{}{}
|
||||
}
|
||||
|
||||
func (wd *watchDog) loop() {
|
||||
timer := time.NewTicker(60 * time.Second)
|
||||
lastKeepAlive := time.Now()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
if time.Since(lastKeepAlive) > wd.timeout {
|
||||
wdLogger.Errorf("Last keep alive (%s) is older than allowed timeout (%s). Exit!",
|
||||
lastKeepAlive, wd.timeout)
|
||||
os.Exit(3)
|
||||
}
|
||||
case <-wd.ping:
|
||||
lastKeepAlive = time.Now()
|
||||
case <-wd.stop:
|
||||
timer.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user