Index: daemon/log-daemon/BUILD.bazel =================================================================== --- daemon/log-daemon/BUILD.bazel +++ daemon/log-daemon/BUILD.bazel @@ -345,6 +345,10 @@ "com.databricks.daemon.log.LogDaemonConfSuite", {"tags": ["cpu:5"]}, ), + ( + "com.databricks.daemon.log.FeatureFlagMonitorSuite", + {"tags": ["cpu:4"]}, + ), ( "com.databricks.daemon.log.LogDaemonSuite", { @@ -1024,6 +1028,7 @@ "src/main/scala/com/databricks/daemon/log/conf/CheckManagerConf.scala", "src/main/scala/com/databricks/daemon/log/conf/DataVolumeCalculatorConf.scala", "src/main/scala/com/databricks/daemon/log/conf/EnvConf.scala", + "src/main/scala/com/databricks/daemon/log/conf/FeatureFlagMonitor.scala", "src/main/scala/com/databricks/daemon/log/conf/FileManagerConf.scala", "src/main/scala/com/databricks/daemon/log/conf/K8sConf.scala", "src/main/scala/com/databricks/daemon/log/conf/LogDaemonConfParser.scala", Index: daemon/log-daemon/src/main/scala/com/databricks/daemon/log/conf/FeatureFlagMonitor.scala =================================================================== --- /dev/null +++ daemon/log-daemon/src/main/scala/com/databricks/daemon/log/conf/FeatureFlagMonitor.scala @@ -0,0 +1,53 @@ +package com.databricks.daemon.log.conf + +import com.databricks.featureflag.client.DynamicConf +import com.databricks.threading.NamedTimer + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ + +trait Reconfigurable { + def reconfigure(): Unit +} + +trait FeatureFlagMonitor extends DynamicConf { + private[this] val changeTrackers = new ArrayBuffer[() => Boolean] + private[this] val reconfigurables = new ArrayBuffer[Reconfigurable] + private[this] val periodicThread = new NamedTimer("DynamicFlagChecker") + // TODO(hieu.le): Make delay configurable and jitter initial delay + periodicThread.scheduleWithFixedDelayWithInitialDelay(delay = 1.seconds, interval = 1.seconds) { + maybeReconfigure() + } + + def createMonitoredFlag[T: Manifest](flagName: String, defaultValue: T): FeatureFlag[T] = { + val flag = FeatureFlag(flagName, defaultValue) + + var lastValue = flag.getCurrentValue() + val hasChanged = () => { + val currentValue = flag.getCurrentValue() + val changed = currentValue != lastValue + if (changed) { + logger.info(s"FeatureFlag ${flag.flagName} has changed") + lastValue = currentValue + } + changed + } + + this.synchronized { + changeTrackers.append(hasChanged) + } + + flag + } + + def registerReconfigurable(obj: Reconfigurable): Unit = this.synchronized { + reconfigurables.append(obj) + } + + private[this] def maybeReconfigure(): Unit = this.synchronized { + val needReconfig: Boolean = changeTrackers.exists(_()) + if (needReconfig) { + reconfigurables.foreach(_.reconfigure()) + } + } +} Index: daemon/log-daemon/src/test/scala/com/databricks/daemon/log/conf/FeatureFlagMonitorSuite.scala =================================================================== --- /dev/null +++ daemon/log-daemon/src/test/scala/com/databricks/daemon/log/conf/FeatureFlagMonitorSuite.scala @@ -0,0 +1,40 @@ +package com.databricks.daemon.log + +import com.databricks.backend.common.util.Project +import com.databricks.conf.trusted.ProjectConf +import com.databricks.daemon.log.conf.{FeatureFlagMonitor, Reconfigurable} +import com.databricks.featureflag.client.FeatureFlagDefinition +import com.databricks.featureflag.client.experimentation.MockFeatureFlagReaderProvider +import com.databricks.testing.DatabricksTest +import com.typesafe.config.ConfigFactory + +import java.util.concurrent.CountDownLatch + +case class DummyReconfigurable(var reconfigCount: Int = 0) extends Reconfigurable { + var reconfigLatch = new CountDownLatch(1) + override def reconfigure(): Unit = { + reconfigCount += 1 + reconfigLatch.countDown() + } +} + +class TestConfig + extends ProjectConf(Project.TestProject, ConfigFactory.parseString("")) + with FeatureFlagMonitor + with MockFeatureFlagReaderProvider { + @FeatureFlagDefinition(team = "eng-observability-team", description = "Test Flag") + private[log] val testFlag = + createMonitoredFlag("databricks.daemon.log.testFlag", defaultValue = 0) +} + +class FeatureFlagMonitorSuite extends DatabricksTest { + test("FeatureFlagMonitorSuite - reconfigure upon flag change") { + val conf = new TestConfig + val reconfigurable = DummyReconfigurable() + conf.registerReconfigurable(reconfigurable) + + conf.mockFeatureFlagReader.setMockValue(conf.testFlag.flagName, value = 1) + reconfigurable.reconfigLatch.await() + reconfigurable.reconfigCount should be(1) + } +}