diff --git a/plugins/inputs/rabbitmq/README.md b/plugins/inputs/rabbitmq/README.md index 45f40142b..45df73fa8 100644 --- a/plugins/inputs/rabbitmq/README.md +++ b/plugins/inputs/rabbitmq/README.md @@ -163,6 +163,7 @@ Stats][management-reference]. - consumer_utilisation (float, percent) - consumers (int, int) - idle_since (string, time - e.g., "2006-01-02 15:04:05") + - head_message_timestamp (int, unix timestamp - only emitted if available from API) - memory (int, bytes) - message_bytes (int, bytes) - message_bytes_persist (int, bytes) @@ -233,7 +234,7 @@ SELECT NON_NEGATIVE_DERIVATIVE(LAST("messages_published"), 1m) AS messages_publi ## Example Output ```text -rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000 +rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org head_message_timestamp=1493684017,messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000 rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i,clustering_listeners=2i,amqp_listeners=1i 1493684035000000000 rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i,running=1i 149368403500000000 rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000 diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index 2742fd9d9..5fdf1b51c 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -144,6 +144,7 @@ type Queue struct { IdleSince string `json:"idle_since"` SlaveNodes []string `json:"slave_nodes"` SynchronisedSlaveNodes []string `json:"synchronised_slave_nodes"` + HeadMessageTimestamp *int64 `json:"head_message_timestamp"` } // Node ... @@ -585,36 +586,42 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) { "auto_delete": strconv.FormatBool(queue.AutoDelete), } + fields := map[string]interface{}{ + // common information + "consumers": queue.Consumers, + "consumer_utilisation": queue.ConsumerUtilisation, + "idle_since": queue.IdleSince, + "slave_nodes": len(queue.SlaveNodes), + "synchronised_slave_nodes": len(queue.SynchronisedSlaveNodes), + "memory": queue.Memory, + // messages information + "message_bytes": queue.MessageBytes, + "message_bytes_ready": queue.MessageBytesReady, + "message_bytes_unacked": queue.MessageBytesUnacknowledged, + "message_bytes_ram": queue.MessageRAM, + "message_bytes_persist": queue.MessagePersistent, + "messages": queue.Messages, + "messages_ready": queue.MessagesReady, + "messages_unack": queue.MessagesUnacknowledged, + "messages_ack": queue.MessageStats.Ack, + "messages_ack_rate": queue.MessageStats.AckDetails.Rate, + "messages_deliver": queue.MessageStats.Deliver, + "messages_deliver_rate": queue.MessageStats.DeliverDetails.Rate, + "messages_deliver_get": queue.MessageStats.DeliverGet, + "messages_deliver_get_rate": queue.MessageStats.DeliverGetDetails.Rate, + "messages_publish": queue.MessageStats.Publish, + "messages_publish_rate": queue.MessageStats.PublishDetails.Rate, + "messages_redeliver": queue.MessageStats.Redeliver, + "messages_redeliver_rate": queue.MessageStats.RedeliverDetails.Rate, + } + + if queue.HeadMessageTimestamp != nil { + fields["head_message_timestamp"] = *queue.HeadMessageTimestamp + } + acc.AddFields( "rabbitmq_queue", - map[string]interface{}{ - // common information - "consumers": queue.Consumers, - "consumer_utilisation": queue.ConsumerUtilisation, - "idle_since": queue.IdleSince, - "slave_nodes": len(queue.SlaveNodes), - "synchronised_slave_nodes": len(queue.SynchronisedSlaveNodes), - "memory": queue.Memory, - // messages information - "message_bytes": queue.MessageBytes, - "message_bytes_ready": queue.MessageBytesReady, - "message_bytes_unacked": queue.MessageBytesUnacknowledged, - "message_bytes_ram": queue.MessageRAM, - "message_bytes_persist": queue.MessagePersistent, - "messages": queue.Messages, - "messages_ready": queue.MessagesReady, - "messages_unack": queue.MessagesUnacknowledged, - "messages_ack": queue.MessageStats.Ack, - "messages_ack_rate": queue.MessageStats.AckDetails.Rate, - "messages_deliver": queue.MessageStats.Deliver, - "messages_deliver_rate": queue.MessageStats.DeliverDetails.Rate, - "messages_deliver_get": queue.MessageStats.DeliverGet, - "messages_deliver_get_rate": queue.MessageStats.DeliverGetDetails.Rate, - "messages_publish": queue.MessageStats.Publish, - "messages_publish_rate": queue.MessageStats.PublishDetails.Rate, - "messages_redeliver": queue.MessageStats.Redeliver, - "messages_redeliver_rate": queue.MessageStats.RedeliverDetails.Rate, - }, + fields, tags, ) } diff --git a/plugins/inputs/rabbitmq/rabbitmq_test.go b/plugins/inputs/rabbitmq/rabbitmq_test.go index e867b1e2d..e006236eb 100644 --- a/plugins/inputs/rabbitmq/rabbitmq_test.go +++ b/plugins/inputs/rabbitmq/rabbitmq_test.go @@ -83,6 +83,7 @@ func TestRabbitMQGeneratesMetricsSet1(t *testing.T) { map[string]interface{}{ "consumers": int64(3), "consumer_utilisation": float64(1.0), + "head_message_timestamp": int64(1446362534), "memory": int64(143776), "message_bytes": int64(3), "message_bytes_ready": int64(4), diff --git a/plugins/inputs/rabbitmq/testdata/set1/queues.json b/plugins/inputs/rabbitmq/testdata/set1/queues.json index 294f78872..cbf58a74a 100644 --- a/plugins/inputs/rabbitmq/testdata/set1/queues.json +++ b/plugins/inputs/rabbitmq/testdata/set1/queues.json @@ -84,7 +84,7 @@ "q4": 0, "target_ram_count": 0 }, - "head_message_timestamp": null, + "head_message_timestamp": 1446362534, "message_bytes_persistent": 7, "message_bytes_ram": 6, "message_bytes_unacknowledged": 5,