feat(inputs.openweathermap): Add per-city query scheme for current weather (#14214)

This commit is contained in:
Sven Rebhan 2023-10-31 13:29:12 +01:00 committed by GitHub
parent debae8ead0
commit 3b2d8c507f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 443 additions and 208 deletions

View File

@ -37,7 +37,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# lang = "en"
## APIs to fetch; can contain "weather" or "forecast".
fetch = ["weather", "forecast"]
# fetch = ["weather", "forecast"]
## OpenWeatherMap base URL
# base_url = "https://api.openweathermap.org/"
@ -49,9 +49,18 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## "metric", "imperial", or "standard".
# units = "metric"
## Query interval; OpenWeatherMap weather data is updated every 10
## minutes.
interval = "10m"
## Style to query the current weather; available options
## batch -- query multiple cities at once using the "group" endpoint
## individual -- query each city individually using the "weather" endpoint
## You should use "individual" here as it is documented and provides more
## frequent updates. The default is "batch" for backward compatibility.
# query_style = "batch"
## Query interval to fetch data.
## By default the gloabl 'interval' setting is used. You should override the
## interval here if the global setting is shorter than 10 minutes as
## OpenWeatherMap weather data is only updated every 10 minutes.
# interval = "10m"
```
## Metrics

View File

@ -22,17 +22,9 @@ import (
//go:embed sample.conf
var sampleConfig string
const (
// https://openweathermap.org/current#severalid
// Call for several city IDs
// The limit of locations is 20.
owmRequestSeveralCityID int = 20
defaultBaseURL = "https://api.openweathermap.org/"
defaultResponseTimeout = time.Second * 5
defaultUnits = "metric"
defaultLang = "en"
)
// https://openweathermap.org/current#severalid
// Limit for the number of city IDs per request.
const maxIDsPerBatch int = 20
type OpenWeatherMap struct {
AppID string `toml:"app_id"`
@ -42,8 +34,10 @@ type OpenWeatherMap struct {
BaseURL string `toml:"base_url"`
ResponseTimeout config.Duration `toml:"response_timeout"`
Units string `toml:"units"`
QueryStyle string `toml:"query_style"`
client *http.Client
cityIDBatches []string
baseParsedURL *url.URL
}
@ -51,48 +45,119 @@ func (*OpenWeatherMap) SampleConfig() string {
return sampleConfig
}
func (n *OpenWeatherMap) Init() error {
// Set the default for the base-URL if not given
if n.BaseURL == "" {
n.BaseURL = "https://api.openweathermap.org/"
}
// Check the query-style setting
switch n.QueryStyle {
case "":
n.QueryStyle = "batch"
case "batch", "individual":
// Do nothing, those are valid
default:
return fmt.Errorf("unknown query-style: %s", n.QueryStyle)
}
// Check the unit setting
switch n.Units {
case "":
n.Units = "metric"
case "imperial", "standard", "metric":
// Do nothing, those are valid
default:
return fmt.Errorf("unknown units: %s", n.Units)
}
// Check the language setting
switch n.Lang {
case "":
n.Lang = "en"
case "ar", "bg", "ca", "cz", "de", "el", "en", "fa", "fi", "fr", "gl",
"hr", "hu", "it", "ja", "kr", "la", "lt", "mk", "nl", "pl",
"pt", "ro", "ru", "se", "sk", "sl", "es", "tr", "ua", "vi",
"zh_cn", "zh_tw":
// Do nothing, those are valid
default:
return fmt.Errorf("unknown language: %s", n.Lang)
}
// Check the properties to fetch
if len(n.Fetch) == 0 {
n.Fetch = []string{"weather", "forecast"}
}
for _, fetch := range n.Fetch {
switch fetch {
case "forecast", "weather":
// Do nothing, those are valid
default:
return fmt.Errorf("unknown property to fetch: %s", fetch)
}
}
// Split the city IDs into batches smaller than the maximum size
nBatches := len(n.CityID) / maxIDsPerBatch
if len(n.CityID)%maxIDsPerBatch != 0 {
nBatches++
}
batches := make([][]string, nBatches)
for i, id := range n.CityID {
batch := i / maxIDsPerBatch
batches[batch] = append(batches[batch], id)
}
n.cityIDBatches = make([]string, 0, nBatches)
for _, batch := range batches {
n.cityIDBatches = append(n.cityIDBatches, strings.Join(batch, ","))
}
// Parse the base-URL used later to construct the property API endpoint
u, err := url.Parse(n.BaseURL)
if err != nil {
return err
}
n.baseParsedURL = u
// Create an HTTP client to be used in each collection interval
n.client = &http.Client{
Transport: &http.Transport{},
Timeout: time.Duration(n.ResponseTimeout),
}
return nil
}
func (n *OpenWeatherMap) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
var strs []string
for _, fetch := range n.Fetch {
if fetch == "forecast" {
for _, city := range n.CityID {
addr := n.formatURL("/data/2.5/forecast", city)
switch fetch {
case "forecast":
for _, cityID := range n.CityID {
wg.Add(1)
go func() {
go func(city string) {
defer wg.Done()
status, err := n.gatherURL(addr)
if err != nil {
acc.AddError(err)
return
}
gatherForecast(acc, status)
}()
acc.AddError(n.gatherForecast(acc, city))
}(cityID)
}
} else if fetch == "weather" {
j := 0
for j < len(n.CityID) {
strs = make([]string, 0)
for i := 0; j < len(n.CityID) && i < owmRequestSeveralCityID; i++ {
strs = append(strs, n.CityID[j])
j++
case "weather":
switch n.QueryStyle {
case "individual":
for _, cityID := range n.CityID {
wg.Add(1)
go func(city string) {
defer wg.Done()
acc.AddError(n.gatherWeather(acc, city))
}(cityID)
}
case "batch":
for _, cityIDs := range n.cityIDBatches {
wg.Add(1)
go func(cities string) {
defer wg.Done()
acc.AddError(n.gatherWeatherBatch(acc, cities))
}(cityIDs)
}
cities := strings.Join(strs, ",")
addr := n.formatURL("/data/2.5/group", cities)
wg.Add(1)
go func() {
defer wg.Done()
status, err := n.gatherURL(addr)
if err != nil {
acc.AddError(err)
return
}
gatherWeather(acc, status)
}()
}
}
}
@ -101,122 +166,69 @@ func (n *OpenWeatherMap) Gather(acc telegraf.Accumulator) error {
return nil
}
func (n *OpenWeatherMap) createHTTPClient() *http.Client {
if n.ResponseTimeout < config.Duration(time.Second) {
n.ResponseTimeout = config.Duration(defaultResponseTimeout)
}
client := &http.Client{
Transport: &http.Transport{},
Timeout: time.Duration(n.ResponseTimeout),
}
return client
}
func (n *OpenWeatherMap) gatherURL(addr string) (*Status, error) {
resp, err := n.client.Get(addr)
func (n *OpenWeatherMap) gatherWeather(acc telegraf.Accumulator, city string) error {
// Query the data and decode the response
addr := n.formatURL("/data/2.5/weather", city)
buf, err := n.gatherURL(addr)
if err != nil {
return nil, fmt.Errorf("error making HTTP request to %q: %w", addr, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("%s returned HTTP status %s", addr, resp.Status)
return fmt.Errorf("querying %q failed: %w", addr, err)
}
mediaType, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
var e WeatherEntry
if err := json.Unmarshal(buf, &e); err != nil {
return fmt.Errorf("parsing JSON response failed: %w", err)
}
// Construct the metric
tm := time.Unix(e.Dt, 0)
fields := map[string]interface{}{
"cloudiness": e.Clouds.All,
"humidity": e.Main.Humidity,
"pressure": e.Main.Pressure,
"rain": e.rain(),
"snow": e.snow(),
"sunrise": time.Unix(e.Sys.Sunrise, 0).UnixNano(),
"sunset": time.Unix(e.Sys.Sunset, 0).UnixNano(),
"temperature": e.Main.Temp,
"feels_like": e.Main.Feels,
"visibility": e.Visibility,
"wind_degrees": e.Wind.Deg,
"wind_speed": e.Wind.Speed,
}
tags := map[string]string{
"city": e.Name,
"city_id": strconv.FormatInt(e.ID, 10),
"country": e.Sys.Country,
"forecast": "*",
}
if len(e.Weather) > 0 {
fields["condition_description"] = e.Weather[0].Description
fields["condition_icon"] = e.Weather[0].Icon
tags["condition_id"] = strconv.FormatInt(e.Weather[0].ID, 10)
tags["condition_main"] = e.Weather[0].Main
}
acc.AddFields("weather", fields, tags, tm)
return nil
}
func (n *OpenWeatherMap) gatherWeatherBatch(acc telegraf.Accumulator, cities string) error {
// Query the data and decode the response
addr := n.formatURL("/data/2.5/group", cities)
buf, err := n.gatherURL(addr)
if err != nil {
return nil, err
return fmt.Errorf("querying %q failed: %w", addr, err)
}
if mediaType != "application/json" {
return nil, fmt.Errorf("%s returned unexpected content type %s", addr, mediaType)
var status Status
if err := json.Unmarshal(buf, &status); err != nil {
return fmt.Errorf("parsing JSON response failed: %w", err)
}
return gatherWeatherURL(resp.Body)
}
type WeatherEntry struct {
Dt int64 `json:"dt"`
Clouds struct {
All int64 `json:"all"`
} `json:"clouds"`
Main struct {
Humidity int64 `json:"humidity"`
Pressure float64 `json:"pressure"`
Temp float64 `json:"temp"`
Feels float64 `json:"feels_like"`
} `json:"main"`
Rain struct {
Rain1 float64 `json:"1h"`
Rain3 float64 `json:"3h"`
} `json:"rain"`
Snow struct {
Snow1 float64 `json:"1h"`
Snow3 float64 `json:"3h"`
} `json:"snow"`
Sys struct {
Country string `json:"country"`
Sunrise int64 `json:"sunrise"`
Sunset int64 `json:"sunset"`
} `json:"sys"`
Wind struct {
Deg float64 `json:"deg"`
Speed float64 `json:"speed"`
} `json:"wind"`
ID int64 `json:"id"`
Name string `json:"name"`
Coord struct {
Lat float64 `json:"lat"`
Lon float64 `json:"lon"`
} `json:"coord"`
Visibility int64 `json:"visibility"`
Weather []struct {
ID int64 `json:"id"`
Main string `json:"main"`
Description string `json:"description"`
Icon string `json:"icon"`
} `json:"weather"`
}
type Status struct {
City struct {
Coord struct {
Lat float64 `json:"lat"`
Lon float64 `json:"lon"`
} `json:"coord"`
Country string `json:"country"`
ID int64 `json:"id"`
Name string `json:"name"`
} `json:"city"`
List []WeatherEntry `json:"list"`
}
func gatherWeatherURL(r io.Reader) (*Status, error) {
dec := json.NewDecoder(r)
status := &Status{}
if err := dec.Decode(status); err != nil {
return nil, fmt.Errorf("error while decoding JSON response: %w", err)
}
return status, nil
}
func gatherSnow(e WeatherEntry) float64 {
if e.Snow.Snow1 > 0 {
return e.Snow.Snow1
}
return e.Snow.Snow3
}
func gatherRain(e WeatherEntry) float64 {
if e.Rain.Rain1 > 0 {
return e.Rain.Rain1
}
return e.Rain.Rain3
}
func gatherWeather(acc telegraf.Accumulator, status *Status) {
// Construct the metrics
for _, e := range status.List {
tm := time.Unix(e.Dt, 0)
@ -224,8 +236,8 @@ func gatherWeather(acc telegraf.Accumulator, status *Status) {
"cloudiness": e.Clouds.All,
"humidity": e.Main.Humidity,
"pressure": e.Main.Pressure,
"rain": gatherRain(e),
"snow": gatherSnow(e),
"rain": e.rain(),
"snow": e.snow(),
"sunrise": time.Unix(e.Sys.Sunrise, 0).UnixNano(),
"sunset": time.Unix(e.Sys.Sunset, 0).UnixNano(),
"temperature": e.Main.Temp,
@ -250,9 +262,24 @@ func gatherWeather(acc telegraf.Accumulator, status *Status) {
acc.AddFields("weather", fields, tags, tm)
}
return nil
}
func gatherForecast(acc telegraf.Accumulator, status *Status) {
func (n *OpenWeatherMap) gatherForecast(acc telegraf.Accumulator, city string) error {
// Query the data and decode the response
addr := n.formatURL("/data/2.5/forecast", city)
buf, err := n.gatherURL(addr)
if err != nil {
return fmt.Errorf("querying %q failed: %w", addr, err)
}
var status Status
if err := json.Unmarshal(buf, &status); err != nil {
return fmt.Errorf("parsing JSON response failed: %w", err)
}
// Construct the metric
tags := map[string]string{
"city_id": strconv.FormatInt(status.City.ID, 10),
"forecast": "*",
@ -265,8 +292,8 @@ func gatherForecast(acc telegraf.Accumulator, status *Status) {
"cloudiness": e.Clouds.All,
"humidity": e.Main.Humidity,
"pressure": e.Main.Pressure,
"rain": gatherRain(e),
"snow": gatherSnow(e),
"rain": e.rain(),
"snow": e.snow(),
"temperature": e.Main.Temp,
"feels_like": e.Main.Feels,
"wind_degrees": e.Wind.Deg,
@ -281,47 +308,6 @@ func gatherForecast(acc telegraf.Accumulator, status *Status) {
tags["forecast"] = fmt.Sprintf("%dh", (i+1)*3)
acc.AddFields("weather", fields, tags, tm)
}
}
func init() {
inputs.Add("openweathermap", func() telegraf.Input {
tmout := config.Duration(defaultResponseTimeout)
return &OpenWeatherMap{
ResponseTimeout: tmout,
BaseURL: defaultBaseURL,
}
})
}
func (n *OpenWeatherMap) Init() error {
var err error
n.baseParsedURL, err = url.Parse(n.BaseURL)
if err != nil {
return err
}
// Create an HTTP client that is re-used for each
// collection interval
n.client = n.createHTTPClient()
switch n.Units {
case "imperial", "standard", "metric":
case "":
n.Units = defaultUnits
default:
return fmt.Errorf("unknown units: %s", n.Units)
}
switch n.Lang {
case "ar", "bg", "ca", "cz", "de", "el", "en", "fa", "fi", "fr", "gl",
"hr", "hu", "it", "ja", "kr", "la", "lt", "mk", "nl", "pl",
"pt", "ro", "ru", "se", "sk", "sl", "es", "tr", "ua", "vi",
"zh_cn", "zh_tw":
case "":
n.Lang = defaultLang
default:
return fmt.Errorf("unknown language: %s", n.Lang)
}
return nil
}
@ -341,3 +327,34 @@ func (n *OpenWeatherMap) formatURL(path string, city string) string {
return n.baseParsedURL.ResolveReference(relative).String()
}
func (n *OpenWeatherMap) gatherURL(addr string) ([]byte, error) {
resp, err := n.client.Get(addr)
if err != nil {
return nil, fmt.Errorf("error making HTTP request to %q: %w", addr, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("%s returned HTTP status %s", addr, resp.Status)
}
mediaType, _, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
if err != nil {
return nil, err
}
if mediaType != "application/json" {
return nil, fmt.Errorf("%s returned unexpected content type %s", addr, mediaType)
}
return io.ReadAll(resp.Body)
}
func init() {
inputs.Add("openweathermap", func() telegraf.Input {
return &OpenWeatherMap{
ResponseTimeout: config.Duration(5 * time.Second),
}
})
}

View File

@ -13,7 +13,7 @@
# lang = "en"
## APIs to fetch; can contain "weather" or "forecast".
fetch = ["weather", "forecast"]
# fetch = ["weather", "forecast"]
## OpenWeatherMap base URL
# base_url = "https://api.openweathermap.org/"
@ -25,6 +25,15 @@
## "metric", "imperial", or "standard".
# units = "metric"
## Query interval; OpenWeatherMap weather data is updated every 10
## minutes.
interval = "10m"
## Style to query the current weather; available options
## batch -- query multiple cities at once using the "group" endpoint
## individual -- query each city individually using the "weather" endpoint
## You should use "individual" here as it is documented and provides more
## frequent updates. The default is "batch" for backward compatibility.
# query_style = "batch"
## Query interval to fetch data.
## By default the gloabl 'interval' setting is used. You should override the
## interval here if the global setting is shorter than 10 minutes as
## OpenWeatherMap weather data is only updated every 10 minutes.
# interval = "10m"

View File

@ -0,0 +1,3 @@
weather,city=Moscow,city_id=524901,condition_id=802,condition_main=Clouds,country=RU,forecast=* cloudiness=40i,condition_description="scattered clouds",condition_icon="03d",feels_like=8.57,humidity=46i,pressure=1014,rain=0,snow=0,sunrise=1556416455000000000i,sunset=1556470779000000000i,temperature=9.57,visibility=10000i,wind_degrees=60,wind_speed=5 1556444155000000000
weather,city=Kiev,city_id=703448,condition_id=520,condition_main=Rain,country=UA,forecast=* cloudiness=0i,condition_description="light intensity shower rain",condition_icon="09d",feels_like=18.29,humidity=63i,pressure=1009,rain=0,snow=0,sunrise=1556419155000000000i,sunset=1556471486000000000i,temperature=19.29,visibility=10000i,wind_degrees=0,wind_speed=1 1556444155000000000
weather,city=London,city_id=2643743,condition_id=804,condition_main=Clouds,country=GB,forecast=* cloudiness=100i,condition_description="overcast clouds",condition_icon="04n",feels_like=7.91,humidity=90i,pressure=997,rain=0,snow=0,sunrise=1698648577000000000i,sunset=1698683914000000000i,temperature=8.94,visibility=10000i,wind_degrees=250,wind_speed=2.06 1556444155000000000

View File

@ -0,0 +1,43 @@
{
"coord": {
"lon": -0.1257,
"lat": 51.5085
},
"weather": [
{
"id": 804,
"main": "Clouds",
"description": "overcast clouds",
"icon": "04n"
}
],
"base": "stations",
"main": {
"temp": 8.94,
"feels_like": 7.91,
"temp_min": 7.38,
"temp_max": 9.98,
"pressure": 997,
"humidity": 90
},
"visibility": 10000,
"wind": {
"speed": 2.06,
"deg": 250
},
"clouds": {
"all": 100
},
"dt": 1556444155,
"sys": {
"type": 2,
"id": 2006068,
"country": "GB",
"sunrise": 1698648577,
"sunset": 1698683914
},
"timezone": 0,
"id": 2643743,
"name": "London",
"cod": 200
}

View File

@ -0,0 +1,39 @@
{
"coord": {
"lon": 37.62,
"lat": 55.75
},
"sys": {
"type": 1,
"id": 9029,
"message": 0.0061,
"country": "RU",
"sunrise": 1556416455,
"sunset": 1556470779
},
"weather": [
{
"id": 802,
"main": "Clouds",
"description": "scattered clouds",
"icon": "03d"
}
],
"main": {
"temp": 9.57,
"feels_like": 8.57,
"pressure": 1014,
"humidity": 46
},
"visibility": 10000,
"wind": {
"speed": 5,
"deg": 60
},
"clouds": {
"all": 40
},
"dt": 1556444155,
"id": 524901,
"name": "Moscow"
}

View File

@ -0,0 +1,38 @@
{
"coord": {
"lon": 30.52,
"lat": 50.43
},
"sys": {
"type": 1,
"id": 8903,
"message": 0.0076,
"country": "UA",
"sunrise": 1556419155,
"sunset": 1556471486
},
"weather": [
{
"id": 520,
"main": "Rain",
"description": "light intensity shower rain",
"icon": "09d"
}
],
"main": {
"temp": 19.29,
"feels_like": 18.29,
"pressure": 1009,
"humidity": 63
},
"visibility": 10000,
"wind": {
"speed": 1
},
"clouds": {
"all": 0
},
"dt": 1556444155,
"id": 703448,
"name": "Kiev"
}

View File

@ -0,0 +1,5 @@
[[inputs.openweathermap]]
app_id = "noappid"
city_id = ["524901", "703448", "2643743"]
fetch = ["weather", "forecast"]
query_style = "individual"

View File

@ -0,0 +1,71 @@
package openweathermap
type WeatherEntry struct {
Dt int64 `json:"dt"`
Clouds struct {
All int64 `json:"all"`
} `json:"clouds"`
Main struct {
Humidity int64 `json:"humidity"`
Pressure float64 `json:"pressure"`
Temp float64 `json:"temp"`
Feels float64 `json:"feels_like"`
} `json:"main"`
Rain struct {
Rain1 float64 `json:"1h"`
Rain3 float64 `json:"3h"`
} `json:"rain"`
Snow struct {
Snow1 float64 `json:"1h"`
Snow3 float64 `json:"3h"`
} `json:"snow"`
Sys struct {
Country string `json:"country"`
Sunrise int64 `json:"sunrise"`
Sunset int64 `json:"sunset"`
} `json:"sys"`
Wind struct {
Deg float64 `json:"deg"`
Speed float64 `json:"speed"`
} `json:"wind"`
ID int64 `json:"id"`
Name string `json:"name"`
Coord struct {
Lat float64 `json:"lat"`
Lon float64 `json:"lon"`
} `json:"coord"`
Visibility int64 `json:"visibility"`
Weather []struct {
ID int64 `json:"id"`
Main string `json:"main"`
Description string `json:"description"`
Icon string `json:"icon"`
} `json:"weather"`
}
func (e WeatherEntry) snow() float64 {
if e.Snow.Snow1 > 0 {
return e.Snow.Snow1
}
return e.Snow.Snow3
}
func (e WeatherEntry) rain() float64 {
if e.Rain.Rain1 > 0 {
return e.Rain.Rain1
}
return e.Rain.Rain3
}
type Status struct {
City struct {
Coord struct {
Lat float64 `json:"lat"`
Lon float64 `json:"lon"`
} `json:"coord"`
Country string `json:"country"`
ID int64 `json:"id"`
Name string `json:"name"`
} `json:"city"`
List []WeatherEntry `json:"list"`
}