Index: daemon/log-daemon/BUILD.bazel
===================================================================
--- daemon/log-daemon/BUILD.bazel
+++ daemon/log-daemon/BUILD.bazel
@@ -345,6 +345,10 @@
         "com.databricks.daemon.log.LogDaemonConfSuite",
         {"tags": ["cpu:5"]},
     ),
+    (
+        "com.databricks.daemon.log.FeatureFlagMonitorSuite",
+        {"tags": ["cpu:4"]},
+    ),
     (
         "com.databricks.daemon.log.LogDaemonSuite",
         {
@@ -1024,6 +1028,7 @@
         "src/main/scala/com/databricks/daemon/log/conf/CheckManagerConf.scala",
         "src/main/scala/com/databricks/daemon/log/conf/DataVolumeCalculatorConf.scala",
         "src/main/scala/com/databricks/daemon/log/conf/EnvConf.scala",
+        "src/main/scala/com/databricks/daemon/log/conf/FeatureFlagMonitor.scala",
         "src/main/scala/com/databricks/daemon/log/conf/FileManagerConf.scala",
         "src/main/scala/com/databricks/daemon/log/conf/K8sConf.scala",
         "src/main/scala/com/databricks/daemon/log/conf/LogDaemonConfParser.scala",
Index: daemon/log-daemon/src/main/scala/com/databricks/daemon/log/conf/FeatureFlagMonitor.scala
===================================================================
--- /dev/null
+++ daemon/log-daemon/src/main/scala/com/databricks/daemon/log/conf/FeatureFlagMonitor.scala
@@ -0,0 +1,121 @@
+package com.databricks.daemon.log.conf
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import com.databricks.featureflag.client.{DynamicConf, FeatureFlagDefinition}
+import com.databricks.threading.NamedTimer
+
+/** A component that can re-apply its configuration when asked to by a [[FeatureFlagMonitor]]. */
+trait Reconfigurable {
+  def reconfigure(): Unit
+}
+
+/**
+ * Mixin that polls the flags created through `createMonitoredFlag` on a timer and, when at least
+ * one of them changed and reconfiguration is enabled, calls `reconfigure()` on every registered
+ * [[Reconfigurable]].
+ */
+trait FeatureFlagMonitor extends DynamicConf {
+  // Both buffers are only read and written while holding this object's monitor.
+  private[this] val changeTrackers = new ArrayBuffer[() => Boolean]
+  private[this] val reconfigurables = new ArrayBuffer[Reconfigurable]
+
+  /**
+   * Creates a FeatureFlag and registers a tracker that reports whether its value differs from the
+   * value seen by the previous check.
+   *
+   * @param flagName name of the flag to create
+   * @param defaultValue default value passed to the flag
+   * @return the created flag
+   */
+  def createMonitoredFlag[T: Manifest](flagName: String, defaultValue: T): FeatureFlag[T] = {
+    val flag = FeatureFlag(flagName, defaultValue)
+
+    var lastValue = flag.getCurrentValue()
+    val hasChanged = () => {
+      val currentValue = flag.getCurrentValue()
+      val changed = currentValue != lastValue
+      if (changed) {
+        logger.info(s"FeatureFlag ${flag.flagName} has changed")
+        lastValue = currentValue
+      }
+      changed
+    }
+
+    this.synchronized {
+      changeTrackers.append(hasChanged)
+    }
+
+    flag
+  }
+
+  /** Registers an object to be reconfigured whenever a monitored flag changes. */
+  def registerReconfigurable(obj: Reconfigurable): Unit = this.synchronized {
+    reconfigurables.append(obj)
+  }
+
+  @FeatureFlagDefinition(
+    team = "eng-observability-team",
+    description =
+      "SAFE flag to determine the interval (in seconds) between successive checks for flag changes"
+  )
+  private[log] val flagCheckIntervalSec =
+    FeatureFlag("databricks.daemon.log.featureFlagMonitorInterval", 1)
+
+  @FeatureFlagDefinition(
+    team = "eng-observability-team",
+    description = "Set this SAFE flag to True to enable reconfigurations upon FeatureFlag updates"
+  )
+  private[log] val enableReconfig =
+    FeatureFlag("databricks.daemon.log.featureFlagMonitorEnableReconfig", false)
+
+  private val periodicThread = new NamedTimer(this.getClass.getName)
+  periodicThread.runOnceAfterDelay(computeMonitorDelay()) { runOnce() }
+
+  /** Runs one check and always schedules the next one, even when the check itself fails. */
+  private def runOnce(): Unit = {
+    try {
+      maybeReconfigure()
+    } catch {
+      case NonFatal(ex) =>
+        logger.error("Could not check monitored FeatureFlags for changes", ex)
+    } finally {
+      periodicThread.runOnceAfterDelay(computeMonitorDelay()) { runOnce() }
+    }
+  }
+
+  /**
+   * Polls every change tracker and, if any flag changed while reconfiguration is enabled,
+   * reconfigures all registered objects. A failure in one object is logged and does not prevent
+   * the remaining objects from being reconfigured.
+   */
+  private[log] def maybeReconfigure(): Unit = this.synchronized {
+    // Every tracker must be polled: a tracker only records the new value when it is invoked, so
+    // short-circuiting on the first change would leave the others stale and cause a second,
+    // spurious reconfiguration on the next check.
+    val needReconfig = changeTrackers.count(tracker => tracker()) > 0
+    if (needReconfig && enableReconfig.getCurrentValue()) {
+      reconfigurables.foreach { obj =>
+        try {
+          obj.reconfigure()
+        } catch {
+          case NonFatal(ex) =>
+            logger.error(s"Could not reconfigure object of type ${obj.getClass.getName}", ex)
+        }
+      }
+    }
+  }
+
+  /** Returns the delay until the next check: the configured interval plus random jitter. */
+  private def computeMonitorDelay(): FiniteDuration = {
+    // Random.nextInt rejects non-positive bounds; clamp so a zero or negative flag value cannot
+    // throw here and silently stop the monitor.
+    val delay = math.max(flagCheckIntervalSec.getCurrentValue(), 1)
+    // Inject some jitter to prevent a large number of instantaneous reconfigurations across
+    // distributed subscribers to the same flag updates
+    (delay.toLong + Random.nextInt(delay)).seconds
+  }
+}
Index: 
daemon/log-daemon/src/test/scala/com/databricks/daemon/log/conf/FeatureFlagMonitorSuite.scala
===================================================================
--- /dev/null
+++ daemon/log-daemon/src/test/scala/com/databricks/daemon/log/conf/FeatureFlagMonitorSuite.scala
@@ -0,0 +1,109 @@
+package com.databricks.daemon.log
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.databricks.backend.common.util.Project
+import com.databricks.conf.trusted.ProjectConf
+import com.databricks.daemon.log.conf.{FeatureFlagMonitor, Reconfigurable}
+import com.databricks.featureflag.client.FeatureFlagDefinition
+import com.databricks.featureflag.client.experimentation.MockFeatureFlagReaderProvider
+import com.databricks.testing.DatabricksTest
+import com.typesafe.config.ConfigFactory
+import org.mockito.Mockito.when
+
+/** Reconfigurable that only counts how many times it has been reconfigured. */
+case class DummyReconfigurable(var reconfigCount: Int = 0) extends Reconfigurable {
+  override def reconfigure(): Unit = {
+    reconfigCount += 1
+  }
+}
+
+/** Test configuration exposing a single monitored flag backed by the mock flag reader. */
+class TestConfig
+    extends ProjectConf(Project.TestProject, ConfigFactory.parseString(""))
+    with MockFeatureFlagReaderProvider
+    with FeatureFlagMonitor {
+  @FeatureFlagDefinition(team = "eng-observability-team", description = "Test Flag")
+  private[log] val testFlag =
+    createMonitoredFlag("databricks.daemon.log.testFlag", defaultValue = 0)
+
+  /** Changes the monitored flag's value and immediately runs one monitor check. */
+  def updateFlagAndReconfigure(): Unit = {
+    mockFeatureFlagReader.setMockValue(testFlag.flagName, testFlag.getCurrentValue() + 1)
+    maybeReconfigure()
+  }
+}
+
+class FeatureFlagMonitorSuite extends DatabricksTest {
+  test("FeatureFlagMonitorSuite - reconfigure upon flag change") {
+    val conf = new TestConfig
+    conf.mockFeatureFlagReader.setMockValue(conf.enableReconfig.flagName, value = true)
+
+    val reconfigurable = DummyReconfigurable()
+    conf.registerReconfigurable(reconfigurable)
+
+    conf.maybeReconfigure()
+    reconfigurable.reconfigCount should be(0)
+
+    conf.updateFlagAndReconfigure()
+    reconfigurable.reconfigCount should be(1)
+
+    conf.updateFlagAndReconfigure()
+    reconfigurable.reconfigCount should be(2)
+  }
+
+  test("FeatureFlagMonitorSuite - should not reconfigure if disabled") {
+    val conf = new TestConfig
+    conf.mockFeatureFlagReader.setMockValue(conf.enableReconfig.flagName, value = false)
+
+    val reconfigurable = DummyReconfigurable()
+    conf.registerReconfigurable(reconfigurable)
+
+    conf.updateFlagAndReconfigure()
+    reconfigurable.reconfigCount should be(0)
+  }
+
+  test("FeatureFlagMonitorSuite - gracefully handle reconfig failures") {
+    val conf = new TestConfig
+    conf.mockFeatureFlagReader.setMockValue(conf.enableReconfig.flagName, value = true)
+
+    // Register the faulty object first so the test proves that its failure does not prevent
+    // objects registered after it from being reconfigured.
+    val faultyReconfigurable = basicMock[DummyReconfigurable]
+    when(faultyReconfigurable.reconfigure()).thenThrow(new RuntimeException)
+    conf.registerReconfigurable(faultyReconfigurable)
+
+    val healthyReconfigurable = DummyReconfigurable()
+    conf.registerReconfigurable(healthyReconfigurable)
+
+    conf.updateFlagAndReconfigure()
+    healthyReconfigurable.reconfigCount should be(1)
+  }
+
+  test("FeatureFlagMonitorSuite - periodically check flag updates") {
+    val conf = new TestConfig
+    conf.mockFeatureFlagReader.setMockValue(conf.enableReconfig.flagName, value = true)
+    // Reduce the check interval for unit tests
+    conf.mockFeatureFlagReader.setMockValue(conf.flagCheckIntervalSec.flagName, value = 1)
+
+    val reconfigLatch = new CountDownLatch(1)
+    val reconfigCount = new AtomicInteger(0)
+    // reconfigure() is invoked from the monitor's timer callback (presumably on another thread),
+    // so the count is atomic and is bumped before the latch is released; otherwise the test
+    // thread could wake up and still read 0.
+    val reconfigurable = new Reconfigurable {
+      override def reconfigure(): Unit = {
+        reconfigCount.incrementAndGet()
+        reconfigLatch.countDown()
+      }
+    }
+    conf.registerReconfigurable(reconfigurable)
+
+    conf.mockFeatureFlagReader
+      .setMockValue(conf.testFlag.flagName, conf.testFlag.getCurrentValue() + 1)
+    // Bound the wait so a monitor that never fires fails the test instead of hanging it.
+    reconfigLatch.await(30, TimeUnit.SECONDS) should be(true)
+
+    reconfigCount.get() should be(1)
+  }
+}