From 789a49858e22fd806df89daf30571f1361d0ce8e Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Mon, 3 Apr 2023 16:06:27 +0800 Subject: [PATCH] feat(inputs.internet_speed): Support multi-server test (#12797) --- go.mod | 2 +- go.sum | 4 +- plugins/inputs/internet_speed/README.md | 17 ++++- .../inputs/internet_speed/internet_speed.go | 76 ++++++++++++++----- plugins/inputs/internet_speed/sample.conf | 12 +++ 5 files changed, 89 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index 4e849d696..02d655d39 100644 --- a/go.mod +++ b/go.mod @@ -150,7 +150,7 @@ require ( github.com/safchain/ethtool v0.2.0 github.com/sensu/sensu-go/api/core/v2 v2.16.0 github.com/shirou/gopsutil/v3 v3.23.2 - github.com/showwin/speedtest-go v1.4.2 + github.com/showwin/speedtest-go v1.5.2 github.com/signalfx/golib/v3 v3.3.46 github.com/sirupsen/logrus v1.9.0 github.com/sleepinggenius2/gosmi v0.4.4 diff --git a/go.sum b/go.sum index fe1a82a95..8c23ee3dc 100644 --- a/go.sum +++ b/go.sum @@ -2063,8 +2063,8 @@ github.com/shirou/gopsutil/v3 v3.23.2/go.mod h1:gv0aQw33GLo3pG8SiWKiQrbDzbRY1K80 github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= -github.com/showwin/speedtest-go v1.4.2 h1:3YjBajURQTJCv/rVwJsd5UtCYlaiqCihg5NhPxJapk8= -github.com/showwin/speedtest-go v1.4.2/go.mod h1:Y7c+pxzaNAlo4mYP+x83pnYY8IM3bkHGDhTdrgUnkNE= +github.com/showwin/speedtest-go v1.5.2 h1:drXsmaGC36VXi6biSZ+vyo/tYCkaoTU2mAF2b6wQmlk= +github.com/showwin/speedtest-go v1.5.2/go.mod h1:Y7c+pxzaNAlo4mYP+x83pnYY8IM3bkHGDhTdrgUnkNE= github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e/go.mod h1:TDJrrUr11Vxrven61rcy3hJMUqaf/CLWYhHNPmT14Lk= github.com/shurcooL/go-goon v0.0.0-20170922171312-37c2f522c041/go.mod h1:N5mDOmsrJOB+vfqUK+7DmDyjhSLIIBnXo9lvZJj3MWQ= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= diff --git a/plugins/inputs/internet_speed/README.md b/plugins/inputs/internet_speed/README.md index 7e99ad312..e3c1f282a 100644 --- a/plugins/inputs/internet_speed/README.md +++ b/plugins/inputs/internet_speed/README.md @@ -35,6 +35,18 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## Caches the closest server location # cache = false + ## Number of concurrent connections + ## By default or set to zero, the number of CPU cores is used. Use this to + ## reduce the impact on system performance or to increase the connections on + ## faster connections to ensure the fastest speed. + # connections = 0 + + ## Test mode + ## By default, a single sever is used for testing. This may work for most, + ## however, setting to "multi" will reach out to multiple servers in an + ## attempt to get closer to ideal internet speeds. + # test_mode = "single" + ## Server ID exclude filter ## Allows the user to exclude or include specific server IDs received by ## speedtest-go. Values in the exclude option will be skipped over. Values in @@ -64,9 +76,10 @@ And the following tags: | --------- | --------- | | Source | source | | Server ID | server_id | +| Test Mode | test_mode | ## Example Output -```sh -internet_speed,source=speedtest02.z4internet.com:8080,server_id=54619 download=318.37580265897725,upload=30.444407341274385,latency=37.73174,jitter=1.99810 1675458921000000000 +```text +internet_speed,source=speedtest02.z4internet.com:8080,server_id=54619,test_mode=single download=318.37580265897725,upload=30.444407341274385,latency=37.73174,jitter=1.99810 1675458921000000000 ``` diff --git a/plugins/inputs/internet_speed/internet_speed.go b/plugins/inputs/internet_speed/internet_speed.go index 0032f29de..e9106a30a 100644 --- a/plugins/inputs/internet_speed/internet_speed.go +++ b/plugins/inputs/internet_speed/internet_speed.go @@ -2,15 +2,18 @@ package internet_speed import ( + "context" _ "embed" "fmt" "math" + "os" "time" "github.com/showwin/speedtest-go/speedtest" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/filter" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) @@ -24,20 +27,35 @@ type InternetSpeed struct { EnableFileDownload bool `toml:"enable_file_download" deprecated:"1.25.0;use 'memory_saving_mode' instead"` MemorySavingMode bool `toml:"memory_saving_mode"` Cache bool `toml:"cache"` + Connections int `toml:"connections"` + TestMode string `toml:"test_mode"` Log telegraf.Logger `toml:"-"` - server *speedtest.Server + server *speedtest.Server // The main(best) server + servers speedtest.Servers // Auxiliary servers serverFilter filter.Filter } -const measurement = "internet_speed" +const ( + measurement = "internet_speed" + testModeSingle = "single" + testModeMulti = "multi" +) func (*InternetSpeed) SampleConfig() string { return sampleConfig } func (is *InternetSpeed) Init() error { + switch is.TestMode { + case testModeSingle, testModeMulti: + case "": + is.TestMode = testModeSingle + default: + return fmt.Errorf("unrecognized test mode: %q", is.TestMode) + } + is.MemorySavingMode = is.MemorySavingMode || is.EnableFileDownload var err error @@ -50,7 +68,9 @@ func (is *InternetSpeed) Init() error { } func (is *InternetSpeed) Gather(acc telegraf.Accumulator) error { - // if not caching, go find the closest server each time + // If not caching, go find the closest server each time. + // We will find the best server as the main server. And + // the remaining servers will be auxiliary candidates. if !is.Cache || is.server == nil { if err := is.findClosestServer(); err != nil { return fmt.Errorf("unable to find closest server: %w", err) @@ -61,13 +81,25 @@ func (is *InternetSpeed) Gather(acc telegraf.Accumulator) error { if err != nil { return fmt.Errorf("ping test failed: %w", err) } - err = is.server.DownloadTest(is.MemorySavingMode) - if err != nil { - return fmt.Errorf("download test failed, try `memory_saving_mode = true` if this fails consistently: %w", err) - } - err = is.server.UploadTest(is.MemorySavingMode) - if err != nil { - return fmt.Errorf("upload test failed failed, try `memory_saving_mode = true` if this fails consistently: %w", err) + + if is.TestMode == testModeMulti { + err = is.server.MultiDownloadTestContext(context.Background(), is.servers) + if err != nil { + return fmt.Errorf("download test failed: %w", err) + } + err = is.server.MultiUploadTestContext(context.Background(), is.servers) + if err != nil { + return fmt.Errorf("upload test failed failed: %w", err) + } + } else { + err = is.server.DownloadTest() + if err != nil { + return fmt.Errorf("download test failed: %w", err) + } + err = is.server.UploadTest() + if err != nil { + return fmt.Errorf("upload test failed failed: %w", err) + } } fields := map[string]any{ @@ -79,32 +111,42 @@ func (is *InternetSpeed) Gather(acc telegraf.Accumulator) error { tags := map[string]string{ "server_id": is.server.ID, "source": is.server.Host, + "test_mode": is.TestMode, } - // recycle the detailed data of each test to prevent data backlog + // Recycle the history of each test to prevent data backlog. is.server.Context.Reset() acc.AddFields(measurement, fields, tags) return nil } func (is *InternetSpeed) findClosestServer() error { - user, err := speedtest.FetchUserInfo() + client := speedtest.New(speedtest.WithUserConfig(&speedtest.UserConfig{ + UserAgent: internal.ProductToken(), + ICMP: os.Geteuid() == 0 || os.Geteuid() == -1, + SavingMode: is.MemorySavingMode, + })) + if is.Connections > 0 { + client.SetNThread(is.Connections) + } + + user, err := client.FetchUserInfo() if err != nil { return fmt.Errorf("fetching user info failed: %w", err) } - serverList, err := speedtest.FetchServers(user) + is.servers, err = client.FetchServers(user) if err != nil { return fmt.Errorf("fetching server list failed: %w", err) } - if len(serverList) < 1 { + if len(is.servers) < 1 { return fmt.Errorf("no servers found") } - // return the first match or the server with the lowest latency + // Return the first match or the server with the lowest latency // when filter mismatch all servers. var min int64 = math.MaxInt64 selectIndex := -1 - for index, server := range serverList { + for index, server := range is.servers { if is.serverFilter.Match(server.ID) { selectIndex = index break @@ -118,7 +160,7 @@ func (is *InternetSpeed) findClosestServer() error { } if selectIndex != -1 { - is.server = serverList[selectIndex] + is.server = is.servers[selectIndex] is.Log.Debugf("using server %s in %s (%s)\n", is.server.ID, is.server.Name, is.server.Host) return nil } diff --git a/plugins/inputs/internet_speed/sample.conf b/plugins/inputs/internet_speed/sample.conf index 015767b62..4fbfdb6fb 100644 --- a/plugins/inputs/internet_speed/sample.conf +++ b/plugins/inputs/internet_speed/sample.conf @@ -11,6 +11,18 @@ ## Caches the closest server location # cache = false + ## Number of concurrent connections + ## By default or set to zero, the number of CPU cores is used. Use this to + ## reduce the impact on system performance or to increase the connections on + ## faster connections to ensure the fastest speed. + # connections = 0 + + ## Test mode + ## By default, a single sever is used for testing. This may work for most, + ## however, setting to "multi" will reach out to multiple servers in an + ## attempt to get closer to ideal internet speeds. + # test_mode = "single" + ## Server ID exclude filter ## Allows the user to exclude or include specific server IDs received by ## speedtest-go. Values in the exclude option will be skipped over. Values in