From aea7d77de5cd5f0bb7d24c22db362a6f71ab7374 Mon Sep 17 00:00:00 2001 From: tidwall Date: Wed, 30 Jun 2021 14:18:44 -0700 Subject: [PATCH] Fix Memory Leak in Kafka Producer This commit addresses an issue where the sarama kafka library leaks memory when a connection closes unless the metrics configuration that was passed to new connection is also closed. Fixes #613 --- internal/endpoint/kafka.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/endpoint/kafka.go b/internal/endpoint/kafka.go index d0a04f6a..d84e0f72 100644 --- a/internal/endpoint/kafka.go +++ b/internal/endpoint/kafka.go @@ -24,6 +24,7 @@ type KafkaConn struct { mu sync.Mutex ep Endpoint conn sarama.SyncProducer + cfg *sarama.Config ex bool t time.Time } @@ -47,6 +48,8 @@ func (conn *KafkaConn) close() { if conn.conn != nil { conn.conn.Close() conn.conn = nil + conn.cfg.MetricRegistry.UnregisterAll() + conn.cfg = nil } } @@ -72,6 +75,7 @@ func (conn *KafkaConn) Send(msg string) error { log.Debugf("building kafka tls config") tlsConfig, err := newKafkaTLSConfig(conn.ep.Kafka.CertFile, conn.ep.Kafka.KeyFile, conn.ep.Kafka.CACertFile) if err != nil { + cfg.MetricRegistry.UnregisterAll() return err } cfg.Net.TLS.Enable = true @@ -103,10 +107,12 @@ func (conn *KafkaConn) Send(msg string) error { c, err := sarama.NewSyncProducer([]string{uri}, cfg) if err != nil { + cfg.MetricRegistry.UnregisterAll() return err } conn.conn = c + conn.cfg = cfg } // parse json again to get out info for our kafka key