From 0818b0921ef6cda07f41b56d2ef19b2849dfefd1 Mon Sep 17 00:00:00 2001 From: Jeff Brown Date: Fri, 16 Mar 2012 22:18:39 -0700 Subject: [PATCH] Add a simple work queue abstraction. Makes it easy to schedule a bunch of work to happen in parallel. Change-Id: Id9c0e52fc8b6d78d2b9ed4c2ee47abce0a01775c --- include/utils/AndroidThreads.h | 2 + include/utils/WorkQueue.h | 119 +++++++++++++++++++++++ libs/utils/Android.mk | 1 + libs/utils/Threads.cpp | 2 + libs/utils/WorkQueue.cpp | 171 +++++++++++++++++++++++++++++++++ 5 files changed, 295 insertions(+) create mode 100644 include/utils/WorkQueue.h create mode 100644 libs/utils/WorkQueue.cpp diff --git a/include/utils/AndroidThreads.h b/include/utils/AndroidThreads.h index f9f7aa41a..5bda0fd63 100644 --- a/include/utils/AndroidThreads.h +++ b/include/utils/AndroidThreads.h @@ -73,6 +73,7 @@ extern void androidSetCreateThreadFunc(android_create_thread_fn func); // Get pid for the current thread. extern pid_t androidGetTid(); +#ifdef HAVE_ANDROID_OS // Change the scheduling group of a particular thread. The group // should be one of the ANDROID_TGROUP constants. Returns BAD_VALUE if // grp is out of range, else another non-zero value with errno set if @@ -95,6 +96,7 @@ extern int androidGetThreadPriority(pid_t tid); // scheduling groups are disabled. Returns INVALID_OPERATION if unexpected error. // Thread ID zero means current thread. extern int androidGetThreadSchedulingGroup(pid_t tid); +#endif #ifdef __cplusplus } // extern "C" diff --git a/include/utils/WorkQueue.h b/include/utils/WorkQueue.h new file mode 100644 index 000000000..e3c75b219 --- /dev/null +++ b/include/utils/WorkQueue.h @@ -0,0 +1,119 @@ +/*] + * Copyright (C) 2012 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _LIBS_UTILS_WORK_QUEUE_H +#define _LIBS_UTILS_WORK_QUEUE_H + +#include +#include +#include + +namespace android { + +/* + * A threaded work queue. + * + * This class is designed to make it easy to run a bunch of isolated work + * units in parallel, using up to the specified number of threads. + * To use it, write a loop to post work units to the work queue, then synchronize + * on the queue at the end. + */ +class WorkQueue { +public: + class WorkUnit { + public: + WorkUnit() { } + virtual ~WorkUnit() { } + + /* + * Runs the work unit. + * If the result is 'true' then the work queue continues scheduling work as usual. + * If the result is 'false' then the work queue is canceled. + */ + virtual bool run() = 0; + }; + + /* Creates a work queue with the specified maximum number of work threads. */ + WorkQueue(size_t maxThreads, bool canCallJava = true); + + /* Destroys the work queue. + * Cancels pending work and waits for all remaining threads to complete. + */ + ~WorkQueue(); + + /* Posts a work unit to run later. + * If the work queue has been canceled or is already finished, returns INVALID_OPERATION + * and does not take ownership of the work unit (caller must destroy it itself). + * Otherwise, returns OK and takes ownership of the work unit (the work queue will + * destroy it automatically). + * + * For flow control, this method blocks when the size of the pending work queue is more + * 'backlog' times the number of threads. This condition reduces the rate of entry into + * the pending work queue and prevents it from growing much more rapidly than the + * work threads can actually handle. + * + * If 'backlog' is 0, then no throttle is applied. + */ + status_t schedule(WorkUnit* workUnit, size_t backlog = 2); + + /* Cancels all pending work. + * If the work queue is already finished, returns INVALID_OPERATION. + * If the work queue is already canceled, returns OK and does nothing else. + * Otherwise, returns OK, discards all pending work units and prevents additional + * work units from being scheduled. + * + * Call finish() after cancel() to wait for all remaining work to complete. + */ + status_t cancel(); + + /* Waits for all work to complete. + * If the work queue is already finished, returns INVALID_OPERATION. + * Otherwise, waits for all work to complete and returns OK. + */ + status_t finish(); + +private: + class WorkThread : public Thread { + public: + WorkThread(WorkQueue* workQueue, bool canCallJava); + virtual ~WorkThread(); + + private: + virtual bool threadLoop(); + + WorkQueue* const mWorkQueue; + }; + + status_t cancelLocked(); + bool threadLoop(); // called from each work thread + + const size_t mMaxThreads; + const bool mCanCallJava; + + Mutex mLock; + Condition mWorkChangedCondition; + Condition mWorkDequeuedCondition; + + bool mCanceled; + bool mFinished; + size_t mIdleThreads; + Vector > mWorkThreads; + Vector mWorkUnits; +}; + +}; // namespace android + +#endif // _LIBS_UTILS_WORK_QUEUE_H diff --git a/libs/utils/Android.mk b/libs/utils/Android.mk index 87d0fb22c..b4fc994c0 100644 --- a/libs/utils/Android.mk +++ b/libs/utils/Android.mk @@ -41,6 +41,7 @@ commonSources:= \ Tokenizer.cpp \ Unicode.cpp \ VectorImpl.cpp \ + WorkQueue.cpp \ misc.cpp host_commonCflags := -DLIBUTILS_NATIVE=1 $(TOOL_CFLAGS) diff --git a/libs/utils/Threads.cpp b/libs/utils/Threads.cpp index ab207f565..f9277de73 100644 --- a/libs/utils/Threads.cpp +++ b/libs/utils/Threads.cpp @@ -323,6 +323,7 @@ pid_t androidGetTid() #endif } +#ifdef HAVE_ANDROID_OS int androidSetThreadSchedulingGroup(pid_t tid, int grp) { if (grp > ANDROID_TGROUP_MAX || grp < 0) { @@ -425,6 +426,7 @@ int androidGetThreadSchedulingGroup(pid_t tid) return ret; } +#endif namespace android { diff --git a/libs/utils/WorkQueue.cpp b/libs/utils/WorkQueue.cpp new file mode 100644 index 000000000..3bb99a1be --- /dev/null +++ b/libs/utils/WorkQueue.cpp @@ -0,0 +1,171 @@ +/* + * Copyright (C) 2012 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// #define LOG_NDEBUG 0 +#define LOG_TAG "WorkQueue" + +#include +#include + +namespace android { + +// --- WorkQueue --- + +WorkQueue::WorkQueue(size_t maxThreads, bool canCallJava) : + mMaxThreads(maxThreads), mCanCallJava(canCallJava), + mCanceled(false), mFinished(false), mIdleThreads(0) { +} + +WorkQueue::~WorkQueue() { + if (!cancel()) { + finish(); + } +} + +status_t WorkQueue::schedule(WorkUnit* workUnit, size_t backlog) { + AutoMutex _l(mLock); + + if (mFinished || mCanceled) { + return INVALID_OPERATION; + } + + if (mWorkThreads.size() < mMaxThreads + && mIdleThreads < mWorkUnits.size() + 1) { + sp workThread = new WorkThread(this, mCanCallJava); + status_t status = workThread->run("WorkQueue::WorkThread"); + if (status) { + return status; + } + mWorkThreads.add(workThread); + mIdleThreads += 1; + } else if (backlog) { + while (mWorkUnits.size() >= mMaxThreads * backlog) { + mWorkDequeuedCondition.wait(mLock); + if (mFinished || mCanceled) { + return INVALID_OPERATION; + } + } + } + + mWorkUnits.add(workUnit); + mWorkChangedCondition.broadcast(); + return OK; +} + +status_t WorkQueue::cancel() { + AutoMutex _l(mLock); + + return cancelLocked(); +} + +status_t WorkQueue::cancelLocked() { + if (mFinished) { + return INVALID_OPERATION; + } + + if (!mCanceled) { + mCanceled = true; + + size_t count = mWorkUnits.size(); + for (size_t i = 0; i < count; i++) { + delete mWorkUnits.itemAt(i); + } + mWorkUnits.clear(); + mWorkChangedCondition.broadcast(); + mWorkDequeuedCondition.broadcast(); + } + return OK; +} + +status_t WorkQueue::finish() { + { // acquire lock + AutoMutex _l(mLock); + + if (mFinished) { + return INVALID_OPERATION; + } + + mFinished = true; + mWorkChangedCondition.broadcast(); + } // release lock + + // It is not possible for the list of work threads to change once the mFinished + // flag has been set, so we can access mWorkThreads outside of the lock here. + size_t count = mWorkThreads.size(); + for (size_t i = 0; i < count; i++) { + mWorkThreads.itemAt(i)->join(); + } + mWorkThreads.clear(); + return OK; +} + +bool WorkQueue::threadLoop() { + WorkUnit* workUnit; + { // acquire lock + AutoMutex _l(mLock); + + for (;;) { + if (mCanceled) { + return false; + } + + if (!mWorkUnits.isEmpty()) { + workUnit = mWorkUnits.itemAt(0); + mWorkUnits.removeAt(0); + mIdleThreads -= 1; + mWorkDequeuedCondition.broadcast(); + break; + } + + if (mFinished) { + return false; + } + + mWorkChangedCondition.wait(mLock); + } + } // release lock + + bool shouldContinue = workUnit->run(); + delete workUnit; + + { // acquire lock + AutoMutex _l(mLock); + + mIdleThreads += 1; + + if (!shouldContinue) { + cancelLocked(); + return false; + } + } // release lock + + return true; +} + +// --- WorkQueue::WorkThread --- + +WorkQueue::WorkThread::WorkThread(WorkQueue* workQueue, bool canCallJava) : + Thread(canCallJava), mWorkQueue(workQueue) { +} + +WorkQueue::WorkThread::~WorkThread() { +} + +bool WorkQueue::WorkThread::threadLoop() { + return mWorkQueue->threadLoop(); +} + +}; // namespace android