StackState-Apache Kafka Integration

Overview

Connect Kafka to StackState in order to:

  • Visualize the performance of your cluster in real time
  • Correlate the performance of Kafka with the rest of your applications

This check is limited to 350 metrics per instance. The number of metrics returned is shown on the info page. You can select the metrics you are interested in by editing the configuration below. For detailed instructions on customizing which metrics are collected, see the JMX Checks documentation. If you need to monitor more metrics, send an email to support.
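
The sample configuration on this page selects some beans with bean_regex patterns, so a single filter can produce one metric per matching client. The sketch below illustrates why that matters for the 350-metric limit. It assumes the pattern must match the whole bean name; the bean names in BEANS and the helper matching_beans are hypothetical and exist only for illustration.

```python
import re
from typing import List

# Pattern copied from the sample kafka.yaml on this page.
BEAN_REGEX = (
    r"kafka\.producer:type=ProducerRequestMetrics,"
    r"name=ProducerRequestRateAndTimeMs,clientId=.*"
)

# Hypothetical bean names, for illustration only.
BEANS = [
    "kafka.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=app-1",
    "kafka.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=app-2",
    "kafka.producer:type=ProducerTopicMetrics,name=BytesPerSec,clientId=app-1",
]


def matching_beans(pattern: str, beans: List[str]) -> List[str]:
    """Return the bean names that the pattern matches in full."""
    compiled = re.compile(pattern)
    return [bean for bean in beans if compiled.fullmatch(bean)]


matched = matching_beans(BEAN_REGEX, BEANS)
# A filter with one attribute yields one metric per matched bean.
print(len(matched))  # prints 2
```

With many producer or consumer clients, a broad clientId=.* pattern can account for a large share of the per-instance budget, so narrowing the pattern is one way to stay under the limit.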

Installation

Kafka metrics are captured using a JMX connection. We recommend the use of Oracle’s JDK for this integration.
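
Before configuring the agent, it can help to confirm that the broker's JMX port accepts TCP connections. The following is a minimal reachability sketch, not part of the agent: jmx_port_open is a hypothetical helper, and port 9999 is an assumption taken from the sample configuration on this page. A successful connection does not prove that JMX authentication or the RMI handshake will succeed.

```python
import socket


def jmx_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolved hosts.
        return False


# Example call (host and port are assumptions from the sample kafka.yaml):
#   jmx_port_open("localhost", 9999)
```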

Configuration

There are two configuration files to edit for this integration, both of which are in the standard conf.d directory under the agent installation directory:

  • kafka.yaml
  • kafka_consumer.yaml

The first step is to edit your kafka.yaml file. Kafka bean names depend on the exact Kafka version you are running. Always use the example that ships with your agent installation as a base, since it is the most up-to-date version for your agent. You can also find the latest versions in the GitHub repo, but note that the version there may target a newer version of the agent than the one you have installed.

##########
# WARNING
##########
# This sample works only for Kafka >= 0.8.2.

instances:
  - host: localhost
    port: 9999 # This is the JMX port on which Kafka exposes its metrics (usually 9999)
    tags:
      kafka: broker

init_config:
  is_jmx: true

  # Metrics collected by this check. You should not have to modify this.
  conf:
    # v0.8.2.x Producers
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.request_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.producer.request_latency_avg
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=BytesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.bytes_out
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.message_rate
    # v0.8.2.x Consumers
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MaxLag,clientId=.*'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.consumer.max_lag
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MinFetchRate,clientId=.*'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.consumer.fetch_rate
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.bytes_in
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=MessagesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.messages_in

    # Offsets committed to ZooKeeper
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=ZooKeeperCommitsPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.zookeeper_commits
    # Offsets committed to Kafka
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=KafkaCommitsPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.kafka_commits
    # v0.9.0.x Producers
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          response-rate:
            metric_type: gauge
            alias: kafka.producer.response_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          request-rate:
            metric_type: gauge
            alias: kafka.producer.request_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          request-latency-avg:
            metric_type: gauge
            alias: kafka.producer.request_latency_avg
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          outgoing-byte-rate:
            metric_type: gauge
            alias: kafka.producer.bytes_out
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          io-wait-time-ns-avg:
            metric_type: gauge
            alias: kafka.producer.io_wait

    # v0.9.0.x Consumers
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
        attribute:
          bytes-consumed-rate:
            metric_type: gauge
            alias: kafka.consumer.bytes_in
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
        attribute:
          records-consumed-rate:
            metric_type: gauge
            alias: kafka.consumer.messages_in
    #
    # Aggregate cluster stats
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_out.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_in.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.messages_in.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_rejected.rate

    #
    # Request timings
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch.failed.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.produce.failed.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.produce.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.produce.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.produce.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch_consumer.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch_follower.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.fetch_consumer.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.fetch_consumer.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.fetch_follower.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.fetch_follower.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=UpdateMetadata'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.update_metadata.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.update_metadata.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Metadata'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.metadata.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.metadata.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Offsets'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.offsets.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.offsets.time.99percentile
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.handler.avg.idle.pct.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ProducerRequestPurgatory,name=PurgatorySize'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.request.producer_request_purgatory.size
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=FetchRequestPurgatory,name=PurgatorySize'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.request.fetch_request_purgatory.size

    #
    # Replication stats
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.under_replicated_partitions
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=IsrShrinksPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.isr_shrinks.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=IsrExpandsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.isr_expands.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.leader_elections.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.unclean_leader_elections.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=KafkaController,name=OfflinePartitionsCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.offline_partitions_count
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=KafkaController,name=ActiveControllerCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.active_controller_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=PartitionCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.partition_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=LeaderCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.leader_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.max_lag

    #
    # Log flush stats
    #
    - include:
        domain: 'kafka.log'
        bean: 'kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.log.flush_rate.rate
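
Many entries above apply metric_type: rate to a cumulative Count attribute, while attributes such as Mean or Value are reported as gauges. The sketch below shows one way a per-second rate can be derived from two samples of a counter. It assumes the rate is the change in Count divided by the elapsed seconds; the agent's actual handling (for example of counter resets) may differ, and per_second_rate is a hypothetical name.

```python
def per_second_rate(prev_count: float, prev_ts: float,
                    cur_count: float, cur_ts: float) -> float:
    """Per-second rate between two samples of a cumulative counter."""
    elapsed = cur_ts - prev_ts
    if elapsed <= 0:
        raise ValueError("samples must be in increasing time order")
    return (cur_count - prev_count) / elapsed


# Hypothetical Count samples taken 10 seconds apart.
print(per_second_rate(1000, 0.0, 6000, 10.0))  # prints 500.0
```

A gauge, by contrast, is reported as sampled, with no differencing between collections.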

Next, edit conf.d/kafka_consumer.yaml:

init_config:
#  Customize the ZooKeeper connection timeout here
#  zk_timeout: 5
#  Customize the Kafka connection timeout here
#  kafka_timeout: 5

instances:
  # - kafka_connect_str: localhost:9092
  #   zk_connect_str: localhost:2181
  #   zk_prefix: /0.8
  #   consumer_groups:
  #     my_consumer:
  #       my_topic: [0, 1, 4, 12]
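
The consumer_groups setting nests consumer group, topic, and partition list. The sketch below mirrors the commented-out sample as a Python dictionary and lists each (group, topic, partition) combination it describes. It also shows a lag calculation under the assumption that lag is the broker offset minus the consumer offset; the function names and the offsets are hypothetical.

```python
from typing import Dict, Iterator, List, Tuple

# Mirrors the commented-out sample: group -> topic -> partition list.
CONSUMER_GROUPS: Dict[str, Dict[str, List[int]]] = {
    "my_consumer": {"my_topic": [0, 1, 4, 12]},
}


def monitored_partitions(
    groups: Dict[str, Dict[str, List[int]]]
) -> Iterator[Tuple[str, str, int]]:
    """Yield one (group, topic, partition) triple per configured partition."""
    for group, topics in groups.items():
        for topic, partitions in topics.items():
            for partition in partitions:
                yield group, topic, partition


def consumer_lag(broker_offset: int, consumer_offset: int) -> int:
    """Lag in messages, assumed to be broker offset minus consumer offset."""
    return broker_offset - consumer_offset


for triple in monitored_partitions(CONSUMER_GROUPS):
    print(triple)

# Hypothetical offsets for a single partition.
print(consumer_lag(broker_offset=1500, consumer_offset=1420))  # prints 80
```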

For more details about configuring this integration, refer to the example configuration files for this integration on GitHub.

Validation

To validate that the integration is working, restart the agent and then run the info command. (For help on these steps, see Getting Started with the Agent.) The output should contain a section similar to the following:

Checks
======

  [...]

  kafka-localhost-9999
  --------------------
      - instance #0 [OK]
      - Collected 8 metrics & 0 events
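
The collected-metrics count in this output is the number to compare against the 350-metric limit. The sketch below extracts it from text shaped like the sample above. It assumes the line keeps the "Collected N metrics" wording; collected_metrics and METRIC_LIMIT are hypothetical names.

```python
import re

METRIC_LIMIT = 350  # per-instance limit stated in the Overview

# Sample text copied from the expected output on this page.
INFO_OUTPUT = """\
  kafka-localhost-9999
  --------------------
      - instance #0 [OK]
      - Collected 8 metrics & 0 events
"""


def collected_metrics(info_text: str) -> int:
    """Extract the metric count from a 'Collected N metrics' line."""
    match = re.search(r"Collected (\d+) metrics", info_text)
    if match is None:
        raise ValueError("no 'Collected N metrics' line found")
    return int(match.group(1))


count = collected_metrics(INFO_OUTPUT)
print(count, "of", METRIC_LIMIT, "metrics used")  # prints: 8 of 350 metrics used
```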

Metrics

kafka.broker_offset (gauge)
    Current message offset on broker. Shown as offset.
kafka.consumer_lag (gauge)
    Lag in messages between consumer and broker. Shown as offset.
kafka.consumer_offset (gauge)
    Current message offset on consumer. Shown as offset.
kafka.net.bytes_out (gauge every 10 seconds)
    Outgoing byte rate. Shown as byte/second.
kafka.net.bytes_in (gauge every 10 seconds)
    Incoming byte rate. Shown as byte/second.
kafka.net.bytes_rejected (gauge every 10 seconds)
    Rejected byte rate. Shown as byte/second.
kafka.messages_in (gauge every 10 seconds)
    Incoming message rate. Shown as message.
kafka.request.fetch.failed (gauge every 10 seconds)
    Number of client fetch request failures. Shown as request.
kafka.request.fetch.failed_per_second (gauge every 10 seconds)
    Rate of client fetch request failures per second. Shown as request/second.
kafka.request.produce.time.avg (gauge every 10 seconds)
    Average time for a produce request. Shown as millisecond.
kafka.request.produce.time.99percentile (gauge every 10 seconds)
    99th percentile of produce request time. Shown as millisecond.
kafka.request.produce.failed_per_second (gauge every 10 seconds)
    Rate of failed produce requests per second. Shown as request/second.
kafka.request.produce.failed (gauge every 10 seconds)
    Number of failed produce requests. Shown as request.
kafka.request.fetch.time.avg (gauge every 10 seconds)
    Average time per fetch request. Shown as millisecond.
kafka.request.fetch.time.99percentile (gauge every 10 seconds)
    99th percentile of fetch request time. Shown as millisecond.
kafka.request.update_metadata.time.avg (gauge every 10 seconds)
    Average time for a request to update metadata. Shown as millisecond.
kafka.request.update_metadata.time.99percentile (gauge every 10 seconds)
    99th percentile of update metadata request time. Shown as millisecond.
kafka.request.metadata.time.avg (gauge every 10 seconds)
    Average time for a metadata request. Shown as millisecond.
kafka.request.metadata.time.99percentile (gauge every 10 seconds)
    99th percentile of metadata request time. Shown as millisecond.
kafka.request.offsets.time.avg (gauge every 10 seconds)
    Average time for an offset request. Shown as millisecond.
kafka.request.offsets.time.99percentile (gauge every 10 seconds)
    99th percentile of offset request time. Shown as millisecond.
kafka.request.handler.avg.idle.pct (gauge every 10 seconds)
    Average fraction of time the request handler threads are idle. Shown as fraction.
kafka.replication.isr_shrinks (gauge every 10 seconds)
    Rate of replicas leaving the ISR pool. Shown as node/second.
kafka.replication.isr_expands (gauge every 10 seconds)
    Rate of replicas joining the ISR pool. Shown as node/second.
kafka.replication.leader_elections (gauge every 10 seconds)
    Leader election rate. Shown as event/second.
kafka.replication.unclean_leader_elections (gauge every 10 seconds)
    Unclean leader election rate. Shown as event/second.
kafka.replication.under_replicated_partitions (gauge every 10 seconds)
    Number of under-replicated partitions.
kafka.log.flush_rate (gauge every 10 seconds)
    Log flush rate. Shown as flush/second.
kafka.consumer.delayed_requests (gauge every 10 seconds)
    Number of delayed consumer requests. Shown as request.
kafka.consumer.expires_per_second (gauge every 10 seconds)
    Rate of delayed consumer request expiration. Shown as eviction/second.
kafka.expires_sec (gauge every 10 seconds)
    Rate of delayed producer request expiration. Shown as eviction/second.
kafka.follower.expires_per_second (gauge every 10 seconds)
    Rate of request expiration on followers. Shown as eviction/second.
kafka.producer.delayed_requests (gauge every 10 seconds)
    Number of delayed producer requests. Shown as request.
kafka.producer.expires_per_seconds (gauge every 10 seconds)
    Rate of producer request expiration. Shown as eviction/second.
kafka.producer.request_rate (gauge every 10 seconds)
    Number of producer requests per second. Shown as request/second.
kafka.producer.response_rate (gauge every 10 seconds)
    Number of producer responses per second. Shown as response/second.
kafka.producer.request_latency_avg (gauge every 10 seconds)
    Producer average request latency. Shown as millisecond.
kafka.producer.bytes_out (gauge every 10 seconds)
    Producer bytes out rate. Shown as byte/second.
kafka.producer.message_rate (gauge every 10 seconds)
    Producer message rate. Shown as message/second.
kafka.producer.io_wait (gauge every 10 seconds)
    Producer I/O wait time. Shown as nanosecond.
kafka.consumer.max_lag (gauge every 10 seconds)
    Maximum consumer lag. Shown as offset.
kafka.consumer.fetch_rate (gauge every 10 seconds)
    The minimum rate at which the consumer sends fetch requests to a broker. Shown as request.
kafka.consumer.bytes_in (gauge every 10 seconds)
    Consumer bytes in rate. Shown as byte/second.
kafka.consumer.messages_in (gauge every 10 seconds)
    Rate of consumer message consumption. Shown as message/second.
kafka.consumer.zookeeper_commits (gauge every 10 seconds)
    Rate of offset commits to ZooKeeper. Shown as write/second.
kafka.consumer.kafka_commits (gauge every 10 seconds)
    Rate of offset commits to Kafka. Shown as write/second.