Add API to let metrics directly drop data without writing to an output.
+ Metrics will do flushIfNeeded() to correctly move the clock and informing AnomalyTracker the past bucket info, and then clear past buckets. + We will still keep the current bucket data for the validity of the future metrics. Bug: 70571383 Test: statsd_test Change-Id: Ib13c45574974e7b4e82bd8f305091dc93bda76f5
This commit is contained in:
@ -330,9 +330,7 @@ void StatsLogProcessor::flushIfNecessaryLocked(
|
||||
mLastByteSizeTimes[key] = timestampNs;
|
||||
if (totalBytes >
|
||||
StatsdStats::kMaxMetricsBytesPerConfig) { // Too late. We need to start clearing data.
|
||||
// TODO(b/70571383): By 12/15/2017 add API to drop data directly
|
||||
ProtoOutputStream proto;
|
||||
metricsManager.onDumpReport(timestampNs, &proto);
|
||||
metricsManager.dropData(timestampNs);
|
||||
StatsdStats::getInstance().noteDataDropped(key);
|
||||
VLOG("StatsD had to toss out metrics for %s", key.ToString().c_str());
|
||||
} else if (totalBytes > .9 * StatsdStats::kMaxMetricsBytesPerConfig) {
|
||||
|
@ -113,7 +113,6 @@ private:
|
||||
FRIEND_TEST(StatsLogProcessorTest, TestRateLimitByteSize);
|
||||
FRIEND_TEST(StatsLogProcessorTest, TestRateLimitBroadcast);
|
||||
FRIEND_TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge);
|
||||
FRIEND_TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge);
|
||||
FRIEND_TEST(WakelockDurationE2eTest, TestAggregatedPredicateDimensionsForSumDuration1);
|
||||
FRIEND_TEST(WakelockDurationE2eTest, TestAggregatedPredicateDimensionsForSumDuration2);
|
||||
FRIEND_TEST(WakelockDurationE2eTest, TestAggregatedPredicateDimensionsForSumDuration3);
|
||||
|
@ -151,8 +151,11 @@ void CountMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
|
||||
protoOutput->end(protoToken);
|
||||
|
||||
mPastBuckets.clear();
|
||||
}
|
||||
|
||||
// TODO: Clear mDimensionKeyMap once the report is dumped.
|
||||
void CountMetricProducer::dropDataLocked(const uint64_t dropTimeNs) {
|
||||
flushIfNeededLocked(dropTimeNs);
|
||||
mPastBuckets.clear();
|
||||
}
|
||||
|
||||
void CountMetricProducer::onConditionChangedLocked(const bool conditionMet,
|
||||
|
@ -69,6 +69,8 @@ private:
|
||||
|
||||
void dumpStatesLocked(FILE* out, bool verbose) const override{};
|
||||
|
||||
void dropDataLocked(const uint64_t dropTimeNs) override;
|
||||
|
||||
// Util function to flush the old packet.
|
||||
void flushIfNeededLocked(const uint64_t& newEventTime) override;
|
||||
|
||||
|
@ -221,6 +221,11 @@ void DurationMetricProducer::onConditionChangedLocked(const bool conditionMet,
|
||||
}
|
||||
}
|
||||
|
||||
void DurationMetricProducer::dropDataLocked(const uint64_t dropTimeNs) {
|
||||
flushIfNeededLocked(dropTimeNs);
|
||||
mPastBuckets.clear();
|
||||
}
|
||||
|
||||
void DurationMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
|
||||
ProtoOutputStream* protoOutput) {
|
||||
flushIfNeededLocked(dumpTimeNs);
|
||||
|
@ -74,6 +74,8 @@ private:
|
||||
|
||||
void dumpStatesLocked(FILE* out, bool verbose) const override;
|
||||
|
||||
void dropDataLocked(const uint64_t dropTimeNs) override;
|
||||
|
||||
// Util function to flush the old packet.
|
||||
void flushIfNeededLocked(const uint64_t& eventTime);
|
||||
|
||||
|
@ -75,6 +75,10 @@ EventMetricProducer::~EventMetricProducer() {
|
||||
VLOG("~EventMetricProducer() called");
|
||||
}
|
||||
|
||||
void EventMetricProducer::dropDataLocked(const uint64_t dropTimeNs) {
|
||||
mProto->clear();
|
||||
}
|
||||
|
||||
void EventMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
|
||||
}
|
||||
|
||||
|
@ -55,6 +55,8 @@ private:
|
||||
// Internal interface to handle sliced condition change.
|
||||
void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override;
|
||||
|
||||
void dropDataLocked(const uint64_t dropTimeNs) override;
|
||||
|
||||
// Internal function to calculate the current used bytes.
|
||||
size_t byteSizeLocked() const override;
|
||||
|
||||
|
@ -353,6 +353,11 @@ void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
|
||||
}
|
||||
}
|
||||
|
||||
void GaugeMetricProducer::dropDataLocked(const uint64_t dropTimeNs) {
|
||||
flushIfNeededLocked(dropTimeNs);
|
||||
mPastBuckets.clear();
|
||||
}
|
||||
|
||||
// When a new matched event comes in, we check if event falls into the current
|
||||
// bucket. If not, flush the old counter to past buckets and initialize the new
|
||||
// bucket.
|
||||
|
@ -108,6 +108,8 @@ private:
|
||||
|
||||
void dumpStatesLocked(FILE* out, bool verbose) const override{};
|
||||
|
||||
void dropDataLocked(const uint64_t dropTimeNs) override;
|
||||
|
||||
// Util function to flush the old packet.
|
||||
void flushIfNeededLocked(const uint64_t& eventTime) override;
|
||||
|
||||
|
@ -148,6 +148,15 @@ public:
|
||||
return mMetricId;
|
||||
}
|
||||
|
||||
// Let MetricProducer drop in-memory data to save memory.
|
||||
// We still need to keep future data valid and anomaly tracking work, which means we will
|
||||
// have to flush old data, informing anomaly trackers then safely drop old data.
|
||||
// We still keep current bucket data for future metrics' validity.
|
||||
void dropData(const uint64_t dropTimeNs) {
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
dropDataLocked(dropTimeNs);
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void onConditionChangedLocked(const bool condition, const uint64_t eventTime) = 0;
|
||||
virtual void onSlicedConditionMayChangeLocked(const uint64_t eventTime) = 0;
|
||||
@ -179,6 +188,8 @@ protected:
|
||||
return mStartTimeNs + (mCurrentBucketNum + 1) * mBucketSizeNs;
|
||||
}
|
||||
|
||||
virtual void dropDataLocked(const uint64_t dropTimeNs) = 0;
|
||||
|
||||
const int64_t mMetricId;
|
||||
|
||||
const ConfigKey mConfigKey;
|
||||
|
@ -173,6 +173,12 @@ void MetricsManager::dumpStates(FILE* out, bool verbose) {
|
||||
}
|
||||
}
|
||||
|
||||
void MetricsManager::dropData(const uint64_t dropTimeNs) {
|
||||
for (const auto& producer : mAllMetricProducers) {
|
||||
producer->dropData(dropTimeNs);
|
||||
}
|
||||
}
|
||||
|
||||
void MetricsManager::onDumpReport(const uint64_t dumpTimeStampNs, ProtoOutputStream* protoOutput) {
|
||||
VLOG("=========================Metric Reports Start==========================");
|
||||
// one StatsLogReport per MetricProduer
|
||||
|
@ -73,6 +73,8 @@ public:
|
||||
return mLastReportTimeNs;
|
||||
};
|
||||
|
||||
virtual void dropData(const uint64_t dropTimeNs);
|
||||
|
||||
// Config source owner can call onDumpReport() to get all the metrics collected.
|
||||
virtual void onDumpReport(const uint64_t dumpTimeNs,
|
||||
android::util::ProtoOutputStream* protoOutput);
|
||||
|
@ -129,6 +129,11 @@ void ValueMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventT
|
||||
VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
|
||||
}
|
||||
|
||||
void ValueMetricProducer::dropDataLocked(const uint64_t dropTimeNs) {
|
||||
flushIfNeededLocked(dropTimeNs);
|
||||
mPastBuckets.clear();
|
||||
}
|
||||
|
||||
void ValueMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
|
||||
ProtoOutputStream* protoOutput) {
|
||||
VLOG("metric %lld dump report now...", (long long)mMetricId);
|
||||
|
@ -106,6 +106,8 @@ private:
|
||||
|
||||
void flushCurrentBucketLocked(const uint64_t& eventTimeNs) override;
|
||||
|
||||
void dropDataLocked(const uint64_t dropTimeNs) override;
|
||||
|
||||
const FieldMatcher mValueField;
|
||||
|
||||
std::shared_ptr<StatsPullerManager> mStatsPullerManager;
|
||||
|
@ -51,7 +51,8 @@ public:
|
||||
}
|
||||
|
||||
MOCK_METHOD0(byteSize, size_t());
|
||||
MOCK_METHOD2(onDumpReport, void(const uint64_t timeNs, ProtoOutputStream* output));
|
||||
|
||||
MOCK_METHOD1(dropData, void(const uint64_t dropTimeNs));
|
||||
};
|
||||
|
||||
TEST(StatsLogProcessorTest, TestRateLimitByteSize) {
|
||||
@ -114,7 +115,7 @@ TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge) {
|
||||
.Times(1)
|
||||
.WillRepeatedly(Return(int(StatsdStats::kMaxMetricsBytesPerConfig * 1.2)));
|
||||
|
||||
EXPECT_CALL(mockMetricsManager, onDumpReport(_, _)).Times(1);
|
||||
EXPECT_CALL(mockMetricsManager, dropData(_)).Times(1);
|
||||
|
||||
// Expect to call the onDumpReport and skip the broadcast.
|
||||
p.flushIfNecessaryLocked(1, key, mockMetricsManager);
|
||||
|
Reference in New Issue
Block a user