use running sum for ValueMetricProducer bucket
simplify ValueMetricProducer logic for pulled data Test: unit test Change-Id: Ic0a21a543166cc5c34c1fa505dba08d1fc2f510a
This commit is contained in:
parent
c9f1a9e3dd
commit
a7259abde4
@ -105,6 +105,11 @@ public:
|
||||
*/
|
||||
void init();
|
||||
|
||||
/**
|
||||
* Set timestamp if the original timestamp is missing.
|
||||
*/
|
||||
void setTimestampNs(uint64_t timestampNs) {mTimestampNs = timestampNs;}
|
||||
|
||||
private:
|
||||
/**
|
||||
* Don't copy, it's slower. If we really need this we can add it but let's try to
|
||||
|
@ -188,7 +188,7 @@ bool CountMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey)
|
||||
void CountMetricProducer::onMatchedLogEventInternalLocked(
|
||||
const size_t matcherIndex, const HashableDimensionKey& eventKey,
|
||||
const map<string, HashableDimensionKey>& conditionKey, bool condition,
|
||||
const LogEvent& event, bool scheduledPull) {
|
||||
const LogEvent& event) {
|
||||
uint64_t eventTimeNs = event.GetTimestampNs();
|
||||
|
||||
flushIfNeededLocked(eventTimeNs);
|
||||
|
@ -58,7 +58,7 @@ protected:
|
||||
void onMatchedLogEventInternalLocked(
|
||||
const size_t matcherIndex, const HashableDimensionKey& eventKey,
|
||||
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
|
||||
const LogEvent& event, bool scheduledPull) override;
|
||||
const LogEvent& event) override;
|
||||
|
||||
private:
|
||||
void onDumpReportLocked(const uint64_t dumpTimeNs,
|
||||
|
@ -251,7 +251,7 @@ bool DurationMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newK
|
||||
void DurationMetricProducer::onMatchedLogEventInternalLocked(
|
||||
const size_t matcherIndex, const HashableDimensionKey& eventKey,
|
||||
const map<string, HashableDimensionKey>& conditionKeys, bool condition,
|
||||
const LogEvent& event, bool scheduledPull) {
|
||||
const LogEvent& event) {
|
||||
flushIfNeededLocked(event.GetTimestampNs());
|
||||
|
||||
if (matcherIndex == mStopAllIndex) {
|
||||
|
@ -57,7 +57,7 @@ protected:
|
||||
void onMatchedLogEventInternalLocked(
|
||||
const size_t matcherIndex, const HashableDimensionKey& eventKey,
|
||||
const std::map<std::string, HashableDimensionKey>& conditionKeys, bool condition,
|
||||
const LogEvent& event, bool scheduledPull) override;
|
||||
const LogEvent& event) override;
|
||||
|
||||
private:
|
||||
void onDumpReportLocked(const uint64_t dumpTimeNs,
|
||||
|
@ -122,7 +122,7 @@ void EventMetricProducer::onConditionChangedLocked(const bool conditionMet,
|
||||
void EventMetricProducer::onMatchedLogEventInternalLocked(
|
||||
const size_t matcherIndex, const HashableDimensionKey& eventKey,
|
||||
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
|
||||
const LogEvent& event, bool scheduledPull) {
|
||||
const LogEvent& event) {
|
||||
if (!condition) {
|
||||
return;
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ private:
|
||||
void onMatchedLogEventInternalLocked(
|
||||
const size_t matcherIndex, const HashableDimensionKey& eventKey,
|
||||
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
|
||||
const LogEvent& event, bool scheduledPull) override;
|
||||
const LogEvent& event) override;
|
||||
|
||||
void onDumpReportLocked(const uint64_t dumpTimeNs,
|
||||
android::util::ProtoOutputStream* protoOutput) override;
|
||||
|
@ -187,7 +187,7 @@ void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
|
||||
return;
|
||||
}
|
||||
for (const auto& data : allData) {
|
||||
onMatchedLogEventLocked(0, *data, false /*scheduledPull*/);
|
||||
onMatchedLogEventLocked(0, *data);
|
||||
}
|
||||
flushIfNeededLocked(eventTime);
|
||||
}
|
||||
@ -211,7 +211,7 @@ void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
|
||||
for (const auto& data : allData) {
|
||||
onMatchedLogEventLocked(0, *data, true /*scheduledPull*/);
|
||||
onMatchedLogEventLocked(0, *data);
|
||||
}
|
||||
}
|
||||
|
||||
@ -238,7 +238,7 @@ bool GaugeMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey)
|
||||
void GaugeMetricProducer::onMatchedLogEventInternalLocked(
|
||||
const size_t matcherIndex, const HashableDimensionKey& eventKey,
|
||||
const map<string, HashableDimensionKey>& conditionKey, bool condition,
|
||||
const LogEvent& event, bool scheduledPull) {
|
||||
const LogEvent& event) {
|
||||
if (condition == false) {
|
||||
return;
|
||||
}
|
||||
|
@ -66,7 +66,7 @@ protected:
|
||||
void onMatchedLogEventInternalLocked(
|
||||
const size_t matcherIndex, const HashableDimensionKey& eventKey,
|
||||
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
|
||||
const LogEvent& event, bool scheduledPull) override;
|
||||
const LogEvent& event) override;
|
||||
|
||||
private:
|
||||
void onDumpReportLocked(const uint64_t dumpTimeNs,
|
||||
|
@ -21,8 +21,7 @@ namespace statsd {
|
||||
|
||||
using std::map;
|
||||
|
||||
void MetricProducer::onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event,
|
||||
bool scheduledPull) {
|
||||
void MetricProducer::onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event) {
|
||||
uint64_t eventTimeNs = event.GetTimestampNs();
|
||||
// this is old event, maybe statsd restarted?
|
||||
if (eventTimeNs < mStartTimeNs) {
|
||||
@ -60,8 +59,7 @@ void MetricProducer::onMatchedLogEventLocked(const size_t matcherIndex, const Lo
|
||||
condition = mCondition;
|
||||
}
|
||||
|
||||
onMatchedLogEventInternalLocked(matcherIndex, eventKey, conditionKeys, condition, event,
|
||||
scheduledPull);
|
||||
onMatchedLogEventInternalLocked(matcherIndex, eventKey, conditionKeys, condition, event);
|
||||
}
|
||||
|
||||
} // namespace statsd
|
||||
|
@ -54,9 +54,9 @@ public:
|
||||
virtual ~MetricProducer(){};
|
||||
|
||||
// Consume the parsed stats log entry that already matched the "what" of the metric.
|
||||
void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event, bool scheduledPull) {
|
||||
void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event) {
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
onMatchedLogEventLocked(matcherIndex, event, scheduledPull);
|
||||
onMatchedLogEventLocked(matcherIndex, event);
|
||||
}
|
||||
|
||||
void onConditionChanged(const bool condition, const uint64_t eventTime) {
|
||||
@ -155,11 +155,10 @@ protected:
|
||||
virtual void onMatchedLogEventInternalLocked(
|
||||
const size_t matcherIndex, const HashableDimensionKey& eventKey,
|
||||
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
|
||||
const LogEvent& event, bool scheduledPull) = 0;
|
||||
const LogEvent& event) = 0;
|
||||
|
||||
// Consume the parsed stats log entry that already matched the "what" of the metric.
|
||||
void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event,
|
||||
bool scheduledPull);
|
||||
void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event);
|
||||
|
||||
mutable std::mutex mMutex;
|
||||
};
|
||||
|
@ -162,8 +162,7 @@ void MetricsManager::onLogEvent(const LogEvent& event) {
|
||||
auto& metricList = pair->second;
|
||||
for (const int metricIndex : metricList) {
|
||||
// pushed metrics are never scheduled pulls
|
||||
mAllMetricProducers[metricIndex]->onMatchedLogEvent(i, event,
|
||||
false /* schedulePull */);
|
||||
mAllMetricProducers[metricIndex]->onMatchedLogEvent(i, event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -185,9 +185,13 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, const u
|
||||
mCondition = condition;
|
||||
|
||||
if (eventTime < mCurrentBucketStartTimeNs) {
|
||||
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTime,
|
||||
(long long)mCurrentBucketStartTimeNs);
|
||||
return;
|
||||
}
|
||||
|
||||
flushIfNeededLocked(eventTime);
|
||||
|
||||
if (mPullTagId != -1) {
|
||||
if (mCondition == true) {
|
||||
mStatsPullerManager->RegisterReceiver(mPullTagId, this,
|
||||
@ -202,9 +206,8 @@ void ValueMetricProducer::onConditionChangedLocked(const bool condition, const u
|
||||
return;
|
||||
}
|
||||
for (const auto& data : allData) {
|
||||
onMatchedLogEventLocked(0, *data, false);
|
||||
onMatchedLogEventLocked(0, *data);
|
||||
}
|
||||
flushIfNeededLocked(eventTime);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -217,15 +220,22 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
|
||||
if (allData.size() == 0) {
|
||||
return;
|
||||
}
|
||||
uint64_t eventTime = allData.at(0)->GetTimestampNs();
|
||||
// alarm is not accurate and might drift.
|
||||
if (eventTime > mCurrentBucketStartTimeNs + mBucketSizeNs * 3 / 2) {
|
||||
flushIfNeededLocked(eventTime);
|
||||
}
|
||||
// For scheduled pulled data, the effective event time is snap to the nearest
|
||||
// bucket boundary to make bucket finalize.
|
||||
uint64_t realEventTime = allData.at(0)->GetTimestampNs();
|
||||
uint64_t eventTime = mStartTimeNs + ((realEventTime - mStartTimeNs)/mBucketSizeNs) * mBucketSizeNs;
|
||||
|
||||
mCondition = false;
|
||||
for (const auto& data : allData) {
|
||||
onMatchedLogEventLocked(0, *data, true);
|
||||
data->setTimestampNs(eventTime-1);
|
||||
onMatchedLogEventLocked(0, *data);
|
||||
}
|
||||
|
||||
mCondition = true;
|
||||
for (const auto& data : allData) {
|
||||
data->setTimestampNs(eventTime);
|
||||
onMatchedLogEventLocked(0, *data);
|
||||
}
|
||||
flushIfNeededLocked(eventTime);
|
||||
}
|
||||
}
|
||||
|
||||
@ -253,7 +263,7 @@ bool ValueMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey)
|
||||
void ValueMetricProducer::onMatchedLogEventInternalLocked(
|
||||
const size_t matcherIndex, const HashableDimensionKey& eventKey,
|
||||
const map<string, HashableDimensionKey>& conditionKey, bool condition,
|
||||
const LogEvent& event, bool scheduledPull) {
|
||||
const LogEvent& event) {
|
||||
uint64_t eventTimeNs = event.GetTimestampNs();
|
||||
if (eventTimeNs < mCurrentBucketStartTimeNs) {
|
||||
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
|
||||
@ -261,6 +271,8 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(
|
||||
return;
|
||||
}
|
||||
|
||||
flushIfNeededLocked(eventTimeNs);
|
||||
|
||||
if (hitGuardRailLocked(eventKey)) {
|
||||
return;
|
||||
}
|
||||
@ -268,36 +280,21 @@ void ValueMetricProducer::onMatchedLogEventInternalLocked(
|
||||
|
||||
long value = get_value(event);
|
||||
|
||||
if (mPullTagId != -1) {
|
||||
if (scheduledPull) {
|
||||
// scheduled pull always sets beginning of current bucket and end
|
||||
// of next bucket
|
||||
if (interval.raw.size() > 0) {
|
||||
interval.raw.back().second = value;
|
||||
} else {
|
||||
interval.raw.push_back(make_pair(value, value));
|
||||
}
|
||||
Interval& nextInterval = mNextSlicedBucket[eventKey];
|
||||
if (nextInterval.raw.size() == 0) {
|
||||
nextInterval.raw.push_back(make_pair(value, 0));
|
||||
} else {
|
||||
nextInterval.raw.front().first = value;
|
||||
}
|
||||
if (mPullTagId != -1) { // for pulled events
|
||||
if (mCondition == true) {
|
||||
interval.start = value;
|
||||
interval.startUpdated = true;
|
||||
} else {
|
||||
if (mCondition == true) {
|
||||
interval.raw.push_back(make_pair(value, 0));
|
||||
if (interval.startUpdated) {
|
||||
interval.sum += (value - interval.start);
|
||||
interval.startUpdated = false;
|
||||
} else {
|
||||
if (interval.raw.size() != 0) {
|
||||
interval.raw.back().second = value;
|
||||
} else {
|
||||
interval.tainted = true;
|
||||
VLOG("Data on condition true missing!");
|
||||
}
|
||||
VLOG("No start for matching end %ld", value);
|
||||
interval.tainted += 1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
flushIfNeededLocked(eventTimeNs);
|
||||
interval.raw.push_back(make_pair(value, 0));
|
||||
} else { // for pushed events
|
||||
interval.sum += value;
|
||||
}
|
||||
}
|
||||
|
||||
@ -327,27 +324,16 @@ void ValueMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
|
||||
|
||||
int tainted = 0;
|
||||
for (const auto& slice : mCurrentSlicedBucket) {
|
||||
long value = 0;
|
||||
if (mPullTagId != -1) {
|
||||
for (const auto& pair : slice.second.raw) {
|
||||
value += (pair.second - pair.first);
|
||||
}
|
||||
} else {
|
||||
for (const auto& pair : slice.second.raw) {
|
||||
value += pair.first;
|
||||
}
|
||||
}
|
||||
tainted += slice.second.tainted;
|
||||
info.mValue = value;
|
||||
VLOG(" %s, %ld, %d", slice.first.c_str(), value, tainted);
|
||||
info.mValue = slice.second.sum;
|
||||
// it will auto create new vector of ValuebucketInfo if the key is not found.
|
||||
auto& bucketList = mPastBuckets[slice.first];
|
||||
bucketList.push_back(info);
|
||||
}
|
||||
VLOG("%d tainted pairs in the bucket", tainted);
|
||||
|
||||
// Reset counters
|
||||
mCurrentSlicedBucket.swap(mNextSlicedBucket);
|
||||
mNextSlicedBucket.clear();
|
||||
mCurrentSlicedBucket.clear();
|
||||
|
||||
int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
|
||||
mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs;
|
||||
|
@ -56,7 +56,7 @@ protected:
|
||||
void onMatchedLogEventInternalLocked(
|
||||
const size_t matcherIndex, const HashableDimensionKey& eventKey,
|
||||
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
|
||||
const LogEvent& event, bool scheduledPull) override;
|
||||
const LogEvent& event) override;
|
||||
|
||||
private:
|
||||
void onDumpReportLocked(const uint64_t dumpTimeNs,
|
||||
@ -89,14 +89,19 @@ private:
|
||||
|
||||
// internal state of a bucket.
|
||||
typedef struct {
|
||||
std::vector<std::pair<long, long>> raw;
|
||||
bool tainted;
|
||||
// Pulled data always come in pair of <start, end>. This holds the value
|
||||
// for start. The diff (end - start) is added to sum.
|
||||
long start;
|
||||
// Whether the start data point is updated
|
||||
bool startUpdated;
|
||||
// If end data point comes before the start, record this pair as tainted
|
||||
// and the value is not added to the running sum.
|
||||
int tainted;
|
||||
// Running sum of known pairs in this bucket
|
||||
long sum;
|
||||
} Interval;
|
||||
|
||||
std::unordered_map<HashableDimensionKey, Interval> mCurrentSlicedBucket;
|
||||
// If condition is true and pulling on schedule, the previous bucket value needs to be carried
|
||||
// over to the next bucket.
|
||||
std::unordered_map<HashableDimensionKey, Interval> mNextSlicedBucket;
|
||||
|
||||
// Save the past buckets and we can clear when the StatsLogReport is dumped.
|
||||
// TODO: Add a lock to mPastBuckets.
|
||||
|
@ -54,8 +54,8 @@ TEST(CountMetricProducerTest, TestNonDimensionalEvents) {
|
||||
bucketStartTimeNs);
|
||||
|
||||
// 2 events in bucket 1.
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1, false);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2, false);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2);
|
||||
|
||||
// Flushes at event #2.
|
||||
countProducer.flushIfNeededLocked(bucketStartTimeNs + 2);
|
||||
@ -74,7 +74,7 @@ TEST(CountMetricProducerTest, TestNonDimensionalEvents) {
|
||||
|
||||
// 1 matched event happens in bucket 2.
|
||||
LogEvent event3(tagId, bucketStartTimeNs + bucketSizeNs + 2);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event3, false);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event3);
|
||||
countProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 1);
|
||||
EXPECT_EQ(1UL, countProducer.mPastBuckets.size());
|
||||
EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
|
||||
@ -111,12 +111,12 @@ TEST(CountMetricProducerTest, TestEventsWithNonSlicedCondition) {
|
||||
CountMetricProducer countProducer(kConfigKey, metric, 1, wizard, bucketStartTimeNs);
|
||||
|
||||
countProducer.onConditionChanged(true, bucketStartTimeNs);
|
||||
countProducer.onMatchedLogEvent(1 /*matcher index*/, event1, false /*pulled*/);
|
||||
countProducer.onMatchedLogEvent(1 /*matcher index*/, event1);
|
||||
EXPECT_EQ(0UL, countProducer.mPastBuckets.size());
|
||||
|
||||
countProducer.onConditionChanged(false /*new condition*/, bucketStartTimeNs + 2);
|
||||
// Upon this match event, the matched event1 is flushed.
|
||||
countProducer.onMatchedLogEvent(1 /*matcher index*/, event2, false /*pulled*/);
|
||||
countProducer.onMatchedLogEvent(1 /*matcher index*/, event2);
|
||||
EXPECT_EQ(0UL, countProducer.mPastBuckets.size());
|
||||
|
||||
countProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1);
|
||||
@ -166,11 +166,11 @@ TEST(CountMetricProducerTest, TestEventsWithSlicedCondition) {
|
||||
CountMetricProducer countProducer(kConfigKey, metric, 1 /*condition tracker index*/, wizard,
|
||||
bucketStartTimeNs);
|
||||
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1, false);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1);
|
||||
countProducer.flushIfNeededLocked(bucketStartTimeNs + 1);
|
||||
EXPECT_EQ(0UL, countProducer.mPastBuckets.size());
|
||||
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2, false);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2);
|
||||
countProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1);
|
||||
EXPECT_EQ(1UL, countProducer.mPastBuckets.size());
|
||||
EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
|
||||
@ -217,29 +217,29 @@ TEST(CountMetricProducerTest, TestAnomalyDetection) {
|
||||
LogEvent event7(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 3 + NS_PER_SEC);
|
||||
|
||||
// Two events in bucket #0.
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1, false);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2, false);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2);
|
||||
|
||||
EXPECT_EQ(1UL, countProducer.mCurrentSlicedCounter->size());
|
||||
EXPECT_EQ(2L, countProducer.mCurrentSlicedCounter->begin()->second);
|
||||
EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), -1LL);
|
||||
|
||||
// One event in bucket #2. No alarm as bucket #0 is trashed out.
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event3, false);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event3);
|
||||
EXPECT_EQ(1UL, countProducer.mCurrentSlicedCounter->size());
|
||||
EXPECT_EQ(1L, countProducer.mCurrentSlicedCounter->begin()->second);
|
||||
EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), -1LL);
|
||||
|
||||
// Two events in bucket #3.
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event4, false);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event5, false);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event6, false);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event4);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event5);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event6);
|
||||
EXPECT_EQ(1UL, countProducer.mCurrentSlicedCounter->size());
|
||||
EXPECT_EQ(3L, countProducer.mCurrentSlicedCounter->begin()->second);
|
||||
// Anomaly at event 6 is within refractory period. The alarm is at event 5 timestamp not event 6
|
||||
EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event5.GetTimestampNs());
|
||||
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event7, false);
|
||||
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event7);
|
||||
EXPECT_EQ(1UL, countProducer.mCurrentSlicedCounter->size());
|
||||
EXPECT_EQ(4L, countProducer.mCurrentSlicedCounter->begin()->second);
|
||||
EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event7.GetTimestampNs());
|
||||
|
@ -56,8 +56,8 @@ TEST(DurationMetricTrackerTest, TestNoCondition) {
|
||||
kConfigKey, metric, -1 /*no condition*/, 1 /* start index */, 2 /* stop index */,
|
||||
3 /* stop_all index */, false /*nesting*/, wizard, {}, bucketStartTimeNs);
|
||||
|
||||
durationProducer.onMatchedLogEvent(1 /* start index*/, event1, false /* scheduledPull */);
|
||||
durationProducer.onMatchedLogEvent(2 /* stop index*/, event2, false /* scheduledPull */);
|
||||
durationProducer.onMatchedLogEvent(1 /* start index*/, event1);
|
||||
durationProducer.onMatchedLogEvent(2 /* stop index*/, event2);
|
||||
durationProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 1);
|
||||
EXPECT_EQ(1UL, durationProducer.mPastBuckets.size());
|
||||
EXPECT_TRUE(durationProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
|
||||
@ -94,14 +94,14 @@ TEST(DurationMetricTrackerTest, TestNonSlicedCondition) {
|
||||
EXPECT_FALSE(durationProducer.mCondition);
|
||||
EXPECT_FALSE(durationProducer.isConditionSliced());
|
||||
|
||||
durationProducer.onMatchedLogEvent(1 /* start index*/, event1, false /* scheduledPull */);
|
||||
durationProducer.onMatchedLogEvent(2 /* stop index*/, event2, false /* scheduledPull */);
|
||||
durationProducer.onMatchedLogEvent(1 /* start index*/, event1);
|
||||
durationProducer.onMatchedLogEvent(2 /* stop index*/, event2);
|
||||
durationProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1);
|
||||
EXPECT_EQ(0UL, durationProducer.mPastBuckets.size());
|
||||
|
||||
durationProducer.onMatchedLogEvent(1 /* start index*/, event3, false /* scheduledPull */);
|
||||
durationProducer.onMatchedLogEvent(1 /* start index*/, event3);
|
||||
durationProducer.onConditionChanged(true /* condition */, bucketStartTimeNs + bucketSizeNs + 2);
|
||||
durationProducer.onMatchedLogEvent(2 /* stop index*/, event4, false /* scheduledPull */);
|
||||
durationProducer.onMatchedLogEvent(2 /* stop index*/, event4);
|
||||
durationProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 1);
|
||||
EXPECT_EQ(1UL, durationProducer.mPastBuckets.size());
|
||||
EXPECT_TRUE(durationProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
|
||||
|
@ -50,8 +50,8 @@ TEST(EventMetricProducerTest, TestNoCondition) {
|
||||
EventMetricProducer eventProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
|
||||
bucketStartTimeNs);
|
||||
|
||||
eventProducer.onMatchedLogEvent(1 /*matcher index*/, event1, false /*pulled*/);
|
||||
eventProducer.onMatchedLogEvent(1 /*matcher index*/, event2, false /*pulled*/);
|
||||
eventProducer.onMatchedLogEvent(1 /*matcher index*/, event1);
|
||||
eventProducer.onMatchedLogEvent(1 /*matcher index*/, event2);
|
||||
|
||||
// TODO: get the report and check the content after the ProtoOutputStream change is done.
|
||||
// eventProducer.onDumpReport();
|
||||
@ -74,11 +74,11 @@ TEST(EventMetricProducerTest, TestEventsWithNonSlicedCondition) {
|
||||
EventMetricProducer eventProducer(kConfigKey, metric, 1, wizard, bucketStartTimeNs);
|
||||
|
||||
eventProducer.onConditionChanged(true /*condition*/, bucketStartTimeNs);
|
||||
eventProducer.onMatchedLogEvent(1 /*matcher index*/, event1, false /*pulled*/);
|
||||
eventProducer.onMatchedLogEvent(1 /*matcher index*/, event1);
|
||||
|
||||
eventProducer.onConditionChanged(false /*condition*/, bucketStartTimeNs + 2);
|
||||
|
||||
eventProducer.onMatchedLogEvent(1 /*matcher index*/, event2, false /*pulled*/);
|
||||
eventProducer.onMatchedLogEvent(1 /*matcher index*/, event2);
|
||||
|
||||
// TODO: get the report and check the content after the ProtoOutputStream change is done.
|
||||
// eventProducer.onDumpReport();
|
||||
@ -115,8 +115,8 @@ TEST(EventMetricProducerTest, TestEventsWithSlicedCondition) {
|
||||
|
||||
EventMetricProducer eventProducer(kConfigKey, metric, 1, wizard, bucketStartTimeNs);
|
||||
|
||||
eventProducer.onMatchedLogEvent(1 /*matcher index*/, event1, false /*pulled*/);
|
||||
eventProducer.onMatchedLogEvent(1 /*matcher index*/, event2, false /*pulled*/);
|
||||
eventProducer.onMatchedLogEvent(1 /*matcher index*/, event1);
|
||||
eventProducer.onMatchedLogEvent(1 /*matcher index*/, event2);
|
||||
|
||||
// TODO: get the report and check the content after the ProtoOutputStream change is done.
|
||||
// eventProducer.onDumpReport();
|
||||
|
@ -35,23 +35,23 @@ namespace os {
|
||||
namespace statsd {
|
||||
|
||||
const ConfigKey kConfigKey(0, "test");
|
||||
const int tagId = 1;
|
||||
const string metricName = "test_metric";
|
||||
const int64_t bucketStartTimeNs = 10000000000;
|
||||
const int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
|
||||
const int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
|
||||
const int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
|
||||
const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs;
|
||||
|
||||
/*
|
||||
* Tests pulled atoms with no conditions
|
||||
*/
|
||||
TEST(ValueMetricProducerTest, TestNonDimensionalEvents) {
|
||||
int64_t bucketStartTimeNs = 10000000000;
|
||||
int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
|
||||
|
||||
int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
|
||||
int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
|
||||
|
||||
ValueMetric metric;
|
||||
metric.set_name("1");
|
||||
metric.set_name(metricName);
|
||||
metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
|
||||
metric.set_value_field(2);
|
||||
|
||||
int tagId = 1;
|
||||
|
||||
sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
|
||||
// TODO: pending refactor of StatsPullerManager
|
||||
// For now we still need this so that it doesn't do real pulling.
|
||||
@ -65,8 +65,8 @@ TEST(ValueMetricProducerTest, TestNonDimensionalEvents) {
|
||||
|
||||
vector<shared_ptr<LogEvent>> allData;
|
||||
allData.clear();
|
||||
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
|
||||
event->write(1);
|
||||
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
|
||||
event->write(tagId);
|
||||
event->write(11);
|
||||
event->init();
|
||||
allData.push_back(event);
|
||||
@ -75,76 +75,60 @@ TEST(ValueMetricProducerTest, TestNonDimensionalEvents) {
|
||||
// has one slice
|
||||
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
|
||||
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
|
||||
// has one raw pair
|
||||
EXPECT_EQ(1UL, curInterval.raw.size());
|
||||
// value is 11, 11
|
||||
EXPECT_EQ(11, curInterval.raw.front().first);
|
||||
EXPECT_EQ(11, curInterval.raw.front().second);
|
||||
ValueMetricProducer::Interval nextInterval = valueProducer.mNextSlicedBucket.begin()->second;
|
||||
// has one raw pair
|
||||
EXPECT_EQ(1UL, nextInterval.raw.size());
|
||||
// value is 11, 0
|
||||
EXPECT_EQ(11, nextInterval.raw.front().first);
|
||||
EXPECT_EQ(0, nextInterval.raw.front().second);
|
||||
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
|
||||
// startUpdated:true tainted:0 sum:0 start:11
|
||||
EXPECT_EQ(true, curInterval.startUpdated);
|
||||
EXPECT_EQ(0, curInterval.tainted);
|
||||
EXPECT_EQ(0, curInterval.sum);
|
||||
EXPECT_EQ(11, curInterval.start);
|
||||
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
|
||||
EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second.back().mValue);
|
||||
|
||||
allData.clear();
|
||||
event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
|
||||
event->write(1);
|
||||
event->write(22);
|
||||
event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1);
|
||||
event->write(tagId);
|
||||
event->write(23);
|
||||
event->init();
|
||||
allData.push_back(event);
|
||||
valueProducer.onDataPulled(allData);
|
||||
// has one slice
|
||||
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
|
||||
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
|
||||
// has one raw pair
|
||||
EXPECT_EQ(1UL, curInterval.raw.size());
|
||||
// value is 22, 0
|
||||
EXPECT_EQ(22, curInterval.raw.front().first);
|
||||
EXPECT_EQ(0, curInterval.raw.front().second);
|
||||
EXPECT_EQ(0UL, valueProducer.mNextSlicedBucket.size());
|
||||
// tartUpdated:false tainted:0 sum:12
|
||||
EXPECT_EQ(true, curInterval.startUpdated);
|
||||
EXPECT_EQ(0, curInterval.tainted);
|
||||
EXPECT_EQ(0, curInterval.sum);
|
||||
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
|
||||
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
|
||||
EXPECT_EQ(11, valueProducer.mPastBuckets.begin()->second.back().mValue);
|
||||
EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
|
||||
EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValue);
|
||||
|
||||
allData.clear();
|
||||
event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1);
|
||||
event->write(1);
|
||||
event->write(33);
|
||||
event = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1);
|
||||
event->write(tagId);
|
||||
event->write(36);
|
||||
event->init();
|
||||
allData.push_back(event);
|
||||
valueProducer.onDataPulled(allData);
|
||||
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
|
||||
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
|
||||
EXPECT_EQ(1UL, curInterval.raw.size());
|
||||
// value is 33, 0
|
||||
EXPECT_EQ(33, curInterval.raw.front().first);
|
||||
EXPECT_EQ(0, curInterval.raw.front().second);
|
||||
EXPECT_EQ(0UL, valueProducer.mNextSlicedBucket.size());
|
||||
// startUpdated:false tainted:0 sum:12
|
||||
EXPECT_EQ(true, curInterval.startUpdated);
|
||||
EXPECT_EQ(0, curInterval.tainted);
|
||||
EXPECT_EQ(0, curInterval.sum);
|
||||
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
|
||||
EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
|
||||
EXPECT_EQ(11, valueProducer.mPastBuckets.begin()->second.back().mValue);
|
||||
EXPECT_EQ(3UL, valueProducer.mPastBuckets.begin()->second.size());
|
||||
EXPECT_EQ(13, valueProducer.mPastBuckets.begin()->second.back().mValue);
|
||||
}
|
||||
|
||||
/*
|
||||
* Test pulled event with non sliced condition.
|
||||
*/
|
||||
TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
|
||||
int64_t bucketStartTimeNs = 10000000000;
|
||||
int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
|
||||
|
||||
int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
|
||||
int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
|
||||
|
||||
ValueMetric metric;
|
||||
metric.set_name("1");
|
||||
metric.set_name(metricName);
|
||||
metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
|
||||
metric.set_value_field(2);
|
||||
metric.set_condition("SCREEN_ON");
|
||||
|
||||
int tagId = 1;
|
||||
|
||||
sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
|
||||
shared_ptr<MockStatsPullerManager> pullerManager =
|
||||
make_shared<StrictMock<MockStatsPullerManager>>();
|
||||
@ -153,28 +137,18 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
|
||||
|
||||
EXPECT_CALL(*pullerManager, Pull(tagId, _))
|
||||
.WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
|
||||
int64_t bucketStartTimeNs = 10000000000;
|
||||
int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
|
||||
|
||||
int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
|
||||
int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
|
||||
data->clear();
|
||||
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
|
||||
event->write(1);
|
||||
event->write(tagId);
|
||||
event->write(100);
|
||||
event->init();
|
||||
data->push_back(event);
|
||||
return true;
|
||||
}))
|
||||
.WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
|
||||
int64_t bucketStartTimeNs = 10000000000;
|
||||
int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
|
||||
|
||||
int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
|
||||
int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
|
||||
data->clear();
|
||||
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 10);
|
||||
event->write(1);
|
||||
event->write(tagId);
|
||||
event->write(120);
|
||||
event->init();
|
||||
data->push_back(event);
|
||||
@ -184,17 +158,16 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
|
||||
ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs,
|
||||
pullerManager);
|
||||
|
||||
valueProducer.onConditionChanged(true, bucketStartTimeNs + 10);
|
||||
valueProducer.onConditionChanged(true, bucketStartTimeNs + 8);
|
||||
|
||||
// has one slice
|
||||
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
|
||||
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
|
||||
// has one raw pair
|
||||
EXPECT_EQ(1UL, curInterval.raw.size());
|
||||
// value is 100, 0
|
||||
EXPECT_EQ(100, curInterval.raw.front().first);
|
||||
EXPECT_EQ(0, curInterval.raw.front().second);
|
||||
EXPECT_EQ(0UL, valueProducer.mNextSlicedBucket.size());
|
||||
// startUpdated:false tainted:0 sum:0 start:100
|
||||
EXPECT_EQ(100, curInterval.start);
|
||||
EXPECT_EQ(true, curInterval.startUpdated);
|
||||
EXPECT_EQ(0, curInterval.tainted);
|
||||
EXPECT_EQ(0, curInterval.sum);
|
||||
EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
|
||||
|
||||
vector<shared_ptr<LogEvent>> allData;
|
||||
@ -209,11 +182,8 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
|
||||
// has one slice
|
||||
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
|
||||
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
|
||||
// has one raw pair
|
||||
EXPECT_EQ(1UL, curInterval.raw.size());
|
||||
// value is 110, 0
|
||||
EXPECT_EQ(110, curInterval.raw.front().first);
|
||||
EXPECT_EQ(0, curInterval.raw.front().second);
|
||||
// startUpdated:false tainted:0 sum:0 start:110
|
||||
EXPECT_EQ(110, curInterval.start);
|
||||
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
|
||||
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
|
||||
EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValue);
|
||||
@ -223,27 +193,17 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
|
||||
// has one slice
|
||||
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
|
||||
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
|
||||
// has one raw pair
|
||||
EXPECT_EQ(1UL, curInterval.raw.size());
|
||||
// value is 110, 120
|
||||
EXPECT_EQ(110, curInterval.raw.front().first);
|
||||
EXPECT_EQ(120, curInterval.raw.front().second);
|
||||
// startUpdated:false tainted:0 sum:0 start:110
|
||||
EXPECT_EQ(10, curInterval.sum);
|
||||
EXPECT_EQ(false, curInterval.startUpdated);
|
||||
}
|
||||
|
||||
TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) {
|
||||
int64_t bucketStartTimeNs = 10000000000;
|
||||
int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
|
||||
|
||||
int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
|
||||
int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
|
||||
|
||||
ValueMetric metric;
|
||||
metric.set_name("1");
|
||||
metric.set_name(metricName);
|
||||
metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
|
||||
metric.set_value_field(2);
|
||||
|
||||
int tagId = 1;
|
||||
|
||||
sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
|
||||
shared_ptr<MockStatsPullerManager> pullerManager =
|
||||
make_shared<StrictMock<MockStatsPullerManager>>();
|
||||
@ -255,32 +215,22 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) {
|
||||
event1->write(1);
|
||||
event1->write(10);
|
||||
event1->init();
|
||||
shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
|
||||
shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
|
||||
event2->write(1);
|
||||
event2->write(20);
|
||||
event2->init();
|
||||
valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1, false);
|
||||
valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
|
||||
// has one slice
|
||||
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
|
||||
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
|
||||
// has one raw pair
|
||||
EXPECT_EQ(1UL, curInterval.raw.size());
|
||||
// value is 10, 0
|
||||
EXPECT_EQ(10, curInterval.raw.front().first);
|
||||
EXPECT_EQ(0, curInterval.raw.front().second);
|
||||
EXPECT_EQ(0UL, valueProducer.mNextSlicedBucket.size());
|
||||
EXPECT_EQ(10, curInterval.sum);
|
||||
|
||||
valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2, false);
|
||||
valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
|
||||
|
||||
// has one slice
|
||||
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
|
||||
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
|
||||
// has one raw pair
|
||||
EXPECT_EQ(2UL, curInterval.raw.size());
|
||||
// value is 10, 20
|
||||
EXPECT_EQ(10, curInterval.raw.front().first);
|
||||
EXPECT_EQ(20, curInterval.raw.back().first);
|
||||
EXPECT_EQ(0UL, valueProducer.mNextSlicedBucket.size());
|
||||
EXPECT_EQ(30, curInterval.sum);
|
||||
|
||||
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
|
||||
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
|
||||
|
Loading…
x
Reference in New Issue
Block a user