diff --git a/fdbserver/worker/MetricLogger.actor.cpp b/fdbserver/worker/MetricLogger.cpp similarity index 79% rename from fdbserver/worker/MetricLogger.actor.cpp rename to fdbserver/worker/MetricLogger.cpp index 0f2921a5029..c73df74c46a 100644 --- a/fdbserver/worker/MetricLogger.actor.cpp +++ b/fdbserver/worker/MetricLogger.cpp @@ -1,5 +1,5 @@ /* - * MetricLogger.actor.cpp + * MetricLogger.cpp * * This source file is part of the FoundationDB open source project * @@ -36,13 +36,12 @@ #include "fdbclient/DatabaseContext.h" #include "fdbclient/ReadYourWrites.h" #include "fdbclient/KeyBackedTypes.h" -#include "MetricLogger.actor.h" +#include "MetricLogger.h" #include "MetricClient.h" #include "flow/flow.h" #include "flow/network.h" #include "flow/IUDPSocket.h" #include "flow/IConnection.h" -#include "flow/actorcompiler.h" // This must be the last #include. namespace { struct MetricsRule { @@ -154,15 +153,15 @@ struct MetricsConfig { Efficiently select matching metrics and set config Go back to wait for rule change */ -ACTOR Future metricRuleUpdater(Database cx, MetricsConfig* config, TDMetricCollection* collection) { +Future metricRuleUpdater(Database cx, MetricsConfig* config, TDMetricCollection* collection) { + Reference tr(new ReadYourWritesTransaction(cx)); - state Reference tr(new ReadYourWritesTransaction(cx)); - - loop { - state Future newMetric = collection->metricAdded.onTrigger(); + while (true) { + Future newMetric = collection->metricAdded.onTrigger(); + Error err; try { tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); - MetricsConfig::RuleMapT::RangeResultType rules = wait(config->ruleMap.getRange(tr, 0, {}, 1e6)); + MetricsConfig::RuleMapT::RangeResultType rules = co_await config->ruleMap.getRange(tr, 0, {}, 1e6); for (auto& it : collection->metricMap) { it.value->setConfig(false); @@ -172,14 +171,15 @@ ACTOR Future metricRuleUpdater(Database cx, MetricsConfig* config, TDMetri } config->rules = std::move(rules); - state Future rulesChanged = tr->watch(config->ruleChangeKey); - wait(tr->commit()); - wait(rulesChanged || newMetric); + Future rulesChanged = tr->watch(config->ruleChangeKey); + co_await tr->commit(); + co_await (rulesChanged || newMetric); tr->reset(); - + continue; } catch (Error& e) { - wait(tr->onError(e)); + err = e; } + co_await tr->onError(err); } } @@ -190,12 +190,12 @@ class MetricDB : public IMetricDB { ~MetricDB() override {} // levelKey is the prefix for the entire level, no timestamp at the end - ACTOR static Future>> getLastBlock_impl(ReadYourWritesTransaction* tr, - Standalone levelKey) { - RangeResult results = wait(tr->getRange(normalKeys.withPrefix(levelKey), 1, Snapshot::True, Reverse::True)); + static Future>> getLastBlock_impl(ReadYourWritesTransaction* tr, + Standalone levelKey) { + RangeResult results = co_await tr->getRange(normalKeys.withPrefix(levelKey), 1, Snapshot::True, Reverse::True); if (results.size() == 1) - return results[0].value; - return Optional>(); + co_return results[0].value; + co_return Optional>(); } Future>> getLastBlock(Standalone key) override { @@ -205,14 +205,14 @@ class MetricDB : public IMetricDB { ReadYourWritesTransaction* tr; }; -ACTOR Future dumpMetrics(Database cx, MetricsConfig* config, TDMetricCollection* collection) { - state MetricBatch batch; - state Standalone mk; +Future dumpMetrics(Database cx, MetricsConfig* config, TDMetricCollection* collection) { + MetricBatch batch; + Standalone mk; ASSERT(collection != nullptr); mk.prefix = StringRef(mk.arena(), config->space.key()); mk.address = StringRef(mk.arena(), collection->address); - loop { + while (true) { batch.clear(); uint64_t rollTime = std::numeric_limits::max(); if (collection->rollTimes.size()) { @@ -221,7 +221,7 @@ ACTOR Future dumpMetrics(Database cx, MetricsConfig* config, TDMetricColle } // Are any metrics enabled? - state bool enabled = false; + bool enabled = false; // Flush data for each metric, track if any are enabled. for (auto& it : collection->metricMap) { @@ -237,21 +237,21 @@ ACTOR Future dumpMetrics(Database cx, MetricsConfig* config, TDMetricColle collection->currentTimeBytes = 0; } - state ReadYourWritesTransaction cbtr(cx); - state MetricDB mdb(&cbtr); + ReadYourWritesTransaction cbtr(cx); + MetricDB mdb(&cbtr); - state std::map> results; + std::map> results; // Call all of the callbacks, map each index to its resulting future for (int i = 0, iend = batch.scope.callbacks.size(); i < iend; ++i) results[i] = batch.scope.callbacks[i](&mdb, &batch.scope); - loop { - state std::map>::iterator cb = results.begin(); + while (true) { + auto cb = results.begin(); // Wait for each future, return the ones that succeeded - state Error lastError; + Error lastError; while (cb != results.end()) { try { - wait(cb->second); + co_await cb->second; cb = results.erase(cb); } catch (Error& e) { ++cb; @@ -264,14 +264,14 @@ ACTOR Future dumpMetrics(Database cx, MetricsConfig* config, TDMetricColle break; // Otherwise, wait to retry - wait(cbtr.onError(lastError)); + co_await cbtr.onError(lastError); for (auto& cb : results) cb.second = batch.scope.callbacks[cb.first](&mdb, &batch.scope); } // If there are more rolltimes then next dump is now, otherwise if no metrics are enabled then it is // whenever the next metric is enabled but if there are metrics enabled then it is in 1 second. - state Future nextDump; + Future nextDump; if (collection->rollTimes.size() > 0) nextDump = Void(); else { @@ -280,9 +280,10 @@ ACTOR Future dumpMetrics(Database cx, MetricsConfig* config, TDMetricColle nextDump = nextDump || delay(1.0); } - state Transaction tr(cx); - loop { + Transaction tr(cx); + while (true) { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + Error err; try { for (auto& i : batch.scope.inserts) { // fprintf(stderr, "%s: dump insert: %s\n", collection->address.toString().c_str(), @@ -302,30 +303,31 @@ ACTOR Future dumpMetrics(Database cx, MetricsConfig* config, TDMetricColle tr.set(u.first, u.second); } - wait(tr.commit()); + co_await tr.commit(); break; } catch (Error& e) { - wait(tr.onError(e)); + err = e; } + co_await tr.onError(err); } - wait(nextDump); + co_await nextDump; } } // Push metric field registrations to database. -ACTOR Future updateMetricRegistration(Database cx, MetricsConfig* config, TDMetricCollection* collection) { - state Standalone mk; +Future updateMetricRegistration(Database cx, MetricsConfig* config, TDMetricCollection* collection) { + Standalone mk; mk.prefix = StringRef(mk.arena(), config->space.key()); mk.address = StringRef(mk.arena(), collection->address); - state bool addressRegistered = false; + bool addressRegistered = false; - loop { - state Future registrationChange = collection->metricRegistrationChanged.onTrigger(); - state Future newMetric = collection->metricAdded.onTrigger(); - state std::vector> keys; - state bool fieldsChanged = false; - state bool enumsChanged = false; + while (true) { + Future registrationChange = collection->metricRegistrationChanged.onTrigger(); + Future newMetric = collection->metricAdded.onTrigger(); + std::vector> keys; + bool fieldsChanged = false; + bool enumsChanged = false; // Register each metric that isn't already registered for (auto& it : collection->metricMap) { @@ -357,9 +359,10 @@ ACTOR Future updateMetricRegistration(Database cx, MetricsConfig* config, keys.push_back(config->fieldChangeKey); // Write keys collected to database - state Transaction tr(cx); - loop { + Transaction tr(cx); + while (true) { tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS); + Error err; try { Value timestamp = BinaryWriter::toValue(CompressedInt(now()), AssumeVersion(g_network->protocolVersion())); @@ -369,45 +372,46 @@ ACTOR Future updateMetricRegistration(Database cx, MetricsConfig* config, tr.set(key, timestamp); } - wait(tr.commit()); + co_await tr.commit(); break; } catch (Error& e) { - wait(tr.onError(e)); + err = e; } + co_await tr.onError(err); } // Wait for a metric to require registration or a new metric to be added - wait(registrationChange || newMetric); + co_await (registrationChange || newMetric); } } } // namespace -ACTOR Future runMetrics(Future fcx, Key prefix) { +Future runMetrics(Future fcx, Key prefix) { // Never log to an empty prefix, it's pretty much always a bad idea. if (prefix.size() == 0) { TraceEvent(SevWarnAlways, "TDMetricsRefusingEmptyPrefix").log(); - return Void(); + co_return; } // Wait until the collection has been created and initialized. - state TDMetricCollection* metrics = nullptr; - loop { + TDMetricCollection* metrics = nullptr; + while (true) { metrics = TDMetricCollection::getTDMetrics(); if (metrics != nullptr) if (metrics->init()) break; - wait(delay(1.0)); + co_await delay(1.0); } - state MetricsConfig config(prefix); + MetricsConfig config(prefix); try { - Database cx = wait(fcx); + Database cx = co_await fcx; Future conf = metricRuleUpdater(cx, &config, metrics); Future dump = dumpMetrics(cx, &config, metrics); Future reg = updateMetricRegistration(cx, &config, metrics); - wait(conf || dump || reg); + co_await (conf || dump || reg); } catch (Error& e) { if (e.code() != error_code_actor_cancelled) { // Disable all metrics @@ -418,15 +422,14 @@ ACTOR Future runMetrics(Future fcx, Key prefix) { TraceEvent(SevWarnAlways, "TDMetricsStopped").error(e); throw e; } - return Void(); } namespace { -ACTOR Future startMetricsSimulationServer(MetricsDataModel model) { +Future startMetricsSimulationServer(MetricsDataModel model) { if (model == MetricsDataModel::NONE) { - return Void{}; + co_return; } - state uint32_t port = 0; + uint32_t port = 0; switch (model) { case MetricsDataModel::STATSD: port = FLOW_KNOBS->STATSD_UDP_EMISSION_PORT; @@ -436,14 +439,14 @@ ACTOR Future startMetricsSimulationServer(MetricsDataModel model) { port = 0; } TraceEvent(SevInfo, "MetricsUDPServerStarted").detail("Address", "127.0.0.1").detail("Port", port); - state NetworkAddress localAddress = NetworkAddress::parse("127.0.0.1:" + std::to_string(port)); - state Reference serverSocket = wait(INetworkConnections::net()->createUDPSocket(localAddress)); + NetworkAddress localAddress = NetworkAddress::parse("127.0.0.1:" + std::to_string(port)); + Reference serverSocket = co_await INetworkConnections::net()->createUDPSocket(localAddress); serverSocket->bind(localAddress); - state Standalone packetString = makeString(IUDPSocket::MAX_PACKET_SIZE); - state uint8_t* packet = mutateString(packetString); + Standalone packetString = makeString(IUDPSocket::MAX_PACKET_SIZE); + uint8_t* packet = mutateString(packetString); - loop { - int size = wait(serverSocket->receive(packet, packet + IUDPSocket::MAX_PACKET_SIZE)); + while (true) { + int size = co_await serverSocket->receive(packet, packet + IUDPSocket::MAX_PACKET_SIZE); auto message = packetString.substr(0, size); // Let's just focus on statsd for now. For statsd, the message is expected to be separated by newlines. We need @@ -462,24 +465,24 @@ ACTOR Future startMetricsSimulationServer(MetricsDataModel model) { } } // namespace -ACTOR Future runMetrics() { - state MetricCollection* metrics = nullptr; +Future runMetrics() { + MetricCollection* metrics = nullptr; MetricsDataModel model = knobToMetricModel(FLOW_KNOBS->METRICS_DATA_MODEL); if (model == MetricsDataModel::NONE) { - return Void{}; + co_return; } - state UDPMetricClient metricClient; - state Future metricsActor; + UDPMetricClient metricClient; + Future metricsActor; if (g_network->isSimulated()) { metricsActor = startMetricsSimulationServer(model); } - loop { + while (true) { metrics = MetricCollection::getMetricCollection(); if (metrics != nullptr) { metricClient.send(metrics); } - wait(delay(FLOW_KNOBS->METRICS_EMISSION_INTERVAL)); + co_await delay(FLOW_KNOBS->METRICS_EMISSION_INTERVAL); } } @@ -492,18 +495,19 @@ TEST_CASE("/fdbserver/metrics/TraceEvents") { std::string metricsPrefix = getenv2("METRICS_PREFIX"); if (metricsConnFile == "") { fprintf(stdout, "Metrics cluster file must be specified in environment variable METRICS_CONNFILE\n"); - return Void(); + co_return; } fprintf(stdout, "Using environment variables METRICS_CONNFILE and METRICS_PREFIX.\n"); - state Database metricsDb = Database::createDatabase(metricsConnFile, ApiVersion::LATEST_VERSION); + Database metricsDb = Database::createDatabase(metricsConnFile, ApiVersion::LATEST_VERSION); TDMetricCollection::getTDMetrics()->address = "0.0.0.0:0"_sr; - state Future metrics = runMetrics(metricsDb, KeyRef(metricsPrefix)); - state int64_t x = 0; + // Keep the metrics task alive while this test emits trace events. + [[maybe_unused]] Future metrics = runMetrics(metricsDb, KeyRef(metricsPrefix)); + int64_t x = 0; - state double w = 0.5; - state int chunk = 4000; - state int total = 200000; + double w = 0.5; + int chunk = 4000; + int total = 200000; fprintf(stdout, "Writing trace event named Dummy with fields a, b, c, d, j, k, s, x, y, z.\n"); fprintf(stdout, " There is a %f second pause every %d events\n", w, chunk); @@ -516,15 +520,15 @@ TEST_CASE("/fdbserver/metrics/TraceEvents") { fprintf(stdout, " d is always present, is a string, and rotates through the values 'one', 'two', and ''.\n"); fprintf(stdout, " Plotting j on the x axis and k on the y axis should look like x=sin(2t), y=sin(3t)\n"); - state Int64MetricHandle intMetric = Int64MetricHandle("DummyInt"_sr); - state BoolMetricHandle boolMetric = BoolMetricHandle("DummyBool"_sr); - state StringMetricHandle stringMetric = StringMetricHandle("DummyString"_sr); + Int64MetricHandle intMetric = Int64MetricHandle("DummyInt"_sr); + BoolMetricHandle boolMetric = BoolMetricHandle("DummyBool"_sr); + StringMetricHandle stringMetric = StringMetricHandle("DummyString"_sr); static const char* dStrings[] = { "one", "two", "" }; - state const char** d = dStrings; - state Arena arena; + const char** d = dStrings; + Arena arena; - loop { + while (true) { { double sstart = x; for (int i = 0; i < chunk; ++i, ++x) { @@ -542,7 +546,7 @@ TEST_CASE("/fdbserver/metrics/TraceEvents") { .detail("K", sin(3.0 * x)) .detail("S", sstart + (double)chunk * sin(10.0 * i / chunk)); } - wait(delay(w)); + co_await delay(w); } { @@ -560,7 +564,7 @@ TEST_CASE("/fdbserver/metrics/TraceEvents") { .detail("K", sin(3.0 * x)) .detail("S", sstart + (double)chunk * sin(40.0 * i / chunk)); } - wait(delay(w)); + co_await delay(w); } { @@ -578,10 +582,10 @@ TEST_CASE("/fdbserver/metrics/TraceEvents") { .detail("K", sin(3.0 * x)) .detail("S", sstart + (double)chunk * sin(160.0 * i / chunk)); } - wait(delay(w)); + co_await delay(w); if (x >= total) - return Void(); + co_return; } } } diff --git a/fdbserver/worker/MetricLogger.actor.h b/fdbserver/worker/MetricLogger.h similarity index 61% rename from fdbserver/worker/MetricLogger.actor.h rename to fdbserver/worker/MetricLogger.h index 1b5c05125f5..9f7e1fd0288 100644 --- a/fdbserver/worker/MetricLogger.actor.h +++ b/fdbserver/worker/MetricLogger.h @@ -1,5 +1,5 @@ /* - * MetricLogger.actor.h + * MetricLogger.h * * This source file is part of the FoundationDB open source project * @@ -20,21 +20,10 @@ #pragma once -#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_WORKER_METRICLOGGER_ACTOR_G_H) -#define FDBSERVER_WORKER_METRICLOGGER_ACTOR_G_H -#include "MetricLogger.actor.g.h" -#elif !defined(FDBSERVER_WORKER_METRICLOGGER_ACTOR_H) -#define FDBSERVER_WORKER_METRICLOGGER_ACTOR_H - #include "fdbclient/FDBTypes.h" #include "flow/flow.h" class Database; -#include "flow/actorcompiler.h" // This must be the last #include - -ACTOR Future runMetrics(Future fcx, Key metricsPrefix); -ACTOR Future runMetrics(); - -#include "flow/unactorcompiler.h" -#endif +Future runMetrics(Future fcx, Key metricsPrefix); +Future runMetrics(); diff --git a/fdbserver/worker/worker.actor.cpp b/fdbserver/worker/worker.actor.cpp index 50bf37daf58..0deb4a79209 100644 --- a/fdbserver/worker/worker.actor.cpp +++ b/fdbserver/worker/worker.actor.cpp @@ -47,7 +47,7 @@ #include "flow/TDMetric.h" #include "fdbrpc/simulator.h" #include "fdbclient/NativeAPI.actor.h" -#include "MetricLogger.actor.h" +#include "MetricLogger.h" #include "fdbserver/backupworker/BackupWorker.h" #include "fdbserver/backupworker/BackupWorkerRangePartitioned.h" #include "fdbserver/clustercontroller/ClusterController.h"