Add a simple work queue abstraction.

Makes it easy to schedule a bunch of work to happen in parallel.

Change-Id: Id9c0e52fc8b6d78d2b9ed4c2ee47abce0a01775c
This commit is contained in:
Jeff Brown 2012-03-16 22:18:39 -07:00
parent e6d77c593d
commit 0818b0921e
5 changed files with 295 additions and 0 deletions

View File

@ -73,6 +73,7 @@ extern void androidSetCreateThreadFunc(android_create_thread_fn func);
// Get pid for the current thread.
extern pid_t androidGetTid();
#ifdef HAVE_ANDROID_OS
// Change the scheduling group of a particular thread. The group
// should be one of the ANDROID_TGROUP constants. Returns BAD_VALUE if
// grp is out of range, else another non-zero value with errno set if
@ -95,6 +96,7 @@ extern int androidGetThreadPriority(pid_t tid);
// scheduling groups are disabled. Returns INVALID_OPERATION if unexpected error.
// Thread ID zero means current thread.
extern int androidGetThreadSchedulingGroup(pid_t tid);
#endif
#ifdef __cplusplus
} // extern "C"

119
include/utils/WorkQueue.h Normal file
View File

@ -0,0 +1,119 @@
/*]
* Copyright (C) 2012 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _LIBS_UTILS_WORK_QUEUE_H
#define _LIBS_UTILS_WORK_QUEUE_H
#include <utils/Errors.h>
#include <utils/Vector.h>
#include <utils/threads.h>
namespace android {
/*
* A threaded work queue.
*
* This class is designed to make it easy to run a bunch of isolated work
* units in parallel, using up to the specified number of threads.
* To use it, write a loop to post work units to the work queue, then synchronize
* on the queue at the end.
*/
class WorkQueue {
public:
class WorkUnit {
public:
WorkUnit() { }
virtual ~WorkUnit() { }
/*
* Runs the work unit.
* If the result is 'true' then the work queue continues scheduling work as usual.
* If the result is 'false' then the work queue is canceled.
*/
virtual bool run() = 0;
};
/* Creates a work queue with the specified maximum number of work threads. */
WorkQueue(size_t maxThreads, bool canCallJava = true);
/* Destroys the work queue.
* Cancels pending work and waits for all remaining threads to complete.
*/
~WorkQueue();
/* Posts a work unit to run later.
* If the work queue has been canceled or is already finished, returns INVALID_OPERATION
* and does not take ownership of the work unit (caller must destroy it itself).
* Otherwise, returns OK and takes ownership of the work unit (the work queue will
* destroy it automatically).
*
* For flow control, this method blocks when the size of the pending work queue is more
* 'backlog' times the number of threads. This condition reduces the rate of entry into
* the pending work queue and prevents it from growing much more rapidly than the
* work threads can actually handle.
*
* If 'backlog' is 0, then no throttle is applied.
*/
status_t schedule(WorkUnit* workUnit, size_t backlog = 2);
/* Cancels all pending work.
* If the work queue is already finished, returns INVALID_OPERATION.
* If the work queue is already canceled, returns OK and does nothing else.
* Otherwise, returns OK, discards all pending work units and prevents additional
* work units from being scheduled.
*
* Call finish() after cancel() to wait for all remaining work to complete.
*/
status_t cancel();
/* Waits for all work to complete.
* If the work queue is already finished, returns INVALID_OPERATION.
* Otherwise, waits for all work to complete and returns OK.
*/
status_t finish();
private:
class WorkThread : public Thread {
public:
WorkThread(WorkQueue* workQueue, bool canCallJava);
virtual ~WorkThread();
private:
virtual bool threadLoop();
WorkQueue* const mWorkQueue;
};
status_t cancelLocked();
bool threadLoop(); // called from each work thread
const size_t mMaxThreads;
const bool mCanCallJava;
Mutex mLock;
Condition mWorkChangedCondition;
Condition mWorkDequeuedCondition;
bool mCanceled;
bool mFinished;
size_t mIdleThreads;
Vector<sp<WorkThread> > mWorkThreads;
Vector<WorkUnit*> mWorkUnits;
};
}; // namespace android
#endif // _LIBS_UTILS_WORK_QUEUE_H

View File

@ -41,6 +41,7 @@ commonSources:= \
Tokenizer.cpp \
Unicode.cpp \
VectorImpl.cpp \
WorkQueue.cpp \
misc.cpp
host_commonCflags := -DLIBUTILS_NATIVE=1 $(TOOL_CFLAGS)

View File

@ -323,6 +323,7 @@ pid_t androidGetTid()
#endif
}
#ifdef HAVE_ANDROID_OS
int androidSetThreadSchedulingGroup(pid_t tid, int grp)
{
if (grp > ANDROID_TGROUP_MAX || grp < 0) {
@ -425,6 +426,7 @@ int androidGetThreadSchedulingGroup(pid_t tid)
return ret;
}
#endif
namespace android {

171
libs/utils/WorkQueue.cpp Normal file
View File

@ -0,0 +1,171 @@
/*
* Copyright (C) 2012 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// #define LOG_NDEBUG 0
#define LOG_TAG "WorkQueue"
#include <utils/Log.h>
#include <utils/WorkQueue.h>
namespace android {
// --- WorkQueue ---
WorkQueue::WorkQueue(size_t maxThreads, bool canCallJava) :
mMaxThreads(maxThreads), mCanCallJava(canCallJava),
mCanceled(false), mFinished(false), mIdleThreads(0) {
}
WorkQueue::~WorkQueue() {
if (!cancel()) {
finish();
}
}
status_t WorkQueue::schedule(WorkUnit* workUnit, size_t backlog) {
AutoMutex _l(mLock);
if (mFinished || mCanceled) {
return INVALID_OPERATION;
}
if (mWorkThreads.size() < mMaxThreads
&& mIdleThreads < mWorkUnits.size() + 1) {
sp<WorkThread> workThread = new WorkThread(this, mCanCallJava);
status_t status = workThread->run("WorkQueue::WorkThread");
if (status) {
return status;
}
mWorkThreads.add(workThread);
mIdleThreads += 1;
} else if (backlog) {
while (mWorkUnits.size() >= mMaxThreads * backlog) {
mWorkDequeuedCondition.wait(mLock);
if (mFinished || mCanceled) {
return INVALID_OPERATION;
}
}
}
mWorkUnits.add(workUnit);
mWorkChangedCondition.broadcast();
return OK;
}
status_t WorkQueue::cancel() {
AutoMutex _l(mLock);
return cancelLocked();
}
status_t WorkQueue::cancelLocked() {
if (mFinished) {
return INVALID_OPERATION;
}
if (!mCanceled) {
mCanceled = true;
size_t count = mWorkUnits.size();
for (size_t i = 0; i < count; i++) {
delete mWorkUnits.itemAt(i);
}
mWorkUnits.clear();
mWorkChangedCondition.broadcast();
mWorkDequeuedCondition.broadcast();
}
return OK;
}
status_t WorkQueue::finish() {
{ // acquire lock
AutoMutex _l(mLock);
if (mFinished) {
return INVALID_OPERATION;
}
mFinished = true;
mWorkChangedCondition.broadcast();
} // release lock
// It is not possible for the list of work threads to change once the mFinished
// flag has been set, so we can access mWorkThreads outside of the lock here.
size_t count = mWorkThreads.size();
for (size_t i = 0; i < count; i++) {
mWorkThreads.itemAt(i)->join();
}
mWorkThreads.clear();
return OK;
}
bool WorkQueue::threadLoop() {
WorkUnit* workUnit;
{ // acquire lock
AutoMutex _l(mLock);
for (;;) {
if (mCanceled) {
return false;
}
if (!mWorkUnits.isEmpty()) {
workUnit = mWorkUnits.itemAt(0);
mWorkUnits.removeAt(0);
mIdleThreads -= 1;
mWorkDequeuedCondition.broadcast();
break;
}
if (mFinished) {
return false;
}
mWorkChangedCondition.wait(mLock);
}
} // release lock
bool shouldContinue = workUnit->run();
delete workUnit;
{ // acquire lock
AutoMutex _l(mLock);
mIdleThreads += 1;
if (!shouldContinue) {
cancelLocked();
return false;
}
} // release lock
return true;
}
// --- WorkQueue::WorkThread ---
WorkQueue::WorkThread::WorkThread(WorkQueue* workQueue, bool canCallJava) :
Thread(canCallJava), mWorkQueue(workQueue) {
}
WorkQueue::WorkThread::~WorkThread() {
}
bool WorkQueue::WorkThread::threadLoop() {
return mWorkQueue->threadLoop();
}
}; // namespace android