diff --git a/data/common/kafka.yaml b/data/common/kafka.yaml
index 96c96a5b..486e68ea 100644
--- a/data/common/kafka.yaml
+++ b/data/common/kafka.yaml
@@ -1,93 +1,97 @@
 ---
 zookeeper::clusters:
   rocquencourt:
     '1': kafka1.internal.softwareheritage.org
     '2': kafka2.internal.softwareheritage.org
     '3': kafka3.internal.softwareheritage.org
     '4': kafka4.internal.softwareheritage.org
 
 zookeeper::datastore: /var/lib/zookeeper
 zookeeper::client_port: 2181
 zookeeper::election_port: 2888
 zookeeper::leader_port: 3888
 
 kafka::version: '2.6.0'
 kafka::scala_version: '2.13'
 kafka::mirror_url: https://archive.apache.org/dist/
 
 kafka::cluster::heap_ops: "-Xmx6G -Xms6G"
 
 kafka::logdirs:
   - /srv/kafka/logdir
 
 kafka::broker_config:
   log.dirs: "%{alias('kafka::logdirs')}"
   num.recovery.threads.per.data.dir: 10
   # Increase zookeeper and replication timeouts
   # https://cwiki.apache.org/confluence/display/KAFKA/KIP-537%3A+Increase+default+zookeeper+session+timeout will be default in 2.5.0
   zookeeper.session.timeout.ms: 18000
   replica.lag.time.max.ms: 30000
   # Bump consumer offset retention to 30 days instead of the default of 7 days
   offsets.retention.minutes: 43200
   # Increase the socket request max size to 200 MB
   socket.request.max.bytes: 209715200
   # And the max message size to 100 MB
   message.max.bytes: 104857600
   # For upgrades after 2.6
   inter.broker.protocol.version: "2.6"
 
 # kafka::broker::password in private-data
 kafka::clusters:
   rocquencourt:
     zookeeper::chroot: '/kafka/softwareheritage'
     zookeeper::servers:
       - kafka1.internal.softwareheritage.org
       - kafka2.internal.softwareheritage.org
       - kafka3.internal.softwareheritage.org
       - kafka4.internal.softwareheritage.org
     brokers:
       kafka1.internal.softwareheritage.org:
         id: 1
         public_hostname: broker1.journal.softwareheritage.org
       kafka2.internal.softwareheritage.org:
         id: 2
         public_hostname: broker2.journal.softwareheritage.org
       kafka3.internal.softwareheritage.org:
         id: 3
         public_hostname: broker3.journal.softwareheritage.org
       kafka4.internal.softwareheritage.org:
         id: 4
         public_hostname: broker4.journal.softwareheritage.org
     superusers:
       - User:swh-admin-olasd
       # Users connecting in the plaintext endpoint are ANONYMOUS
       # TODO: remove when explicit ACLs are given to producers
       - User:ANONYMOUS
     broker::heap_opts: "%{lookup('kafka::cluster::heap_ops')}"
     tls: true
     plaintext_port: 9092
     public_tls_port: 9093
     internal_tls_port: 9094
     public_listener_network: 128.93.166.0/26
+    # to label the prometheus exporter metrics
+    environment: production
 
   rocquencourt_staging:
     zookeeper::chroot: '/kafka/softwareheritage'
     zookeeper::servers:
       - journal1.internal.staging.swh.network
     brokers:
       storage1.internal.staging.swh.network:
         id: 2
         public_hostname: broker1.journal.staging.swh.network
     broker::heap_opts: "%{alias('kafka::broker::heap_opts')}"
     superusers:
       - User:swh-admin-olasd
       # Users connecting in the plaintext endpoint are ANONYMOUS
       # TODO: remove when explicit ACLs are given to producers
       - User:ANONYMOUS
     tls: true
     plaintext_port: 9092
     public_tls_port: 9093
     internal_tls_port: 9094
     cluster_config_overrides:
       offsets.topic.replication.factor: 1 # this is mandatory with only one node
     public_listener_network: "%{alias('kafka::cluster::public_network')}"
+    # to label the prometheus exporter metrics
+    environment: staging
diff --git a/site-modules/profile/manifests/kafka/prometheus_consumer_group_exporter.pp b/site-modules/profile/manifests/kafka/prometheus_consumer_group_exporter.pp
index f7d6062f..2a8fdb55 100644
--- a/site-modules/profile/manifests/kafka/prometheus_consumer_group_exporter.pp
+++ b/site-modules/profile/manifests/kafka/prometheus_consumer_group_exporter.pp
@@ -1,55 +1,57 @@
 # Configure prometheus-kafka-consumer-group-exporter
 class profile::kafka::prometheus_consumer_group_exporter {
   $pkg = 'prometheus-kafka-consumer-group-exporter'
   $defaults_dir = "/etc/default/${pkg}"
 
   package {$pkg:
     ensure => 'installed',
   }
 
   file {$defaults_dir:
     ensure  => 'directory',
     purge   => true,
     recurse => true,
   }
 
   $kafka_clusters = lookup('kafka::clusters', Hash)
   $listen_network = lookup('prometheus::kafka_consumer_group::listen_network', Optional[String], 'first', undef)
   $listen_address = lookup('prometheus::kafka_consumer_group::listen_address', Optional[String], 'first', undef)
   $actual_listen_address = pick($listen_address, ip_for_network($listen_network))
   $base_port = lookup('prometheus::kafka_consumer_group::base_port', Integer)
 
   $kafka_clusters.keys.each |$index, $cluster| {
     $defaults_file = "${defaults_dir}/${cluster}"
     $service = "${pkg}@${cluster}"
-    $bootstrap_servers = $kafka_clusters[$cluster]["brokers"].keys.sort.join(',')
+    $bootstrap_servers = $kafka_clusters[$cluster]['brokers'].keys.sort.join(',')
     $port = $base_port + $index
+    $environment = $kafka_clusters[$cluster]['environment']
 
     file {$defaults_file:
       ensure  => present,
       owner   => 'root',
       group   => 'root',
       mode    => '0644',
       content => template('profile/kafka/prometheus-kafka-consumer-group-exporter.default.erb'),
       notify  => Service[$service],
     }
 
     service {$service:
-      ensure => 'running',
-      enable => true,
+      ensure  => 'running',
+      enable  => true,
       require => [
         File[$defaults_file],
         Package[$pkg],
       ],
     }
 
     $target = "${actual_listen_address}:${port}"
     profile::prometheus::export_scrape_config {"kafka-consumer-group-${cluster}":
       job    => 'kafka-consumer-group',
       target => $target,
       labels => {
-        cluster => $cluster,
+        cluster     => $cluster,
+        environment => $environment,
       }
     }
   }
 }