feat: telegraf to merge tables with different indexes (#9241)

This commit is contained in:
helotpl 2021-09-28 23:24:08 +02:00 committed by GitHub
parent 6a3b27126a
commit 56398237c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 459 additions and 0 deletions

View File

@ -201,6 +201,113 @@ One [metric][] is created for each row of the SNMP table.
## Specifies if the value of given field should be snmptranslated
## by default no field values are translated
# translate = true
## Secondary index table allows to merge data from two tables with
## different index that this filed will be used to join them. There can
## be only one secondary index table.
# secondary_index_table = false
## This field is using secondary index, and will be later merged with
## primary index using SecondaryIndexTable. SecondaryIndexTable and
## SecondaryIndexUse are exclusive.
# secondary_index_use = false
## Controls if entries from secondary table should be added or not
## if joining index is present or not. I set to true, means that join
## is outer, and index is prepended with "Secondary." for missing values
## to avoid overlaping indexes from both tables. Can be set per field or
## globally with SecondaryIndexTable, global true overrides per field false.
# secondary_outer_join = false
```
##### Two Table Join
Snmp plugin can join two snmp tables that have different indexes. For this to work one table
should have translation field that return index of second table as value. Examples
of such fields are:
* Cisco portTable with translation field: `CISCO-STACK-MIB::portIfIndex`,
which value is IfIndex from ifTable
* Adva entityFacilityTable with translation field: `ADVA-FSPR7-MIB::entityFacilityOneIndex`,
which value is IfIndex from ifTable
* Cisco cpeExtPsePortTable with translation field: `CISCO-POWER-ETHERNET-EXT-MIB::cpeExtPsePortEntPhyIndex`,
which value is index from entPhysicalTable
Such field can be used to translate index to secondary table with `secondary_index_table = true`
and all fields from secondary table (with index pointed from translation field), should have added option
`secondary_index_use = true`. Telegraf cannot duplicate entries during join so translation
must be 1-to-1 (not 1-to-many). To add fields from secondary table with index that is not present
in translation table (outer join), there is a second option for translation index `secondary_outer_join = true`.
###### Example configuration for table joins
CISCO-POWER-ETHERNET-EXT-MIB table before join:
```
[[inputs.snmp.table]]
name = "ciscoPower"
index_as_tag = true
[[inputs.snmp.table.field]]
name = "PortPwrConsumption"
oid = "CISCO-POWER-ETHERNET-EXT-MIB::cpeExtPsePortPwrConsumption"
[[inputs.snmp.table.field]]
name = "EntPhyIndex"
oid = "CISCO-POWER-ETHERNET-EXT-MIB::cpeExtPsePortEntPhyIndex"
```
Partial result (removed agent_host and host columns from all following outputs in this section):
```
> ciscoPower,index=1.2 EntPhyIndex=1002i,PortPwrConsumption=6643i 1621460628000000000
> ciscoPower,index=1.6 EntPhyIndex=1006i,PortPwrConsumption=10287i 1621460628000000000
> ciscoPower,index=1.5 EntPhyIndex=1005i,PortPwrConsumption=8358i 1621460628000000000
```
Note here that EntPhyIndex column carries index from ENTITY-MIB table, config for it:
```
[[inputs.snmp.table]]
name = "entityTable"
index_as_tag = true
[[inputs.snmp.table.field]]
name = "EntPhysicalName"
oid = "ENTITY-MIB::entPhysicalName"
```
Partial result:
```
> entityTable,index=1006 EntPhysicalName="GigabitEthernet1/6" 1621460809000000000
> entityTable,index=1002 EntPhysicalName="GigabitEthernet1/2" 1621460809000000000
> entityTable,index=1005 EntPhysicalName="GigabitEthernet1/5" 1621460809000000000
```
Now, lets attempt to join these results into one table. EntPhyIndex matches index
from second table, and lets convert EntPhysicalName into tag, so second table will
only provide tags into result. Configuration:
```
[[inputs.snmp.table]]
name = "ciscoPowerEntity"
index_as_tag = true
[[inputs.snmp.table.field]]
name = "PortPwrConsumption"
oid = "CISCO-POWER-ETHERNET-EXT-MIB::cpeExtPsePortPwrConsumption"
[[inputs.snmp.table.field]]
name = "EntPhyIndex"
oid = "CISCO-POWER-ETHERNET-EXT-MIB::cpeExtPsePortEntPhyIndex"
secondary_index_table = true # enables joining
[[inputs.snmp.table.field]]
name = "EntPhysicalName"
oid = "ENTITY-MIB::entPhysicalName"
secondary_index_use = true # this tag is indexed from secondary table
is_tag = true
```
Result:
```
> ciscoPowerEntity,EntPhysicalName=GigabitEthernet1/2,index=1.2 EntPhyIndex=1002i,PortPwrConsumption=6643i 1621461148000000000
> ciscoPowerEntity,EntPhysicalName=GigabitEthernet1/6,index=1.6 EntPhyIndex=1006i,PortPwrConsumption=10287i 1621461148000000000
> ciscoPowerEntity,EntPhysicalName=GigabitEthernet1/5,index=1.5 EntPhyIndex=1005i,PortPwrConsumption=8358i 1621461148000000000
```
### Troubleshooting

View File

@ -187,11 +187,18 @@ func (t *Table) Init() error {
return err
}
secondaryIndexTablePresent := false
// initialize all the nested fields
for i := range t.Fields {
if err := t.Fields[i].init(); err != nil {
return fmt.Errorf("initializing field %s: %w", t.Fields[i].Name, err)
}
if t.Fields[i].SecondaryIndexTable {
if secondaryIndexTablePresent {
return fmt.Errorf("only one field can be SecondaryIndexTable")
}
secondaryIndexTablePresent = true
}
}
t.initialized = true
@ -252,6 +259,19 @@ type Field struct {
Conversion string
// Translate tells if the value of the field should be snmptranslated
Translate bool
// Secondary index table allows to merge data from two tables with different index
// that this filed will be used to join them. There can be only one secondary index table.
SecondaryIndexTable bool
// This field is using secondary index, and will be later merged with primary index
// using SecondaryIndexTable. SecondaryIndexTable and SecondaryIndexUse are exclusive.
SecondaryIndexUse bool
// Controls if entries from secondary table should be added or not if joining
// index is present or not. I set to true, means that join is outer, and
// index is prepended with "Secondary." for missing values to avoid overlaping
// indexes from both tables.
// Can be set per field or globally with SecondaryIndexTable, global true overrides
// per field false.
SecondaryOuterJoin bool
initialized bool
}
@ -278,6 +298,14 @@ func (f *Field) init() error {
//TODO use textual convention conversion from the MIB
}
if f.SecondaryIndexTable && f.SecondaryIndexUse {
return fmt.Errorf("SecondaryIndexTable and UseSecondaryIndex are exclusive")
}
if !f.SecondaryIndexTable && !f.SecondaryIndexUse && f.SecondaryOuterJoin {
return fmt.Errorf("SecondaryOuterJoin set to true, but field is not being used in join")
}
f.initialized = true
return nil
}
@ -414,6 +442,19 @@ func (s *Snmp) gatherTable(acc telegraf.Accumulator, gs snmpConnection, t Table,
func (t Table) Build(gs snmpConnection, walk bool) (*RTable, error) {
rows := map[string]RTableRow{}
//translation table for secondary index (when preforming join on two tables)
secIdxTab := make(map[string]string)
secGlobalOuterJoin := false
for i, f := range t.Fields {
if f.SecondaryIndexTable {
secGlobalOuterJoin = f.SecondaryOuterJoin
if i != 0 {
t.Fields[0], t.Fields[i] = t.Fields[i], t.Fields[0]
}
break
}
}
tagCount := 0
for _, f := range t.Fields {
if f.IsTag {
@ -519,6 +560,16 @@ func (t Table) Build(gs snmpConnection, walk bool) (*RTable, error) {
}
for idx, v := range ifv {
if f.SecondaryIndexUse {
if newidx, ok := secIdxTab[idx]; ok {
idx = newidx
} else {
if !secGlobalOuterJoin && !f.SecondaryOuterJoin {
continue
}
idx = ".Secondary" + idx
}
}
rtr, ok := rows[idx]
if !ok {
rtr = RTableRow{}
@ -543,6 +594,20 @@ func (t Table) Build(gs snmpConnection, walk bool) (*RTable, error) {
} else {
rtr.Fields[f.Name] = v
}
if f.SecondaryIndexTable {
//indexes are stored here with prepending "." so we need to add them if needed
var vss string
if ok {
vss = "." + vs
} else {
vss = fmt.Sprintf(".%v", v)
}
if idx[0] == '.' {
secIdxTab[vss] = idx
} else {
secIdxTab[vss] = "." + idx
}
}
}
}
}

View File

@ -81,6 +81,15 @@ var tsc = &testSNMPConnection{
".1.0.0.2.1.5.0.9.9": 11,
".1.0.0.2.1.5.1.9.9": 22,
".1.0.0.0.1.6.0": ".1.0.0.0.1.7",
".1.0.0.3.1.1.10": "instance",
".1.0.0.3.1.1.11": "instance2",
".1.0.0.3.1.1.12": "instance3",
".1.0.0.3.1.2.10": 10,
".1.0.0.3.1.2.11": 20,
".1.0.0.3.1.2.12": 20,
".1.0.0.3.1.3.10": 1,
".1.0.0.3.1.3.11": 2,
".1.0.0.3.1.3.12": 3,
},
}
@ -960,3 +969,242 @@ func TestSnmpTableCache_hit(t *testing.T) {
assert.Equal(t, []Field{{Name: "d"}}, fields)
assert.Equal(t, fmt.Errorf("e"), err)
}
func TestTableJoin_walk(t *testing.T) {
tbl := Table{
Name: "mytable",
IndexAsTag: true,
Fields: []Field{
{
Name: "myfield1",
Oid: ".1.0.0.3.1.1",
IsTag: true,
},
{
Name: "myfield2",
Oid: ".1.0.0.3.1.2",
},
{
Name: "myfield3",
Oid: ".1.0.0.3.1.3",
SecondaryIndexTable: true,
},
{
Name: "myfield4",
Oid: ".1.0.0.0.1.1",
SecondaryIndexUse: true,
IsTag: true,
},
{
Name: "myfield5",
Oid: ".1.0.0.0.1.2",
SecondaryIndexUse: true,
},
},
}
tb, err := tbl.Build(tsc, true)
require.NoError(t, err)
assert.Equal(t, tb.Name, "mytable")
rtr1 := RTableRow{
Tags: map[string]string{
"myfield1": "instance",
"myfield4": "bar",
"index": "10",
},
Fields: map[string]interface{}{
"myfield2": 10,
"myfield3": 1,
"myfield5": 2,
},
}
rtr2 := RTableRow{
Tags: map[string]string{
"myfield1": "instance2",
"index": "11",
},
Fields: map[string]interface{}{
"myfield2": 20,
"myfield3": 2,
"myfield5": 0,
},
}
rtr3 := RTableRow{
Tags: map[string]string{
"myfield1": "instance3",
"index": "12",
},
Fields: map[string]interface{}{
"myfield2": 20,
"myfield3": 3,
},
}
assert.Len(t, tb.Rows, 3)
assert.Contains(t, tb.Rows, rtr1)
assert.Contains(t, tb.Rows, rtr2)
assert.Contains(t, tb.Rows, rtr3)
}
func TestTableOuterJoin_walk(t *testing.T) {
tbl := Table{
Name: "mytable",
IndexAsTag: true,
Fields: []Field{
{
Name: "myfield1",
Oid: ".1.0.0.3.1.1",
IsTag: true,
},
{
Name: "myfield2",
Oid: ".1.0.0.3.1.2",
},
{
Name: "myfield3",
Oid: ".1.0.0.3.1.3",
SecondaryIndexTable: true,
SecondaryOuterJoin: true,
},
{
Name: "myfield4",
Oid: ".1.0.0.0.1.1",
SecondaryIndexUse: true,
IsTag: true,
},
{
Name: "myfield5",
Oid: ".1.0.0.0.1.2",
SecondaryIndexUse: true,
},
},
}
tb, err := tbl.Build(tsc, true)
require.NoError(t, err)
assert.Equal(t, tb.Name, "mytable")
rtr1 := RTableRow{
Tags: map[string]string{
"myfield1": "instance",
"myfield4": "bar",
"index": "10",
},
Fields: map[string]interface{}{
"myfield2": 10,
"myfield3": 1,
"myfield5": 2,
},
}
rtr2 := RTableRow{
Tags: map[string]string{
"myfield1": "instance2",
"index": "11",
},
Fields: map[string]interface{}{
"myfield2": 20,
"myfield3": 2,
"myfield5": 0,
},
}
rtr3 := RTableRow{
Tags: map[string]string{
"myfield1": "instance3",
"index": "12",
},
Fields: map[string]interface{}{
"myfield2": 20,
"myfield3": 3,
},
}
rtr4 := RTableRow{
Tags: map[string]string{
"index": "Secondary.0",
"myfield4": "foo",
},
Fields: map[string]interface{}{
"myfield5": 1,
},
}
assert.Len(t, tb.Rows, 4)
assert.Contains(t, tb.Rows, rtr1)
assert.Contains(t, tb.Rows, rtr2)
assert.Contains(t, tb.Rows, rtr3)
assert.Contains(t, tb.Rows, rtr4)
}
func TestTableJoinNoIndexAsTag_walk(t *testing.T) {
tbl := Table{
Name: "mytable",
IndexAsTag: false,
Fields: []Field{
{
Name: "myfield1",
Oid: ".1.0.0.3.1.1",
IsTag: true,
},
{
Name: "myfield2",
Oid: ".1.0.0.3.1.2",
},
{
Name: "myfield3",
Oid: ".1.0.0.3.1.3",
SecondaryIndexTable: true,
},
{
Name: "myfield4",
Oid: ".1.0.0.0.1.1",
SecondaryIndexUse: true,
IsTag: true,
},
{
Name: "myfield5",
Oid: ".1.0.0.0.1.2",
SecondaryIndexUse: true,
},
},
}
tb, err := tbl.Build(tsc, true)
require.NoError(t, err)
assert.Equal(t, tb.Name, "mytable")
rtr1 := RTableRow{
Tags: map[string]string{
"myfield1": "instance",
"myfield4": "bar",
//"index": "10",
},
Fields: map[string]interface{}{
"myfield2": 10,
"myfield3": 1,
"myfield5": 2,
},
}
rtr2 := RTableRow{
Tags: map[string]string{
"myfield1": "instance2",
//"index": "11",
},
Fields: map[string]interface{}{
"myfield2": 20,
"myfield3": 2,
"myfield5": 0,
},
}
rtr3 := RTableRow{
Tags: map[string]string{
"myfield1": "instance3",
//"index": "12",
},
Fields: map[string]interface{}{
"myfield2": 20,
"myfield3": 3,
},
}
assert.Len(t, tb.Rows, 3)
assert.Contains(t, tb.Rows, rtr1)
assert.Contains(t, tb.Rows, rtr2)
assert.Contains(t, tb.Rows, rtr3)
}

View File

@ -55,4 +55,43 @@ hostname OBJECT-TYPE
STATUS current
::= { testOID 1 1 }
testSecondaryTable OBJECT-TYPE
SYNTAX SEQUENCE OF testSecondaryTableEntry
MAX-ACCESS not-accessible
STATUS current
::= { testOID 3 }
testSecondaryTableEntry OBJECT-TYPE
SYNTAX TestSecondaryTableEntry
MAX-ACCESS not-accessible
STATUS current
INDEX {
instance
}
::= { testSecondaryTable 1 }
TestSecondaryTableEntry ::=
SEQUENCE {
instance OCTET STRING,
connections INTEGER,
testTableIndex INTEGER,
}
instance OBJECT-TYPE
SYNTAX OCTET STRING
MAX-ACCESS read-only
STATUS current
::= { testSecondaryTableEntry 1 }
connections OBJECT-TYPE
SYNTAX OCTET STRING
MAX-ACCESS read-only
STATUS current
::= { testSecondaryTableEntry 2 }
testTableIndex OBJECT-TYPE
SYNTAX OCTET STRING
MAX-ACCESS read-only
STATUS current
::= { testSecondaryTableEntry 3 }
END