From 8d96cab8bfc1b40a8e05c6f43b485750a5ae0a52 Mon Sep 17 00:00:00 2001 From: Todd Poynor Date: Tue, 25 Jun 2013 19:12:18 -0700 Subject: [PATCH] binder: add polling / single-threaded operation This is currently safe to do only for processes that disallow any binder threads to be created: setThreadPoolMaxThreadCount(0). Change-Id: I8a27f3cf26f4d51edb7f222af487ac256cbcab65 --- include/binder/IPCThreadState.h | 4 + libs/binder/IPCThreadState.cpp | 139 ++++++++++++++++++++------------ 2 files changed, 92 insertions(+), 51 deletions(-) diff --git a/include/binder/IPCThreadState.h b/include/binder/IPCThreadState.h index 3378d9725..ad0daee74 100644 --- a/include/binder/IPCThreadState.h +++ b/include/binder/IPCThreadState.h @@ -51,6 +51,8 @@ public: int64_t clearCallingIdentity(); void restoreCallingIdentity(int64_t token); + int setupPolling(int* fd); + status_t handlePolledCommands(); void flushCommands(); void joinThreadPool(bool isMain = true); @@ -96,7 +98,9 @@ private: uint32_t code, const Parcel& data, status_t* statusBuffer); + status_t getAndExecuteCommand(); status_t executeCommand(int32_t command); + void processPendingDerefs(); void clearCaller(); diff --git a/libs/binder/IPCThreadState.cpp b/libs/binder/IPCThreadState.cpp index 28b74baad..5a38b9559 100644 --- a/libs/binder/IPCThreadState.cpp +++ b/libs/binder/IPCThreadState.cpp @@ -418,6 +418,60 @@ void IPCThreadState::flushCommands() talkWithDriver(false); } +status_t IPCThreadState::getAndExecuteCommand() +{ + status_t result; + int32_t cmd; + + result = talkWithDriver(); + if (result >= NO_ERROR) { + size_t IN = mIn.dataAvail(); + if (IN < sizeof(int32_t)) return result; + cmd = mIn.readInt32(); + IF_LOG_COMMANDS() { + alog << "Processing top-level Command: " + << getReturnString(cmd) << endl; + } + + result = executeCommand(cmd); + + // After executing the command, ensure that the thread is returned to the + // foreground cgroup before rejoining the pool. The driver takes care of + // restoring the priority, but doesn't do anything with cgroups so we + // need to take care of that here in userspace. Note that we do make + // sure to go in the foreground after executing a transaction, but + // there are other callbacks into user code that could have changed + // our group so we want to make absolutely sure it is put back. + set_sched_policy(mMyThreadId, SP_FOREGROUND); + } + + return result; +} + +// When we've cleared the incoming command queue, process any pending derefs +void IPCThreadState::processPendingDerefs() +{ + if (mIn.dataPosition() >= mIn.dataSize()) { + size_t numPending = mPendingWeakDerefs.size(); + if (numPending > 0) { + for (size_t i = 0; i < numPending; i++) { + RefBase::weakref_type* refs = mPendingWeakDerefs[i]; + refs->decWeak(mProcess.get()); + } + mPendingWeakDerefs.clear(); + } + + numPending = mPendingStrongDerefs.size(); + if (numPending > 0) { + for (size_t i = 0; i < numPending; i++) { + BBinder* obj = mPendingStrongDerefs[i]; + obj->decStrong(mProcess.get()); + } + mPendingStrongDerefs.clear(); + } + } +} + void IPCThreadState::joinThreadPool(bool isMain) { LOG_THREADPOOL("**** THREAD %p (PID %d) IS JOINING THE THREAD POOL\n", (void*)pthread_self(), getpid()); @@ -431,57 +485,16 @@ void IPCThreadState::joinThreadPool(bool isMain) status_t result; do { - int32_t cmd; - - // When we've cleared the incoming command queue, process any pending derefs - if (mIn.dataPosition() >= mIn.dataSize()) { - size_t numPending = mPendingWeakDerefs.size(); - if (numPending > 0) { - for (size_t i = 0; i < numPending; i++) { - RefBase::weakref_type* refs = mPendingWeakDerefs[i]; - refs->decWeak(mProcess.get()); - } - mPendingWeakDerefs.clear(); - } - - numPending = mPendingStrongDerefs.size(); - if (numPending > 0) { - for (size_t i = 0; i < numPending; i++) { - BBinder* obj = mPendingStrongDerefs[i]; - obj->decStrong(mProcess.get()); - } - mPendingStrongDerefs.clear(); - } - } - + processPendingDerefs(); // now get the next command to be processed, waiting if necessary - result = talkWithDriver(); - if (result >= NO_ERROR) { - size_t IN = mIn.dataAvail(); - if (IN < sizeof(int32_t)) continue; - cmd = mIn.readInt32(); - IF_LOG_COMMANDS() { - alog << "Processing top-level Command: " - << getReturnString(cmd) << endl; - } + result = getAndExecuteCommand(); - - result = executeCommand(cmd); - } else if (result != TIMED_OUT && result != -ECONNREFUSED && result != -EBADF) { - ALOGE("talkWithDriver(fd=%d) returned unexpected error %d, aborting", + if (result < NO_ERROR && result != TIMED_OUT && result != -ECONNREFUSED && result != -EBADF) { + ALOGE("getAndExecuteCommand(fd=%d) returned unexpected error %d, aborting", mProcess->mDriverFD, result); abort(); } - // After executing the command, ensure that the thread is returned to the - // foreground cgroup before rejoining the pool. The driver takes care of - // restoring the priority, but doesn't do anything with cgroups so we - // need to take care of that here in userspace. Note that we do make - // sure to go in the foreground after executing a transaction, but - // there are other callbacks into user code that could have changed - // our group so we want to make absolutely sure it is put back. - set_sched_policy(mMyThreadId, SP_FOREGROUND); - // Let this thread exit the thread pool if it is no longer // needed and it is not the main process thread. if(result == TIMED_OUT && !isMain) { @@ -496,6 +509,30 @@ void IPCThreadState::joinThreadPool(bool isMain) talkWithDriver(false); } +int IPCThreadState::setupPolling(int* fd) +{ + if (mProcess->mDriverFD <= 0) { + return -EBADF; + } + + mOut.writeInt32(BC_ENTER_LOOPER); + *fd = mProcess->mDriverFD; + return 0; +} + +status_t IPCThreadState::handlePolledCommands() +{ + status_t result; + + do { + result = getAndExecuteCommand(); + } while (mIn.dataPosition() < mIn.dataSize()); + + processPendingDerefs(); + flushCommands(); + return result; +} + void IPCThreadState::stopProcess(bool immediate) { //ALOGI("**** STOPPING PROCESS"); @@ -826,7 +863,7 @@ status_t IPCThreadState::talkWithDriver(bool doReceive) IF_LOG_COMMANDS() { alog << "Our err: " << (void*)err << ", write consumed: " << bwr.write_consumed << " (of " << mOut.dataSize() - << "), read consumed: " << bwr.read_consumed << endl; + << "), read consumed: " << bwr.read_consumed << endl; } if (err >= NO_ERROR) { @@ -1104,16 +1141,16 @@ status_t IPCThreadState::executeCommand(int32_t cmd) void IPCThreadState::threadDestructor(void *st) { - IPCThreadState* const self = static_cast(st); - if (self) { - self->flushCommands(); + IPCThreadState* const self = static_cast(st); + if (self) { + self->flushCommands(); #if defined(HAVE_ANDROID_OS) if (self->mProcess->mDriverFD > 0) { ioctl(self->mProcess->mDriverFD, BINDER_THREAD_EXIT, 0); } #endif - delete self; - } + delete self; + } }