feat(inputs.internet_speed): Support multi-server test (#12797)

This commit is contained in:
r3inbowari 2023-04-03 16:06:27 +08:00 committed by GitHub
parent 6c840b781d
commit 789a49858e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 89 additions and 22 deletions

2
go.mod
View File

@ -150,7 +150,7 @@ require (
github.com/safchain/ethtool v0.2.0 github.com/safchain/ethtool v0.2.0
github.com/sensu/sensu-go/api/core/v2 v2.16.0 github.com/sensu/sensu-go/api/core/v2 v2.16.0
github.com/shirou/gopsutil/v3 v3.23.2 github.com/shirou/gopsutil/v3 v3.23.2
github.com/showwin/speedtest-go v1.4.2 github.com/showwin/speedtest-go v1.5.2
github.com/signalfx/golib/v3 v3.3.46 github.com/signalfx/golib/v3 v3.3.46
github.com/sirupsen/logrus v1.9.0 github.com/sirupsen/logrus v1.9.0
github.com/sleepinggenius2/gosmi v0.4.4 github.com/sleepinggenius2/gosmi v0.4.4

4
go.sum
View File

@ -2063,8 +2063,8 @@ github.com/shirou/gopsutil/v3 v3.23.2/go.mod h1:gv0aQw33GLo3pG8SiWKiQrbDzbRY1K80
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/showwin/speedtest-go v1.4.2 h1:3YjBajURQTJCv/rVwJsd5UtCYlaiqCihg5NhPxJapk8= github.com/showwin/speedtest-go v1.5.2 h1:drXsmaGC36VXi6biSZ+vyo/tYCkaoTU2mAF2b6wQmlk=
github.com/showwin/speedtest-go v1.4.2/go.mod h1:Y7c+pxzaNAlo4mYP+x83pnYY8IM3bkHGDhTdrgUnkNE= github.com/showwin/speedtest-go v1.5.2/go.mod h1:Y7c+pxzaNAlo4mYP+x83pnYY8IM3bkHGDhTdrgUnkNE=
github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e/go.mod h1:TDJrrUr11Vxrven61rcy3hJMUqaf/CLWYhHNPmT14Lk= github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e/go.mod h1:TDJrrUr11Vxrven61rcy3hJMUqaf/CLWYhHNPmT14Lk=
github.com/shurcooL/go-goon v0.0.0-20170922171312-37c2f522c041/go.mod h1:N5mDOmsrJOB+vfqUK+7DmDyjhSLIIBnXo9lvZJj3MWQ= github.com/shurcooL/go-goon v0.0.0-20170922171312-37c2f522c041/go.mod h1:N5mDOmsrJOB+vfqUK+7DmDyjhSLIIBnXo9lvZJj3MWQ=
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=

View File

@ -35,6 +35,18 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Caches the closest server location ## Caches the closest server location
# cache = false # cache = false
## Number of concurrent connections
## By default or set to zero, the number of CPU cores is used. Use this to
## reduce the impact on system performance or to increase the connections on
## faster connections to ensure the fastest speed.
# connections = 0
## Test mode
## By default, a single sever is used for testing. This may work for most,
## however, setting to "multi" will reach out to multiple servers in an
## attempt to get closer to ideal internet speeds.
# test_mode = "single"
## Server ID exclude filter ## Server ID exclude filter
## Allows the user to exclude or include specific server IDs received by ## Allows the user to exclude or include specific server IDs received by
## speedtest-go. Values in the exclude option will be skipped over. Values in ## speedtest-go. Values in the exclude option will be skipped over. Values in
@ -64,9 +76,10 @@ And the following tags:
| --------- | --------- | | --------- | --------- |
| Source | source | | Source | source |
| Server ID | server_id | | Server ID | server_id |
| Test Mode | test_mode |
## Example Output ## Example Output
```sh ```text
internet_speed,source=speedtest02.z4internet.com:8080,server_id=54619 download=318.37580265897725,upload=30.444407341274385,latency=37.73174,jitter=1.99810 1675458921000000000 internet_speed,source=speedtest02.z4internet.com:8080,server_id=54619,test_mode=single download=318.37580265897725,upload=30.444407341274385,latency=37.73174,jitter=1.99810 1675458921000000000
``` ```

View File

@ -2,15 +2,18 @@
package internet_speed package internet_speed
import ( import (
"context"
_ "embed" _ "embed"
"fmt" "fmt"
"math" "math"
"os"
"time" "time"
"github.com/showwin/speedtest-go/speedtest" "github.com/showwin/speedtest-go/speedtest"
"github.com/influxdata/telegraf" "github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/filter"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs"
) )
@ -24,20 +27,35 @@ type InternetSpeed struct {
EnableFileDownload bool `toml:"enable_file_download" deprecated:"1.25.0;use 'memory_saving_mode' instead"` EnableFileDownload bool `toml:"enable_file_download" deprecated:"1.25.0;use 'memory_saving_mode' instead"`
MemorySavingMode bool `toml:"memory_saving_mode"` MemorySavingMode bool `toml:"memory_saving_mode"`
Cache bool `toml:"cache"` Cache bool `toml:"cache"`
Connections int `toml:"connections"`
TestMode string `toml:"test_mode"`
Log telegraf.Logger `toml:"-"` Log telegraf.Logger `toml:"-"`
server *speedtest.Server server *speedtest.Server // The main(best) server
servers speedtest.Servers // Auxiliary servers
serverFilter filter.Filter serverFilter filter.Filter
} }
const measurement = "internet_speed" const (
measurement = "internet_speed"
testModeSingle = "single"
testModeMulti = "multi"
)
func (*InternetSpeed) SampleConfig() string { func (*InternetSpeed) SampleConfig() string {
return sampleConfig return sampleConfig
} }
func (is *InternetSpeed) Init() error { func (is *InternetSpeed) Init() error {
switch is.TestMode {
case testModeSingle, testModeMulti:
case "":
is.TestMode = testModeSingle
default:
return fmt.Errorf("unrecognized test mode: %q", is.TestMode)
}
is.MemorySavingMode = is.MemorySavingMode || is.EnableFileDownload is.MemorySavingMode = is.MemorySavingMode || is.EnableFileDownload
var err error var err error
@ -50,7 +68,9 @@ func (is *InternetSpeed) Init() error {
} }
func (is *InternetSpeed) Gather(acc telegraf.Accumulator) error { func (is *InternetSpeed) Gather(acc telegraf.Accumulator) error {
// if not caching, go find the closest server each time // If not caching, go find the closest server each time.
// We will find the best server as the main server. And
// the remaining servers will be auxiliary candidates.
if !is.Cache || is.server == nil { if !is.Cache || is.server == nil {
if err := is.findClosestServer(); err != nil { if err := is.findClosestServer(); err != nil {
return fmt.Errorf("unable to find closest server: %w", err) return fmt.Errorf("unable to find closest server: %w", err)
@ -61,13 +81,25 @@ func (is *InternetSpeed) Gather(acc telegraf.Accumulator) error {
if err != nil { if err != nil {
return fmt.Errorf("ping test failed: %w", err) return fmt.Errorf("ping test failed: %w", err)
} }
err = is.server.DownloadTest(is.MemorySavingMode)
if is.TestMode == testModeMulti {
err = is.server.MultiDownloadTestContext(context.Background(), is.servers)
if err != nil { if err != nil {
return fmt.Errorf("download test failed, try `memory_saving_mode = true` if this fails consistently: %w", err) return fmt.Errorf("download test failed: %w", err)
} }
err = is.server.UploadTest(is.MemorySavingMode) err = is.server.MultiUploadTestContext(context.Background(), is.servers)
if err != nil { if err != nil {
return fmt.Errorf("upload test failed failed, try `memory_saving_mode = true` if this fails consistently: %w", err) return fmt.Errorf("upload test failed failed: %w", err)
}
} else {
err = is.server.DownloadTest()
if err != nil {
return fmt.Errorf("download test failed: %w", err)
}
err = is.server.UploadTest()
if err != nil {
return fmt.Errorf("upload test failed failed: %w", err)
}
} }
fields := map[string]any{ fields := map[string]any{
@ -79,32 +111,42 @@ func (is *InternetSpeed) Gather(acc telegraf.Accumulator) error {
tags := map[string]string{ tags := map[string]string{
"server_id": is.server.ID, "server_id": is.server.ID,
"source": is.server.Host, "source": is.server.Host,
"test_mode": is.TestMode,
} }
// recycle the detailed data of each test to prevent data backlog // Recycle the history of each test to prevent data backlog.
is.server.Context.Reset() is.server.Context.Reset()
acc.AddFields(measurement, fields, tags) acc.AddFields(measurement, fields, tags)
return nil return nil
} }
func (is *InternetSpeed) findClosestServer() error { func (is *InternetSpeed) findClosestServer() error {
user, err := speedtest.FetchUserInfo() client := speedtest.New(speedtest.WithUserConfig(&speedtest.UserConfig{
UserAgent: internal.ProductToken(),
ICMP: os.Geteuid() == 0 || os.Geteuid() == -1,
SavingMode: is.MemorySavingMode,
}))
if is.Connections > 0 {
client.SetNThread(is.Connections)
}
user, err := client.FetchUserInfo()
if err != nil { if err != nil {
return fmt.Errorf("fetching user info failed: %w", err) return fmt.Errorf("fetching user info failed: %w", err)
} }
serverList, err := speedtest.FetchServers(user) is.servers, err = client.FetchServers(user)
if err != nil { if err != nil {
return fmt.Errorf("fetching server list failed: %w", err) return fmt.Errorf("fetching server list failed: %w", err)
} }
if len(serverList) < 1 { if len(is.servers) < 1 {
return fmt.Errorf("no servers found") return fmt.Errorf("no servers found")
} }
// return the first match or the server with the lowest latency // Return the first match or the server with the lowest latency
// when filter mismatch all servers. // when filter mismatch all servers.
var min int64 = math.MaxInt64 var min int64 = math.MaxInt64
selectIndex := -1 selectIndex := -1
for index, server := range serverList { for index, server := range is.servers {
if is.serverFilter.Match(server.ID) { if is.serverFilter.Match(server.ID) {
selectIndex = index selectIndex = index
break break
@ -118,7 +160,7 @@ func (is *InternetSpeed) findClosestServer() error {
} }
if selectIndex != -1 { if selectIndex != -1 {
is.server = serverList[selectIndex] is.server = is.servers[selectIndex]
is.Log.Debugf("using server %s in %s (%s)\n", is.server.ID, is.server.Name, is.server.Host) is.Log.Debugf("using server %s in %s (%s)\n", is.server.ID, is.server.Name, is.server.Host)
return nil return nil
} }

View File

@ -11,6 +11,18 @@
## Caches the closest server location ## Caches the closest server location
# cache = false # cache = false
## Number of concurrent connections
## By default or set to zero, the number of CPU cores is used. Use this to
## reduce the impact on system performance or to increase the connections on
## faster connections to ensure the fastest speed.
# connections = 0
## Test mode
## By default, a single sever is used for testing. This may work for most,
## however, setting to "multi" will reach out to multiple servers in an
## attempt to get closer to ideal internet speeds.
# test_mode = "single"
## Server ID exclude filter ## Server ID exclude filter
## Allows the user to exclude or include specific server IDs received by ## Allows the user to exclude or include specific server IDs received by
## speedtest-go. Values in the exclude option will be skipped over. Values in ## speedtest-go. Values in the exclude option will be skipped over. Values in