Index: daemon/log-daemon/BUILD.bazel
===================================================================
--- daemon/log-daemon/BUILD.bazel
+++ daemon/log-daemon/BUILD.bazel
@@ -345,6 +345,10 @@
         "com.databricks.daemon.log.LogDaemonConfSuite",
         {"tags": ["cpu:5"]},
     ),
+    (
+        "com.databricks.daemon.log.FeatureFlagMonitorSuite",
+        {"tags": ["cpu:4"]},
+    ),
     (
         "com.databricks.daemon.log.LogDaemonSuite",
         {
@@ -1024,6 +1028,7 @@
         "src/main/scala/com/databricks/daemon/log/conf/CheckManagerConf.scala",
         "src/main/scala/com/databricks/daemon/log/conf/DataVolumeCalculatorConf.scala",
         "src/main/scala/com/databricks/daemon/log/conf/EnvConf.scala",
+        "src/main/scala/com/databricks/daemon/log/conf/FeatureFlagMonitor.scala",
         "src/main/scala/com/databricks/daemon/log/conf/FileManagerConf.scala",
         "src/main/scala/com/databricks/daemon/log/conf/K8sConf.scala",
         "src/main/scala/com/databricks/daemon/log/conf/LogDaemonConfParser.scala",
@@ -1049,6 +1054,7 @@
     deps = [
         "{parent}/com.amazonaws/aws-java-sdk-core",
         "{parent}/com.typesafe/config",
+        "//maven/com.google.guava/guava",
     ],
 )
 
Index: daemon/log-daemon/src/main/scala/com/databricks/daemon/log/conf/FeatureFlagMonitor.scala
===================================================================
--- /dev/null
+++ daemon/log-daemon/src/main/scala/com/databricks/daemon/log/conf/FeatureFlagMonitor.scala
@@ -0,0 +1,139 @@
+package com.databricks.daemon.log.conf
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import com.google.common.annotations.VisibleForTesting
+
+import com.databricks.featureflag.client.{DynamicConf, FeatureFlagDefinition}
+import com.databricks.threading.NamedTimer
+
+/** Interface for objects that could reconfigure themselves during runtime */
+trait Reconfigurable {
+  def reconfigure(): Unit
+}
+
+/**
+ * This trait periodically tracks changes to SAFE flags and notifies the registered observers to reconfigure themselves
+ * in accordance with these flag changes
+ * Thread-safe
+ */
+trait FeatureFlagMonitor extends DynamicConf {
+
+  /** Collection of closures, one for each monitored SAFE flag, that returns True if the flag has been updated */
+  private[this] val changeTrackers = new ArrayBuffer[() => Boolean]
+  private[this] val reconfigurables = new ArrayBuffer[Reconfigurable]
+
+  /**
+   * Creates a SAFE flag that could be periodically monitored for changes
+   * @param flagName Name of SAFE flag to create
+   * @param defaultValue Default value of SAFE flag
+   * @tparam T Type of SAFE flag
+   * @return A FeatureFlag object
+   */
+  protected def createMonitoredFlag[T: Manifest](
+      flagName: String,
+      defaultValue: T): FeatureFlag[T] = {
+    val flag = FeatureFlag(flagName, defaultValue)
+
+    // Only the tracker below reads or updates this value, and maybeReconfigure() runs the trackers while holding
+    // this object's lock
+    var lastValue = flag.getCurrentValue()
+    val hasChanged = () => {
+      val currentValue = flag.getCurrentValue()
+      val changed = currentValue != lastValue
+      if (changed) {
+        logger.info(s"FeatureFlag ${flag.flagName} has changed")
+        lastValue = currentValue
+      }
+      changed
+    }
+
+    this.synchronized {
+      changeTrackers.append(hasChanged)
+    }
+
+    flag
+  }
+
+  /**
+   * Registers an object to be notified for reconfigurations when any of the monitored SAFE flags changes
+   * @param obj A reconfigurable object
+   */
+  def registerReconfigurable(obj: Reconfigurable): Unit = this.synchronized {
+    reconfigurables.append(obj)
+  }
+
+  @FeatureFlagDefinition(
+    team = "eng-observability-team",
+    description = "Set this SAFE flag to True to enable reconfigurations upon FeatureFlag updates"
+  )
+  @VisibleForTesting
+  private[log] val enableReconfig =
+    FeatureFlag("databricks.daemon.log.featureFlagMonitorEnableReconfig", false)
+
+  @FeatureFlagDefinition(
+    team = "eng-observability-team",
+    description =
+      "SAFE flag to determine the interval (in seconds and subject to some jitter) between successive checks for flag changes"
+  )
+  @VisibleForTesting
+  private[log] val flagCheckIntervalSec =
+    FeatureFlag("databricks.daemon.log.featureFlagMonitorInterval", 1)
+
+  private val periodicThread = new NamedTimer(this.getClass.getName)
+  scheduleNextRun()
+
+  /** Schedules the next round of flag checks after a jittered delay */
+  private def scheduleNextRun(): Unit = {
+    periodicThread.runOnceAfterDelay(computeMonitorDelay()) { runOnce() }
+  }
+
+  /**
+   * Executes the reconfiguration logic as well as schedules the next round of reconfigurations. A non-fatal failure
+   * in one round is logged and does not stop subsequent rounds from being scheduled.
+   */
+  private def runOnce(): Unit = {
+    try {
+      maybeReconfigure()
+    } catch {
+      case NonFatal(ex) =>
+        logger.error("Could not check monitored FeatureFlags for changes", ex)
+    }
+    scheduleNextRun()
+  }
+
+  /**
+   * Checks monitored SAFE flags for any recent changes and notifies observers to reconfigure themselves if any changes
+   * are detected
+   */
+  @VisibleForTesting
+  private[log] def maybeReconfigure(): Unit = this.synchronized {
+    // Run every tracker instead of stopping at the first change: each tracker records the value it just saw, so a
+    // tracker that was skipped would report the same change again in the next round
+    val needReconfig: Boolean = changeTrackers.count(hasChanged => hasChanged()) > 0
+    if (needReconfig && enableReconfig.getCurrentValue()) {
+      reconfigurables.foreach { obj =>
+        try {
+          obj.reconfigure()
+        } catch {
+          case NonFatal(ex) =>
+            logger.error(s"Could not reconfigure object of type ${obj.getClass.getName}", ex)
+        }
+      }
+    }
+  }
+
+  /**
+   * @return Time delay before the next round of reconfigurations
+   */
+  private def computeMonitorDelay(): FiniteDuration = {
+    // Random.nextInt needs a positive bound, so treat a zero or negative flag value as one second
+    val delay = math.max(flagCheckIntervalSec.getCurrentValue(), 1)
+    // Inject some jitter to prevent a large number of instantaneous reconfigurations across distributed subscribers to
+    // the same flag updates. The sum is computed as a Long so that a very large interval cannot overflow.
+    (delay.toLong + Random.nextInt(delay)).seconds
+  }
+}
Index: daemon/log-daemon/src/test/scala/com/databricks/daemon/log/conf/FeatureFlagMonitorSuite.scala
===================================================================
--- /dev/null
+++ daemon/log-daemon/src/test/scala/com/databricks/daemon/log/conf/FeatureFlagMonitorSuite.scala
@@ -0,0 +1,101 @@
+package com.databricks.daemon.log
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.databricks.backend.common.util.Project
+import com.databricks.conf.trusted.ProjectConf
+import com.databricks.daemon.log.conf.{FeatureFlagMonitor, Reconfigurable}
+import com.databricks.featureflag.client.FeatureFlagDefinition
+import com.databricks.featureflag.client.experimentation.MockFeatureFlagReaderProvider
+import com.databricks.testing.DatabricksTest
+import com.typesafe.config.ConfigFactory
+import org.mockito.Mockito.when
+
+/** Simple reconfigurable object that counts how many times it is asked to reconfigure */
+case class DummyReconfigurable(var reconfigCount: Int = 0) extends Reconfigurable {
+  override def reconfigure(): Unit = {
+    reconfigCount += 1
+  }
+}
+
+class TestConfig
+    extends ProjectConf(Project.TestProject, ConfigFactory.parseString(""))
+    with MockFeatureFlagReaderProvider
+    with FeatureFlagMonitor {
+  @FeatureFlagDefinition(team = "eng-observability-team", description = "Test Flag")
+  private[log] val testFlag =
+    createMonitoredFlag("databricks.daemon.log.testFlag", defaultValue = 0)
+
+  /** Updates the monitored SAFE flag and triggers a round of reconfigurations */
+  def updateFlagAndReconfigure(): Unit = {
+    mockFeatureFlagReader.setMockValue(testFlag.flagName, testFlag.getCurrentValue() + 1)
+    maybeReconfigure()
+  }
+}
+
+class FeatureFlagMonitorSuite extends DatabricksTest {
+  private[this] val conf = new TestConfig
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    conf.mockFeatureFlagReader.setMockValue(conf.enableReconfig.flagName, value = true)
+  }
+
+  test("FeatureFlagMonitorSuite - reconfigure upon flag change") {
+    val reconfigurable = DummyReconfigurable()
+    conf.registerReconfigurable(reconfigurable)
+
+    conf.maybeReconfigure()
+    reconfigurable.reconfigCount should be(0)
+
+    conf.updateFlagAndReconfigure()
+    reconfigurable.reconfigCount should be(1)
+
+    conf.updateFlagAndReconfigure()
+    reconfigurable.reconfigCount should be(2)
+  }
+
+  test("FeatureFlagMonitorSuite - should not reconfigure if disabled") {
+    conf.mockFeatureFlagReader.setMockValue(conf.enableReconfig.flagName, value = false)
+    val reconfigurable = DummyReconfigurable()
+    conf.registerReconfigurable(reconfigurable)
+
+    conf.updateFlagAndReconfigure()
+    reconfigurable.reconfigCount should be(0)
+  }
+
+  test("FeatureFlagMonitorSuite - gracefully handle reconfig failures") {
+    val healthyReconfigurable = DummyReconfigurable()
+    conf.registerReconfigurable(healthyReconfigurable)
+
+    val faultyReconfigurable = basicMock[DummyReconfigurable]
+    when(faultyReconfigurable.reconfigure()).thenThrow(new RuntimeException)
+    conf.registerReconfigurable(faultyReconfigurable)
+
+    conf.updateFlagAndReconfigure()
+    healthyReconfigurable.reconfigCount should be(1)
+  }
+
+  test("FeatureFlagMonitorSuite - periodically check flag updates") {
+    // Reduce the check interval for unit tests
+    conf.mockFeatureFlagReader.setMockValue(conf.flagCheckIntervalSec.flagName, value = 1)
+
+    val reconfigurable = basicMock[Reconfigurable]
+    val reconfigLatch = new CountDownLatch(1)
+    val reconfigCount = new AtomicInteger(0)
+    when(reconfigurable.reconfigure()).thenAnswer(_ -> {
+      // Count the call before releasing the latch so the waiting test thread always sees it
+      reconfigCount.incrementAndGet()
+      reconfigLatch.countDown()
+    })
+    conf.registerReconfigurable(reconfigurable)
+
+    conf.mockFeatureFlagReader
+      .setMockValue(conf.testFlag.flagName, conf.testFlag.getCurrentValue() + 1)
+    // Bound the wait so that a monitor that never fires fails the test instead of hanging it
+    reconfigLatch.await(30, TimeUnit.SECONDS) should be(true)
+
+    reconfigCount.get() should be(1)
+  }
+}