Merge "Switch Looper back to using poll() instead of epoll()." into gingerbread

This commit is contained in:
Jeff Brown
2010-10-07 13:29:56 -07:00
committed by Android (Google) Code Review
6 changed files with 349 additions and 74 deletions

View File

@ -34,16 +34,19 @@ public class MessageQueue {
Message mMessages; Message mMessages;
private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>(); private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
private IdleHandler[] mPendingIdleHandlers; private IdleHandler[] mPendingIdleHandlers;
private boolean mQuiting = false; private boolean mQuiting;
boolean mQuitAllowed = true; boolean mQuitAllowed = true;
// Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
private boolean mBlocked;
@SuppressWarnings("unused") @SuppressWarnings("unused")
private int mPtr; // used by native code private int mPtr; // used by native code
private native void nativeInit(); private native void nativeInit();
private native void nativeDestroy(); private native void nativeDestroy();
private native boolean nativePollOnce(int timeoutMillis); private native void nativePollOnce(int ptr, int timeoutMillis);
private native void nativeWake(); private native void nativeWake(int ptr);
/** /**
* Callback interface for discovering when a thread is going to block * Callback interface for discovering when a thread is going to block
@ -113,7 +116,7 @@ public class MessageQueue {
if (nextPollTimeoutMillis != 0) { if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands(); Binder.flushPendingCommands();
} }
nativePollOnce(nextPollTimeoutMillis); nativePollOnce(mPtr, nextPollTimeoutMillis);
synchronized (this) { synchronized (this) {
// Try to retrieve the next message. Return if found. // Try to retrieve the next message. Return if found.
@ -122,7 +125,9 @@ public class MessageQueue {
if (msg != null) { if (msg != null) {
final long when = msg.when; final long when = msg.when;
if (now >= when) { if (now >= when) {
mBlocked = false;
mMessages = msg.next; mMessages = msg.next;
msg.next = null;
if (Config.LOGV) Log.v("MessageQueue", "Returning message: " + msg); if (Config.LOGV) Log.v("MessageQueue", "Returning message: " + msg);
return msg; return msg;
} else { } else {
@ -138,6 +143,7 @@ public class MessageQueue {
} }
if (pendingIdleHandlerCount == 0) { if (pendingIdleHandlerCount == 0) {
// No idle handlers to run. Loop and wait some more. // No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue; continue;
} }
@ -184,6 +190,7 @@ public class MessageQueue {
if (msg.target == null && !mQuitAllowed) { if (msg.target == null && !mQuitAllowed) {
throw new RuntimeException("Main thread not allowed to quit"); throw new RuntimeException("Main thread not allowed to quit");
} }
final boolean needWake;
synchronized (this) { synchronized (this) {
if (mQuiting) { if (mQuiting) {
RuntimeException e = new RuntimeException( RuntimeException e = new RuntimeException(
@ -200,6 +207,7 @@ public class MessageQueue {
if (p == null || when == 0 || when < p.when) { if (p == null || when == 0 || when < p.when) {
msg.next = p; msg.next = p;
mMessages = msg; mMessages = msg;
needWake = mBlocked; // new head, might need to wake up
} else { } else {
Message prev = null; Message prev = null;
while (p != null && p.when <= when) { while (p != null && p.when <= when) {
@ -208,9 +216,12 @@ public class MessageQueue {
} }
msg.next = prev.next; msg.next = prev.next;
prev.next = msg; prev.next = msg;
needWake = false; // still waiting on head, no need to wake up
} }
} }
nativeWake(); if (needWake) {
nativeWake(mPtr);
}
return true; return true;
} }

View File

@ -41,7 +41,7 @@ public:
inline sp<Looper> getLooper() { return mLooper; } inline sp<Looper> getLooper() { return mLooper; }
bool pollOnce(int timeoutMillis); void pollOnce(int timeoutMillis);
void wake(); void wake();
private: private:
@ -61,8 +61,8 @@ NativeMessageQueue::NativeMessageQueue() {
NativeMessageQueue::~NativeMessageQueue() { NativeMessageQueue::~NativeMessageQueue() {
} }
bool NativeMessageQueue::pollOnce(int timeoutMillis) { void NativeMessageQueue::pollOnce(int timeoutMillis) {
return mLooper->pollOnce(timeoutMillis) != ALOOPER_POLL_TIMEOUT; mLooper->pollOnce(timeoutMillis);
} }
void NativeMessageQueue::wake() { void NativeMessageQueue::wake() {
@ -112,24 +112,14 @@ static void throwQueueNotInitialized(JNIEnv* env) {
jniThrowException(env, "java/lang/IllegalStateException", "Message queue not initialized"); jniThrowException(env, "java/lang/IllegalStateException", "Message queue not initialized");
} }
static jboolean android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jint timeoutMillis) { jint ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
android_os_MessageQueue_getNativeMessageQueue(env, obj); nativeMessageQueue->pollOnce(timeoutMillis);
if (! nativeMessageQueue) {
throwQueueNotInitialized(env);
return false;
}
return nativeMessageQueue->pollOnce(timeoutMillis);
} }
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj) { static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
NativeMessageQueue* nativeMessageQueue = NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
android_os_MessageQueue_getNativeMessageQueue(env, obj);
if (! nativeMessageQueue) {
throwQueueNotInitialized(env);
return;
}
return nativeMessageQueue->wake(); return nativeMessageQueue->wake();
} }
@ -139,8 +129,8 @@ static JNINativeMethod gMessageQueueMethods[] = {
/* name, signature, funcPtr */ /* name, signature, funcPtr */
{ "nativeInit", "()V", (void*)android_os_MessageQueue_nativeInit }, { "nativeInit", "()V", (void*)android_os_MessageQueue_nativeInit },
{ "nativeDestroy", "()V", (void*)android_os_MessageQueue_nativeDestroy }, { "nativeDestroy", "()V", (void*)android_os_MessageQueue_nativeDestroy },
{ "nativePollOnce", "(I)Z", (void*)android_os_MessageQueue_nativePollOnce }, { "nativePollOnce", "(II)V", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "()V", (void*)android_os_MessageQueue_nativeWake } { "nativeWake", "(I)V", (void*)android_os_MessageQueue_nativeWake }
}; };
#define FIND_CLASS(var, className) \ #define FIND_CLASS(var, className) \

View File

@ -20,9 +20,22 @@
#include <utils/threads.h> #include <utils/threads.h>
#include <utils/RefBase.h> #include <utils/RefBase.h>
#include <utils/KeyedVector.h> #include <utils/KeyedVector.h>
#include <utils/Timers.h>
#include <android/looper.h> #include <android/looper.h>
// Currently using poll() instead of epoll_wait() since it does a better job of meeting a
// timeout deadline. epoll_wait() typically causes additional delays of up to 10ms
// beyond the requested timeout.
//#define LOOPER_USES_EPOLL
//#define LOOPER_STATISTICS
#ifdef LOOPER_USES_EPOLL
#include <sys/epoll.h>
#else
#include <sys/poll.h>
#endif
/* /*
* Declare a concrete type for the NDK's looper forward declaration. * Declare a concrete type for the NDK's looper forward declaration.
*/ */
@ -190,13 +203,54 @@ private:
const bool mAllowNonCallbacks; // immutable const bool mAllowNonCallbacks; // immutable
int mEpollFd; // immutable
int mWakeReadPipeFd; // immutable int mWakeReadPipeFd; // immutable
int mWakeWritePipeFd; // immutable int mWakeWritePipeFd; // immutable
Mutex mLock;
#ifdef LOOPER_USES_EPOLL
int mEpollFd; // immutable
// Locked list of file descriptor monitoring requests. // Locked list of file descriptor monitoring requests.
Mutex mLock; KeyedVector<int, Request> mRequests; // guarded by mLock
KeyedVector<int, Request> mRequests; #else
// The lock guards state used to track whether there is a poll() in progress and whether
// there are any other threads waiting in wakeAndLock(). The condition variables
// are used to transfer control among these threads such that all waiters are
// serviced before a new poll can begin.
// The wakeAndLock() method increments mWaiters, wakes the poll, blocks on mAwake
// until mPolling becomes false, then decrements mWaiters again.
// The poll() method blocks on mResume until mWaiters becomes 0, then sets
// mPolling to true, blocks until the poll completes, then resets mPolling to false
// and signals mResume if there are waiters.
bool mPolling; // guarded by mLock
uint32_t mWaiters; // guarded by mLock
Condition mAwake; // guarded by mLock
Condition mResume; // guarded by mLock
Vector<struct pollfd> mRequestedFds; // must hold mLock and mPolling must be false to modify
Vector<Request> mRequests; // must hold mLock and mPolling must be false to modify
ssize_t getRequestIndexLocked(int fd);
void wakeAndLock();
#endif
#ifdef LOOPER_STATISTICS
static const int SAMPLED_WAKE_CYCLES_TO_AGGREGATE = 100;
static const int SAMPLED_POLLS_TO_AGGREGATE = 1000;
nsecs_t mPendingWakeTime;
int mPendingWakeCount;
int mSampledWakeCycles;
int mSampledWakeCountSum;
nsecs_t mSampledWakeLatencySum;
int mSampledPolls;
int mSampledZeroPollCount;
int mSampledZeroPollLatencySum;
int mSampledTimeoutPollCount;
int mSampledTimeoutPollLatencySum;
#endif
// This state is only used privately by pollOnce and does not require a lock since // This state is only used privately by pollOnce and does not require a lock since
// it runs on a single thread. // it runs on a single thread.
@ -204,6 +258,8 @@ private:
size_t mResponseIndex; size_t mResponseIndex;
int pollInner(int timeoutMillis); int pollInner(int timeoutMillis);
void awoken();
void pushResponse(int events, const Request& request);
static void initTLSKey(); static void initTLSKey();
static void threadDestructor(void *st); static void threadDestructor(void *st);

View File

@ -19,16 +19,17 @@
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/epoll.h>
namespace android { namespace android {
#ifdef LOOPER_USES_EPOLL
// Hint for number of file descriptors to be associated with the epoll instance. // Hint for number of file descriptors to be associated with the epoll instance.
static const int EPOLL_SIZE_HINT = 8; static const int EPOLL_SIZE_HINT = 8;
// Maximum number of file descriptors for which to retrieve poll events each iteration. // Maximum number of file descriptors for which to retrieve poll events each iteration.
static const int EPOLL_MAX_EVENTS = 16; static const int EPOLL_MAX_EVENTS = 16;
#endif
static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT; static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
static pthread_key_t gTLSKey = 0; static pthread_key_t gTLSKey = 0;
@ -36,9 +37,6 @@ static pthread_key_t gTLSKey = 0;
Looper::Looper(bool allowNonCallbacks) : Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mAllowNonCallbacks(allowNonCallbacks),
mResponseIndex(0) { mResponseIndex(0) {
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
int wakeFds[2]; int wakeFds[2];
int result = pipe(wakeFds); int result = pipe(wakeFds);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno); LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
@ -54,6 +52,11 @@ Looper::Looper(bool allowNonCallbacks) :
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d", LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
errno); errno);
#ifdef LOOPER_USES_EPOLL
// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem; struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN; eventItem.events = EPOLLIN;
@ -61,12 +64,45 @@ Looper::Looper(bool allowNonCallbacks) :
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d", LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
errno); errno);
#else
// Add the wake pipe to the head of the request list with a null callback.
struct pollfd requestedFd;
requestedFd.fd = mWakeReadPipeFd;
requestedFd.events = POLLIN;
mRequestedFds.push(requestedFd);
Request request;
request.fd = mWakeReadPipeFd;
request.callback = NULL;
request.ident = 0;
request.data = NULL;
mRequests.push(request);
mPolling = false;
mWaiters = 0;
#endif
#ifdef LOOPER_STATISTICS
mPendingWakeTime = -1;
mPendingWakeCount = 0;
mSampledWakeCycles = 0;
mSampledWakeCountSum = 0;
mSampledWakeLatencySum = 0;
mSampledPolls = 0;
mSampledZeroPollCount = 0;
mSampledZeroPollLatencySum = 0;
mSampledTimeoutPollCount = 0;
mSampledTimeoutPollLatencySum = 0;
#endif
} }
Looper::~Looper() { Looper::~Looper() {
close(mWakeReadPipeFd); close(mWakeReadPipeFd);
close(mWakeWritePipeFd); close(mWakeWritePipeFd);
#ifdef LOOPER_USES_EPOLL
close(mEpollFd); close(mEpollFd);
#endif
} }
void Looper::initTLSKey() { void Looper::initTLSKey() {
@ -157,45 +193,61 @@ int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE #if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis); LOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif #endif
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
#ifdef LOOPER_STATISTICS
nsecs_t pollStartTime = systemTime(SYSTEM_TIME_MONOTONIC);
#endif
#ifdef LOOPER_USES_EPOLL
struct epoll_event eventItems[EPOLL_MAX_EVENTS]; struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
bool acquiredLock = false;
#else
// Wait for wakeAndLock() waiters to run then set mPolling to true.
mLock.lock();
while (mWaiters != 0) {
mResume.wait(mLock);
}
mPolling = true;
mLock.unlock();
size_t requestedCount = mRequestedFds.size();
int eventCount = poll(mRequestedFds.editArray(), requestedCount, timeoutMillis);
#endif
if (eventCount < 0) { if (eventCount < 0) {
if (errno == EINTR) { if (errno == EINTR) {
return ALOOPER_POLL_WAKE; goto Done;
} }
LOGW("Poll failed with an unexpected error, errno=%d", errno); LOGW("Poll failed with an unexpected error, errno=%d", errno);
return ALOOPER_POLL_ERROR; result = ALOOPER_POLL_ERROR;
goto Done;
} }
if (eventCount == 0) { if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE #if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - timeout", this); LOGD("%p ~ pollOnce - timeout", this);
#endif #endif
return ALOOPER_POLL_TIMEOUT; result = ALOOPER_POLL_TIMEOUT;
goto Done;
} }
int result = ALOOPER_POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
#if DEBUG_POLL_AND_WAKE #if DEBUG_POLL_AND_WAKE
LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount); LOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif #endif
bool acquiredLock = false;
#ifdef LOOPER_USES_EPOLL
for (int i = 0; i < eventCount; i++) { for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd; int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events; uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) { if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) { if (epollEvents & EPOLLIN) {
#if DEBUG_POLL_AND_WAKE awoken();
LOGD("%p ~ pollOnce - awoken", this);
#endif
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
} else { } else {
LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents); LOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
} }
@ -212,11 +264,7 @@ int Looper::pollInner(int timeoutMillis) {
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT; if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR; if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP; if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
Response response;
response.events = events;
response.request = mRequests.valueAt(requestIndex);
mResponses.push(response);
} else { } else {
LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " LOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd); "no longer registered.", epollEvents, fd);
@ -226,6 +274,66 @@ int Looper::pollInner(int timeoutMillis) {
if (acquiredLock) { if (acquiredLock) {
mLock.unlock(); mLock.unlock();
} }
Done: ;
#else
for (size_t i = 0; i < requestedCount; i++) {
const struct pollfd& requestedFd = mRequestedFds.itemAt(i);
short pollEvents = requestedFd.revents;
if (pollEvents) {
if (requestedFd.fd == mWakeReadPipeFd) {
if (pollEvents & POLLIN) {
awoken();
} else {
LOGW("Ignoring unexpected poll events 0x%x on wake read pipe.", pollEvents);
}
} else {
int events = 0;
if (pollEvents & POLLIN) events |= ALOOPER_EVENT_INPUT;
if (pollEvents & POLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (pollEvents & POLLERR) events |= ALOOPER_EVENT_ERROR;
if (pollEvents & POLLHUP) events |= ALOOPER_EVENT_HANGUP;
if (pollEvents & POLLNVAL) events |= ALOOPER_EVENT_INVALID;
pushResponse(events, mRequests.itemAt(i));
}
if (--eventCount == 0) {
break;
}
}
}
Done:
// Set mPolling to false and wake up the wakeAndLock() waiters.
mLock.lock();
mPolling = false;
if (mWaiters != 0) {
mAwake.broadcast();
}
mLock.unlock();
#endif
#ifdef LOOPER_STATISTICS
nsecs_t pollEndTime = systemTime(SYSTEM_TIME_MONOTONIC);
mSampledPolls += 1;
if (timeoutMillis == 0) {
mSampledZeroPollCount += 1;
mSampledZeroPollLatencySum += pollEndTime - pollStartTime;
} else if (timeoutMillis > 0 && result == ALOOPER_POLL_TIMEOUT) {
mSampledTimeoutPollCount += 1;
mSampledTimeoutPollLatencySum += pollEndTime - pollStartTime
- milliseconds_to_nanoseconds(timeoutMillis);
}
if (mSampledPolls == SAMPLED_POLLS_TO_AGGREGATE) {
LOGD("%p ~ poll latency statistics: %0.3fms zero timeout, %0.3fms non-zero timeout", this,
0.000001f * float(mSampledZeroPollLatencySum) / mSampledZeroPollCount,
0.000001f * float(mSampledTimeoutPollLatencySum) / mSampledTimeoutPollCount);
mSampledPolls = 0;
mSampledZeroPollCount = 0;
mSampledZeroPollLatencySum = 0;
mSampledTimeoutPollCount = 0;
mSampledTimeoutPollLatencySum = 0;
}
#endif
for (size_t i = 0; i < mResponses.size(); i++) { for (size_t i = 0; i < mResponses.size(); i++) {
const Response& response = mResponses.itemAt(i); const Response& response = mResponses.itemAt(i);
@ -278,6 +386,13 @@ void Looper::wake() {
LOGD("%p ~ wake", this); LOGD("%p ~ wake", this);
#endif #endif
#ifdef LOOPER_STATISTICS
// FIXME: Possible race with awoken() but this code is for testing only and is rarely enabled.
if (mPendingWakeCount++ == 0) {
mPendingWakeTime = systemTime(SYSTEM_TIME_MONOTONIC);
}
#endif
ssize_t nWrite; ssize_t nWrite;
do { do {
nWrite = write(mWakeWritePipeFd, "W", 1); nWrite = write(mWakeWritePipeFd, "W", 1);
@ -290,23 +405,51 @@ void Looper::wake() {
} }
} }
void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
LOGD("%p ~ awoken", this);
#endif
#ifdef LOOPER_STATISTICS
if (mPendingWakeCount == 0) {
LOGD("%p ~ awoken: spurious!", this);
} else {
mSampledWakeCycles += 1;
mSampledWakeCountSum += mPendingWakeCount;
mSampledWakeLatencySum += systemTime(SYSTEM_TIME_MONOTONIC) - mPendingWakeTime;
mPendingWakeCount = 0;
mPendingWakeTime = -1;
if (mSampledWakeCycles == SAMPLED_WAKE_CYCLES_TO_AGGREGATE) {
LOGD("%p ~ wake statistics: %0.3fms wake latency, %0.3f wakes per cycle", this,
0.000001f * float(mSampledWakeLatencySum) / mSampledWakeCycles,
float(mSampledWakeCountSum) / mSampledWakeCycles);
mSampledWakeCycles = 0;
mSampledWakeCountSum = 0;
mSampledWakeLatencySum = 0;
}
}
#endif
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}
int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) { int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback, void* data) {
#if DEBUG_CALLBACKS #if DEBUG_CALLBACKS
LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident, LOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
events, callback, data); events, callback, data);
#endif #endif
int epollEvents = 0;
if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
if (events & ALOOPER_EVENT_ERROR) epollEvents |= EPOLLERR;
if (events & ALOOPER_EVENT_HANGUP) epollEvents |= EPOLLHUP;
if (epollEvents == 0) {
LOGE("Invalid attempt to set a callback with no selected poll events.");
return -1;
}
if (! callback) { if (! callback) {
if (! mAllowNonCallbacks) { if (! mAllowNonCallbacks) {
LOGE("Invalid attempt to set NULL callback but not allowed for this looper."); LOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
@ -319,6 +462,11 @@ int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback,
} }
} }
#ifdef LOOPER_USES_EPOLL
int epollEvents = 0;
if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;
{ // acquire lock { // acquire lock
AutoMutex _l(mLock); AutoMutex _l(mLock);
@ -350,6 +498,33 @@ int Looper::addFd(int fd, int ident, int events, ALooper_callbackFunc callback,
mRequests.replaceValueAt(requestIndex, request); mRequests.replaceValueAt(requestIndex, request);
} }
} // release lock } // release lock
#else
int pollEvents = 0;
if (events & ALOOPER_EVENT_INPUT) pollEvents |= POLLIN;
if (events & ALOOPER_EVENT_OUTPUT) pollEvents |= POLLOUT;
wakeAndLock(); // acquire lock
struct pollfd requestedFd;
requestedFd.fd = fd;
requestedFd.events = pollEvents;
Request request;
request.fd = fd;
request.ident = ident;
request.callback = callback;
request.data = data;
ssize_t index = getRequestIndexLocked(fd);
if (index < 0) {
mRequestedFds.push(requestedFd);
mRequests.push(request);
} else {
mRequestedFds.replaceAt(requestedFd, size_t(index));
mRequests.replaceAt(request, size_t(index));
}
mLock.unlock(); // release lock
#endif
return 1; return 1;
} }
@ -358,6 +533,7 @@ int Looper::removeFd(int fd) {
LOGD("%p ~ removeFd - fd=%d", this, fd); LOGD("%p ~ removeFd - fd=%d", this, fd);
#endif #endif
#ifdef LOOPER_USES_EPOLL
{ // acquire lock { // acquire lock
AutoMutex _l(mLock); AutoMutex _l(mLock);
ssize_t requestIndex = mRequests.indexOfKey(fd); ssize_t requestIndex = mRequests.indexOfKey(fd);
@ -372,8 +548,49 @@ int Looper::removeFd(int fd) {
} }
mRequests.removeItemsAt(requestIndex); mRequests.removeItemsAt(requestIndex);
} // request lock } // release lock
return 1; return 1;
#else
wakeAndLock(); // acquire lock
ssize_t index = getRequestIndexLocked(fd);
if (index >= 0) {
mRequestedFds.removeAt(size_t(index));
mRequests.removeAt(size_t(index));
}
mLock.unlock(); // release lock
return index >= 0;
#endif
} }
#ifndef LOOPER_USES_EPOLL
ssize_t Looper::getRequestIndexLocked(int fd) {
size_t requestCount = mRequestedFds.size();
for (size_t i = 0; i < requestCount; i++) {
if (mRequestedFds.itemAt(i).fd == fd) {
return i;
}
}
return -1;
}
void Looper::wakeAndLock() {
mLock.lock();
mWaiters += 1;
while (mPolling) {
wake();
mAwake.wait(mLock);
}
mWaiters -= 1;
if (mWaiters == 0) {
mResume.signal();
}
}
#endif
} // namespace android } // namespace android

View File

@ -354,14 +354,6 @@ TEST_F(LooperTest, AddFd_WhenCallbackAdded_ReturnsOne) {
<< "addFd should return 1 because FD was added"; << "addFd should return 1 because FD was added";
} }
TEST_F(LooperTest, AddFd_WhenEventsIsZero_ReturnsError) {
Pipe pipe;
int result = mLooper->addFd(pipe.receiveFd, 0, 0, NULL, NULL);
EXPECT_EQ(-1, result)
<< "addFd should return -1 because arguments were invalid";
}
TEST_F(LooperTest, AddFd_WhenIdentIsNegativeAndCallbackIsNull_ReturnsError) { TEST_F(LooperTest, AddFd_WhenIdentIsNegativeAndCallbackIsNull_ReturnsError) {
Pipe pipe; Pipe pipe;
int result = mLooper->addFd(pipe.receiveFd, -1, ALOOPER_EVENT_INPUT, NULL, NULL); int result = mLooper->addFd(pipe.receiveFd, -1, ALOOPER_EVENT_INPUT, NULL, NULL);

View File

@ -135,6 +135,15 @@ enum {
* to specify this event flag in the requested event set. * to specify this event flag in the requested event set.
*/ */
ALOOPER_EVENT_HANGUP = 1 << 3, ALOOPER_EVENT_HANGUP = 1 << 3,
/**
* The file descriptor is invalid.
* For example, the file descriptor was closed prematurely.
*
* The looper always sends notifications about invalid file descriptors; it is not necessary
* to specify this event flag in the requested event set.
*/
ALOOPER_EVENT_INVALID = 1 << 4,
}; };
/** /**