Merge "Switch Looper back to using poll() instead of epoll()." into gingerbread

This commit is contained in:
Jeff Brown
2010-10-07 13:29:56 -07:00
committed by Android (Google) Code Review
6 changed files with 349 additions and 74 deletions

View File

@ -34,16 +34,19 @@ public class MessageQueue {
Message mMessages;
private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
private IdleHandler[] mPendingIdleHandlers;
private boolean mQuiting = false;
private boolean mQuiting;
boolean mQuitAllowed = true;
// Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
private boolean mBlocked;
@SuppressWarnings("unused")
private int mPtr; // used by native code
private native void nativeInit();
private native void nativeDestroy();
private native boolean nativePollOnce(int timeoutMillis);
private native void nativeWake();
private native void nativePollOnce(int ptr, int timeoutMillis);
private native void nativeWake(int ptr);
/**
* Callback interface for discovering when a thread is going to block
@ -113,7 +116,7 @@ public class MessageQueue {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(nextPollTimeoutMillis);
nativePollOnce(mPtr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
@ -122,7 +125,9 @@ public class MessageQueue {
if (msg != null) {
final long when = msg.when;
if (now >= when) {
mBlocked = false;
mMessages = msg.next;
msg.next = null;
if (Config.LOGV) Log.v("MessageQueue", "Returning message: " + msg);
return msg;
} else {
@ -138,6 +143,7 @@ public class MessageQueue {
}
if (pendingIdleHandlerCount == 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
@ -184,6 +190,7 @@ public class MessageQueue {
if (msg.target == null && !mQuitAllowed) {
throw new RuntimeException("Main thread not allowed to quit");
}
final boolean needWake;
synchronized (this) {
if (mQuiting) {
RuntimeException e = new RuntimeException(
@ -200,6 +207,7 @@ public class MessageQueue {
if (p == null || when == 0 || when < p.when) {
msg.next = p;
mMessages = msg;
needWake = mBlocked; // new head, might need to wake up
} else {
Message prev = null;
while (p != null && p.when <= when) {
@ -208,9 +216,12 @@ public class MessageQueue {
}
msg.next = prev.next;
prev.next = msg;
needWake = false; // still waiting on head, no need to wake up
}
}
nativeWake();
if (needWake) {
nativeWake(mPtr);
}
return true;
}

View File

@ -41,7 +41,7 @@ public:
inline sp<Looper> getLooper() { return mLooper; }
bool pollOnce(int timeoutMillis);
void pollOnce(int timeoutMillis);
void wake();
private:
@ -61,8 +61,8 @@ NativeMessageQueue::NativeMessageQueue() {
NativeMessageQueue::~NativeMessageQueue() {
}
bool NativeMessageQueue::pollOnce(int timeoutMillis) {
return mLooper->pollOnce(timeoutMillis) != ALOOPER_POLL_TIMEOUT;
void NativeMessageQueue::pollOnce(int timeoutMillis) {
mLooper->pollOnce(timeoutMillis);
}
void NativeMessageQueue::wake() {
@ -112,24 +112,14 @@ static void throwQueueNotInitialized(JNIEnv* env) {
jniThrowException(env, "java/lang/IllegalStateException", "Message queue not initialized");
}
static jboolean android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue =
android_os_MessageQueue_getNativeMessageQueue(env, obj);
if (! nativeMessageQueue) {
throwQueueNotInitialized(env);
return false;
}
return nativeMessageQueue->pollOnce(timeoutMillis);
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jint ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(timeoutMillis);
}
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj) {
NativeMessageQueue* nativeMessageQueue =
android_os_MessageQueue_getNativeMessageQueue(env, obj);
if (! nativeMessageQueue) {
throwQueueNotInitialized(env);
return;
}
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
return nativeMessageQueue->wake();
}
@ -139,8 +129,8 @@ static JNINativeMethod gMessageQueueMethods[] = {
/* name, signature, funcPtr */
{ "nativeInit", "()V", (void*)android_os_MessageQueue_nativeInit },
{ "nativeDestroy", "()V", (void*)android_os_MessageQueue_nativeDestroy },
{ "nativePollOnce", "(I)Z", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "()V", (void*)android_os_MessageQueue_nativeWake }
{ "nativePollOnce", "(II)V", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "(I)V", (void*)android_os_MessageQueue_nativeWake }
};
#define FIND_CLASS(var, className) \

View File

@ -20,9 +20,22 @@
#include <utils/threads.h>
#include <utils/RefBase.h>
#include <utils/KeyedVector.h>
#include <utils/Timers.h>
#include <android/looper.h>
// Currently using poll() instead of epoll_wait() since it does a better job of meeting a
// timeout deadline. epoll_wait() typically causes additional delays of up to 10ms
// beyond the requested timeout.
//#define LOOPER_USES_EPOLL
//#define LOOPER_STATISTICS
#ifdef LOOPER_USES_EPOLL
#include <sys/epoll.h>
#else
#include <sys/poll.h>
#endif
/*
* Declare a concrete type for the NDK's looper forward declaration.
*/
@ -190,13 +203,54 @@ private:
const bool mAllowNonCallbacks; // immutable
int mEpollFd; // immutable
int mWakeReadPipeFd; // immutable
int mWakeWritePipeFd; // immutable
Mutex mLock;
#ifdef LOOPER_USES_EPOLL
int mEpollFd; // immutable
// Locked list of file descriptor monitoring requests.
Mutex mLock;
KeyedVector<int, Request> mRequests;
KeyedVector<int, Request> mRequests; // guarded by mLock
#else
// The lock guards state used to track whether there is a poll() in progress and whether
// there are any other threads waiting in wakeAndLock(). The condition variables
// are used to transfer control among these threads such that all waiters are
// serviced before a new poll can begin.
// The wakeAndLock() method increments mWaiters, wakes the poll, blocks on mAwake
// until mPolling becomes false, then decrements mWaiters again.
// The poll() method blocks on mResume until mWaiters becomes 0, then sets
// mPolling to true, blocks until the poll completes, then resets mPolling to false
// and signals mResume if there are waiters.
bool mPolling; // guarded by mLock
uint32_t mWaiters; // guarded by mLock
Condition mAwake; // guarded by mLock
Condition mResume; // guarded by mLock
Vector<struct pollfd> mRequestedFds; // must hold mLock and mPolling must be false to modify
Vector<Request> mRequests; // must hold mLock and mPolling must be false to modify
ssize_t getRequestIndexLocked(int fd);
void wakeAndLock();
#endif
#ifdef LOOPER_STATISTICS
static const int SAMPLED_WAKE_CYCLES_TO_AGGREGATE = 100;
static const int SAMPLED_POLLS_TO_AGGREGATE = 1000;
nsecs_t mPendingWakeTime;
int mPendingWakeCount;
int mSampledWakeCycles;
int mSampledWakeCountSum;
nsecs_t mSampledWakeLatencySum;
int mSampledPolls;
int mSampledZeroPollCount;
int mSampledZeroPollLatencySum;
int mSampledTimeoutPollCount;
int mSampledTimeoutPollLatencySum;
#endif
// This state is only used privately by pollOnce and does not require a lock since
// it runs on a single thread.
@ -204,6 +258,8 @@ private:
size_t mResponseIndex;
int pollInner(int timeoutMillis);
void awoken();
void pushResponse(int events, const Request& request);
static void initTLSKey();
static void threadDestructor(void *st);

View File

@ -19,16 +19,17 @@
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
namespace android {
#ifdef LOOPER_USES_EPOLL
// Hint for number of file descriptors to be associated with the epoll instance.
static const int EPOLL_SIZE_HINT = 8;
// Maximum number of file descriptors for which to retrieve poll events each iteration.
static const int EPOLL_MAX_EVENTS = 16;
#endif
static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
static pthread_key_t gTLSKey = 0;
@ -36,9 +37,6 @@ static pthread_key_t gTLSKey = 0;
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks),
mResponseIndex(0) {
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
int wakeFds[2];
int result = pipe(wakeFds);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
@ -54,6 +52,11 @@ Looper::Looper(bool allowNonCallbacks) :
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
errno);
#ifdef LOOPER_USES_EPOLL
// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
@ -61,12 +64,45 @@ Looper::Looper(bool allowNonCallbacks) :
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
errno);
#else
// Add the wake pipe to the head of the request list with a null callback.
struct pollfd requestedFd;
requestedFd.fd = mWakeReadPipeFd;
requestedFd.events = POLLIN;
mRequestedFds.push(requestedFd);
Request request;
request.fd = mWakeReadPipeFd;
request.callback = NULL;
request.ident = 0;
request.data = NULL;
mRequests.push(request);
mPolling = false;
mWaiters = 0;
#endif
#ifdef LOOPER_STATISTICS
mPendingWakeTime = -1;
mPendingWakeCount = 0;
mSampledWakeCycles = 0;
mSampledWakeCountSum = 0;
mSampledWakeLatencySum = 0;
mSampledPolls = 0;
mSampledZeroPollCount = 0;
mSampledZeroPollLatencySum = 0;
mSampledTimeoutPollCount = 0;
mSampledTimeoutPollLatencySum = 0;
#endif
}
Looper::~Looper() {
close(mWakeReadPipeFd);
close(mWakeWritePipeFd);
#ifdef LOOPER_USES_EPOLL
close(mEpollFd);
#endif
}
void Looper::initTLSKey() {
@ -157,45 +193,61 @@ int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
#ifdef LOOPER_STATISTICS
nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);
#endif
#ifdef LOOPER_USES_EPOLL
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
bool acquiredLock = false;
#else
// Wait for wakeAndLock() waiters to run then set mPolling to true.
mLock.lock();
while (mWaiters != 0) {
mResume.wait(mLock);
}
mPolling = true;
mLock.unlock();
size_t requestedCount = mRequestedFds.size();
int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);
#endif
if (eventCount < 0) {
if (errno == EINTR) {
return ALOOPER_POLL_WAKE;
goto Done;
}
LOGW("Poll failed with an unexpected error, errno=%d", errno);
return ALOOPER_POLL_ERROR;
result = ALOOPER_POLL_ERROR;
goto Done;
}
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - timeout", this);
#endif
return ALOOPER_POLL_TIMEOUT;
result = ALOOPER_POLL_TIMEOUT;
goto Done;
}
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
bool acquiredLock = false;
#ifdef LOOPER_USES_EPOLL
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - awoken", this);
#endif
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
awoken();
} else {
LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
@ -212,11 +264,7 @@ int Looper::pollInner(int timeoutMillis) {
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
Response response;
response.events = events;
response.request = mRequests.valueAt(requestIndex);
mResponses.push(response);
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
@ -226,6 +274,66 @@ int Looper::pollInner(int timeoutMillis) {
if (acquiredLock) {
mLock.unlock();
}
Done: ;
#else
for (size_t i = 0; i < requestedCount; i++) {
const struct pollfd& requestedFd = mRequestedFds.itemAt(i);
short pollEvents = requestedFd.revents;
if (pollEvents) {
if (requestedFd.fd == mWakeReadPipeFd) {
if (pollEvents & POLLIN) {
awoken();
} else {
LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);
}
} else {
int events = 0;
if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;
if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;
if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;
if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;
pushResponse(events, mRequests.itemAt(i));
}
if (--eventCount == 0) {
break;
}
}
}
Done:
// Set mPolling to false and wake up the wakeAndLock() waiters.
mLock.lock();
mPolling = false;
if (mWaiters != 0) {
mAwake.broadcast();
}
mLock.unlock();
#endif
#ifdef LOOPER_STATISTICS
nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);
mSampledPolls += 1;
if (timeoutMillis == 0) {
mSampledZeroPollCount += 1;
mSampledZeroPollLatencySum += pollEndTime - pollStartTime;
} else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {
mSampledTimeoutPollCount += 1;
mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime
- milliseconds_to_nanoseconds(timeoutMillis);
}
if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {
LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,
0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,
0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);
mSampledPolls = 0;
mSampledZeroPollCount = 0;
mSampledZeroPollLatencySum = 0;
mSampledTimeoutPollCount = 0;
mSampledTimeoutPollLatencySum = 0;
}
#endif
for (size_t i = 0; i < mResponses.size(); i++) {
const Response& response = mResponses.itemAt(i);
@ -278,6 +386,13 @@ void Looper::wake() {
LOGD("%p ~ wake", this);
#endif
#ifdef LOOPER_STATISTICS
// FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled.
if (mPendingWakeCount++ == 0) {
mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);
}
#endif
ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);
@ -290,23 +405,51 @@ void Looper::wake() {
}
}
void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ awoken", this);
#endif
#ifdef LOOPER_STATISTICS
if (mPendingWakeCount == 0) {
LOGD("%p ~ awoken: spurious!", this);
} else {
mSampledWakeCycles += 1;
mSampledWakeCountSum += mPendingWakeCount;
mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime;
mPendingWakeCount = 0;
mPendingWakeTime = -1;
if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) {
LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this,
0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles,
float(mSampledWakeCountSum) / mSampledWakeCycles);
mSampledWakeCycles = 0;
mSampledWakeCountSum = 0;
mSampledWakeLatencySum = 0;
}
}
#endif
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}
int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
#if DEBUG_CALLBACKS
LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
events, callback, data);
#endif
int epollEvents = 0;
if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
if (events & ALOOPER_EVENT_ERROR) epollEvents |= EPOLLERR;
if (events & ALOOPER_EVENT_HANGUP) epollEvents |= EPOLLHUP;
if (epollEvents == 0) {
LOGE("Invalid attempt to set a callback with no selected poll events.");
return -1;
}
if (! callback) {
if (! mAllowNonCallbacks) {
LOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
@ -319,6 +462,11 @@ int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback,
}
}
#ifdef LOOPER_USES_EPOLL
int epollEvents = 0;
if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
{ // acquire lock
AutoMutex _l(mLock);
@ -350,6 +498,33 @@ int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback,
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
#else
int pollEvents = 0;
if (events & ALOOPER_EVENT_INPUT) pollEvents |= POLLIN;
if (events & ALOOPER_EVENT_OUTPUT) pollEvents |= POLLOUT;
wakeAndLock(); // acquire lock
struct pollfd requestedFd;
requestedFd.fd = fd;
requestedFd.events = pollEvents;
Request request;
request.fd = fd;
request.ident = ident;
request.callback = callback;
request.data = data;
ssize_t index = getRequestIndexLocked(fd);
if (index < 0) {
mRequestedFds.push(requestedFd);
mRequests.push(request);
} else {
mRequestedFds.replaceAt(requestedFd, size_t(index));
mRequests.replaceAt(request, size_t(index));
}
mLock.unlock(); // release lock
#endif
return 1;
}
@ -358,6 +533,7 @@ int Looper::removeFd(int fd) {
LOGD("%p ~ removeFd - fd=%d", this, fd);
#endif
#ifdef LOOPER_USES_EPOLL
{ // acquire lock
AutoMutex _l(mLock);
ssize_t requestIndex = mRequests.indexOfKey(fd);
@ -372,8 +548,49 @@ int Looper::removeFd(int fd) {
}
mRequests.removeItemsAt(requestIndex);
} // request lock
} // release lock
return 1;
#else
wakeAndLock(); // acquire lock
ssize_t index = getRequestIndexLocked(fd);
if (index >= 0) {
mRequestedFds.removeAt(size_t(index));
mRequests.removeAt(size_t(index));
}
mLock.unlock(); // release lock
return index >= 0;
#endif
}
#ifndef LOOPER_USES_EPOLL
ssize_t Looper::getRequestIndexLocked(int fd) {
size_t requestCount = mRequestedFds.size();
for (size_t i = 0; i < requestCount; i++) {
if (mRequestedFds.itemAt(i).fd == fd) {
return i;
}
}
return -1;
}
void Looper::wakeAndLock() {
mLock.lock();
mWaiters += 1;
while (mPolling) {
wake();
mAwake.wait(mLock);
}
mWaiters -= 1;
if (mWaiters == 0) {
mResume.signal();
}
}
#endif
} // namespace android

View File

@ -354,14 +354,6 @@ TEST_F(LooperTest, AddFd_WhenCallbackAdded_ReturnsOne) {
<< "addFd should return 1 because FD was added";
}
TEST_F(LooperTest, AddFd_WhenEventsIsZero_ReturnsError) {
Pipe pipe;
int result = mLooper->addFd(pipe.receiveFd, 0, 0, NULL, NULL);
EXPECT_EQ(-1, result)
<< "addFd should return -1 because arguments were invalid";
}
TEST_F(LooperTest, AddFd_WhenIdentIsNegativeAndCallbackIsNull_ReturnsError) {
Pipe pipe;
int result = mLooper->addFd(pipe.receiveFd, -1, ALOOPER_EVENT_INPUT, NULL, NULL);

View File

@ -135,6 +135,15 @@ enum {
* to specify this event flag in the requested event set.
*/
ALOOPER_EVENT_HANGUP = 1 << 3,
/**
* The file descriptor is invalid.
* For example, the file descriptor was closed prematurely.
*
* The looper always sends notifications about invalid file descriptors; it is not necessary
* to specify this event flag in the requested event set.
*/
ALOOPER_EVENT_INVALID = 1 << 4,
};
/**