Index: daemon/log-daemon/BUILD.bazel =================================================================== --- daemon/log-daemon/BUILD.bazel +++ daemon/log-daemon/BUILD.bazel @@ -345,6 +345,10 @@ "com.databricks.daemon.log.LogDaemonConfSuite", {"tags": ["cpu:5"]}, ), + ( + "com.databricks.daemon.log.FeatureFlagMonitorSuite", + {"tags": ["cpu:4"]}, + ), ( "com.databricks.daemon.log.LogDaemonSuite", { @@ -1024,6 +1028,7 @@ "src/main/scala/com/databricks/daemon/log/conf/CheckManagerConf.scala", "src/main/scala/com/databricks/daemon/log/conf/DataVolumeCalculatorConf.scala", "src/main/scala/com/databricks/daemon/log/conf/EnvConf.scala", + "src/main/scala/com/databricks/daemon/log/conf/FeatureFlagMonitor.scala", "src/main/scala/com/databricks/daemon/log/conf/FileManagerConf.scala", "src/main/scala/com/databricks/daemon/log/conf/K8sConf.scala", "src/main/scala/com/databricks/daemon/log/conf/LogDaemonConfParser.scala", Index: daemon/log-daemon/src/main/scala/com/databricks/daemon/log/conf/FeatureFlagMonitor.scala =================================================================== --- /dev/null +++ daemon/log-daemon/src/main/scala/com/databricks/daemon/log/conf/FeatureFlagMonitor.scala @@ -0,0 +1,53 @@ +package com.databricks.daemon.log.conf + +import com.databricks.featureflag.client.DynamicConf +import com.databricks.threading.NamedTimer + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ + +trait Reconfigurable { + def reconfigure(): Unit +} + +trait FeatureFlagMonitor extends DynamicConf { + private[this] val changeTrackers = new ArrayBuffer[() => Boolean] + private[this] val reconfigurables = new ArrayBuffer[Reconfigurable] + private[this] val periodicThread = new NamedTimer("DynamicFlagChecker") + // TODO(hieu.le): Make delay configurable and jitter initial delay + periodicThread.scheduleWithFixedDelayWithInitialDelay(delay = 20.seconds, interval = 20.seconds) { + maybeReconfigure() + } + + def createMonitoredFlag[T: Manifest](flagName: String, defaultValue: T): FeatureFlag[T] = { + val flag = FeatureFlag(flagName, defaultValue) + + var lastValue = flag.getCurrentValue() + val hasChanged = () => { + val currentValue = flag.getCurrentValue() + val changed = currentValue != lastValue + if (changed) { + logger.info(s"FeatureFlag ${flag.flagName} has changed") + lastValue = currentValue + } + changed + } + + this.synchronized { + changeTrackers.append(hasChanged) + } + + flag + } + + def registerReconfigurable(obj: Reconfigurable): Unit = this.synchronized { + reconfigurables.append(obj) + } + + private[this] def maybeReconfigure(): Unit = this.synchronized { + val needReconfig: Boolean = changeTrackers.exists(_()) + if (needReconfig) { + reconfigurables.foreach(_.reconfigure()) + } + } +} Index: daemon/log-daemon/src/test/scala/com/databricks/daemon/log/conf/FeatureFlagMonitorSuite.scala =================================================================== --- /dev/null +++ daemon/log-daemon/src/test/scala/com/databricks/daemon/log/conf/FeatureFlagMonitorSuite.scala @@ -0,0 +1,25 @@ +package com.databricks.daemon.log + +import com.databricks.backend.common.util.Project +import com.databricks.conf.trusted.ProjectConf +import com.databricks.daemon.log.conf.{FeatureFlagMonitor, Reconfigurable} +import com.databricks.testing.DatabricksTest +import com.typesafe.config.ConfigFactory + +case class DummyReconfigurable(var reconfigCount: Int = 0) extends Reconfigurable { + override def reconfigure(): Unit = { + reconfigCount += 1 + } +} + +class TestFeatureFlagMonitor + extends ProjectConf(Project.TestProject, ConfigFactory.parseString("")) + with FeatureFlagMonitor {} + +class FeatureFlagMonitorSuite extends DatabricksTest { + test("FeatureFlagMonitorSuite - reconfigure upon flag change") { + val monitor = new TestFeatureFlagMonitor + val reconfigurable = DummyReconfigurable() + monitor.registerReconfigurable(reconfigurable) + } +}