From 4c8b951f8d6a121e758bd3905fa8987c77863790 Mon Sep 17 00:00:00 2001 From: Aravind Akella Date: Thu, 5 Sep 2013 17:03:38 -0700 Subject: [PATCH] Sensor Batching Bug fixes. i) SensorService dropping events. Increase SOCKET_BUFFER_SIZE in BitTube ctor. ii) Call flush before every activate. iii) Emulate flush for older devices. Add a trivial flush complete event when flush is called. Bug: 10641596 Change-Id: I30d0f3948e830457143f16e157b6ad81908687ce --- services/sensorservice/SensorService.cpp | 161 +++++++++++++++++++++-- services/sensorservice/SensorService.h | 26 +++- 2 files changed, 173 insertions(+), 14 deletions(-) diff --git a/services/sensorservice/SensorService.cpp b/services/sensorservice/SensorService.cpp index cb99531f9..b26e5722f 100644 --- a/services/sensorservice/SensorService.cpp +++ b/services/sensorservice/SensorService.cpp @@ -148,6 +148,21 @@ void SensorService::onFirstRef() // debugging sensor list mUserSensorListDebug = mSensorList; + mSocketBufferSize = SOCKET_BUFFER_SIZE_NON_BATCHED; + FILE *fp = fopen("/proc/sys/net/core/wmem_max", "r"); + char line[128]; + if (fp != NULL && fgets(line, sizeof(line), fp) != NULL) { + line[sizeof(line) - 1] = '\0'; + sscanf(line, "%u", &mSocketBufferSize); + if (mSocketBufferSize > MAX_SOCKET_BUFFER_SIZE_BATCHED) { + mSocketBufferSize = MAX_SOCKET_BUFFER_SIZE_BATCHED; + } + } + ALOGD("Max socket buffer size %u", mSocketBufferSize); + if (fp) { + fclose(fp); + } + run("SensorService", PRIORITY_URGENT_DISPLAY); mInitCheck = NO_ERROR; } @@ -255,7 +270,6 @@ status_t SensorService::dump(int fd, const Vector& args) SensorFusion::getInstance().dump(result); SensorDevice::getInstance().dump(result); - result.appendFormat("%d active connections\n", mActiveConnections.size()); result.append("Active sensors:\n"); for (size_t i=0 ; i& args) handle, mActiveSensors.valueAt(i)->getNumConnections()); } + + result.appendFormat("%u Max Socket Buffer size\n", mSocketBufferSize); + result.appendFormat("%d active connections\n", mActiveConnections.size()); + + for (size_t i=0 ; i < mActiveConnections.size() ; i++) { + sp connection(mActiveConnections[i].promote()); + if (connection != 0) { + result.appendFormat("Connection Number: %d \n", i); + connection->dump(result); + } + } } write(fd, result.string(), result.size()); return NO_ERROR; @@ -404,7 +429,6 @@ void SensorService::recordLastValue( sensors_event_t const * buffer, size_t count) { Mutex::Autolock _l(mLock); - // record the last event for each sensor int32_t prev = buffer[0].sensor; for (size_t i=1 ; i& connection, status_t err = sensor->batch(connection.get(), handle, reservedFlags, samplingPeriodNs, maxBatchReportLatencyNs); + if (err == NO_ERROR) { + connection->setFirstFlushPending(handle, true); + status_t err_flush = sensor->flush(connection.get(), handle); + // Flush may return error if the sensor is not activated or the underlying h/w sensor does + // not support flush. + if (err_flush != NO_ERROR) { + connection->setFirstFlushPending(handle, false); + } + } if (err == NO_ERROR) { ALOGD_IF(DEBUG_CONNECTIONS, "Calling activate on %d", handle); @@ -649,6 +682,21 @@ status_t SensorService::flushSensor(const sp& connection, if (sensor == NULL) { return BAD_VALUE; } + SensorDevice& dev(SensorDevice::getInstance()); + + if (dev.getHalDeviceVersion() < SENSORS_DEVICE_API_VERSION_1_1) { + // For older devices increment pending flush count, which will send a trivial flush complete + // event for all the connections which are registered for updates on this sensor. + const SortedVector< wp > activeConnections( + getActiveConnections()); + for (size_t i=0 ; i connection(activeConnections[i].promote()); + if (connection != 0) { + connection->incrementPendingFlushCount(handle); + } + } + return NO_ERROR; + } return sensor->flush(connection.get(), handle); } // --------------------------------------------------------------------------- @@ -683,8 +731,15 @@ bool SensorService::SensorRecord::removeConnection( SensorService::SensorEventConnection::SensorEventConnection( const sp& service, uid_t uid) - : mService(service), mChannel(new BitTube()), mUid(uid) + : mService(service), mUid(uid) { + const SensorDevice& device(SensorDevice::getInstance()); + if (device.getHalDeviceVersion() >= SENSORS_DEVICE_API_VERSION_1_1) { + // Increase socket buffer size to 1MB for batching capabilities. + mChannel = new BitTube(service->mSocketBufferSize); + } else { + mChannel = new BitTube(SOCKET_BUFFER_SIZE_NON_BATCHED); + } } SensorService::SensorEventConnection::~SensorEventConnection() @@ -697,10 +752,22 @@ void SensorService::SensorEventConnection::onFirstRef() { } +void SensorService::SensorEventConnection::dump(String8& result) { + Mutex::Autolock _l(mConnectionLock); + for (size_t i = 0; i < mSensorInfo.size(); ++i) { + const FlushInfo& flushInfo = mSensorInfo.valueAt(i); + result.appendFormat("\t %s | status: %s | pending flush events %d\n", + mService->getSensorName(mSensorInfo.keyAt(i)).string(), + flushInfo.mFirstFlushPending ? "First flush pending" : + "active", + flushInfo.mPendingFlushEventsToSend); + } +} + bool SensorService::SensorEventConnection::addSensor(int32_t handle) { Mutex::Autolock _l(mConnectionLock); - if (mSensorInfo.indexOf(handle) < 0) { - mSensorInfo.add(handle); + if (mSensorInfo.indexOfKey(handle) < 0) { + mSensorInfo.add(handle, FlushInfo()); return true; } return false; @@ -708,7 +775,7 @@ bool SensorService::SensorEventConnection::addSensor(int32_t handle) { bool SensorService::SensorEventConnection::removeSensor(int32_t handle) { Mutex::Autolock _l(mConnectionLock); - if (mSensorInfo.remove(handle) >= 0) { + if (mSensorInfo.removeItem(handle) >= 0) { return true; } return false; @@ -716,7 +783,7 @@ bool SensorService::SensorEventConnection::removeSensor(int32_t handle) { bool SensorService::SensorEventConnection::hasSensor(int32_t handle) const { Mutex::Autolock _l(mConnectionLock); - return mSensorInfo.indexOf(handle) >= 0; + return mSensorInfo.indexOfKey(handle) >= 0; } bool SensorService::SensorEventConnection::hasAnySensor() const { @@ -724,12 +791,32 @@ bool SensorService::SensorEventConnection::hasAnySensor() const { return mSensorInfo.size() ? true : false; } +void SensorService::SensorEventConnection::setFirstFlushPending(int32_t handle, + bool value) { + Mutex::Autolock _l(mConnectionLock); + ssize_t index = mSensorInfo.indexOfKey(handle); + if (index >= 0) { + FlushInfo& flushInfo = mSensorInfo.editValueAt(index); + flushInfo.mFirstFlushPending = value; + } +} + +void SensorService::SensorEventConnection::incrementPendingFlushCount(int32_t handle) { + Mutex::Autolock _l(mConnectionLock); + ssize_t index = mSensorInfo.indexOfKey(handle); + if (index >= 0) { + FlushInfo& flushInfo = mSensorInfo.editValueAt(index); + flushInfo.mPendingFlushEventsToSend++; + } +} + status_t SensorService::SensorEventConnection::sendEvents( sensors_event_t const* buffer, size_t numEvents, sensors_event_t* scratch) { // filter out events not for this connection size_t count = 0; + if (scratch) { Mutex::Autolock _l(mConnectionLock); size_t i=0; @@ -742,10 +829,18 @@ status_t SensorService::SensorEventConnection::sendEvents( // filtered correctly. buffer[i].sensor is zero for meta_data events. curr = buffer[i].meta_data.sensor; } - if (mSensorInfo.indexOf(curr) >= 0) { + ssize_t index = mSensorInfo.indexOfKey(curr); + if (index >= 0 && mSensorInfo[index].mFirstFlushPending == true && + buffer[i].type == SENSOR_TYPE_META_DATA) { + // This is the first flush before activate is called. Events can now be sent for + // this sensor on this connection. + ALOGD_IF(DEBUG_CONNECTIONS, "First flush event for sensor==%d ", + buffer[i].meta_data.sensor); + mSensorInfo.editValueAt(index).mFirstFlushPending = false; + } + if (index >= 0 && mSensorInfo[index].mFirstFlushPending == false) { do { - scratch[count] = buffer[i]; - ++count; ++i; + scratch[count++] = buffer[i++]; } while ((i 0) { + flushCompleteEvent.meta_data.sensor = mSensorInfo.keyAt(i); + ssize_t size = SensorEventQueue::write(mChannel, &flushCompleteEvent, 1); + if (size < 0) { + // ALOGW("dropping %d events on the floor", count); + countFlushCompleteEvents(scratch, count); + return size; + } + ALOGD_IF(DEBUG_CONNECTIONS, "sent dropped flush complete event==%d ", + flushCompleteEvent.meta_data.sensor); + flushInfo.mPendingFlushEventsToSend--; + } + } + } + // NOTE: ASensorEvent and sensors_event_t are the same type ssize_t size = SensorEventQueue::write(mChannel, reinterpret_cast(scratch), count); if (size == -EAGAIN) { // the destination doesn't accept events anymore, it's probably // full. For now, we just drop the events on the floor. - //ALOGW("dropping %d events on the floor", count); + // ALOGW("dropping %d events on the floor", count); + countFlushCompleteEvents(scratch, count); return size; } return size < 0 ? status_t(size) : status_t(NO_ERROR); } +void SensorService::SensorEventConnection::countFlushCompleteEvents( + sensors_event_t* scratch, const int numEventsDropped) { + ALOGD_IF(DEBUG_CONNECTIONS, "dropping %d events ", numEventsDropped); + Mutex::Autolock _l(mConnectionLock); + // Count flushComplete events in the events that are about to the dropped. These will be sent + // separately before the next batch of events. + for (int j = 0; j < numEventsDropped; ++j) { + if (scratch[j].type == SENSOR_TYPE_META_DATA) { + FlushInfo& flushInfo = mSensorInfo.editValueFor(scratch[j].meta_data.sensor); + flushInfo.mPendingFlushEventsToSend++; + ALOGD_IF(DEBUG_CONNECTIONS, "increment pendingFlushCount %d", + flushInfo.mPendingFlushEventsToSend); + } + } + return; +} + sp SensorService::SensorEventConnection::getSensorChannel() const { return mChannel; @@ -799,6 +937,7 @@ status_t SensorService::SensorEventConnection::setEventRate( status_t SensorService::SensorEventConnection::flushSensor(int handle) { return mService->flushSensor(this, handle); } + // --------------------------------------------------------------------------- }; // namespace android diff --git a/services/sensorservice/SensorService.h b/services/sensorservice/SensorService.h index 6267dd1a6..2311bff29 100644 --- a/services/sensorservice/SensorService.h +++ b/services/sensorservice/SensorService.h @@ -38,6 +38,9 @@ // --------------------------------------------------------------------------- #define DEBUG_CONNECTIONS false +// Max size is 1 MB which is enough to accept a batch of about 10k events. +#define MAX_SOCKET_BUFFER_SIZE_BATCHED 1024 * 1024 +#define SOCKET_BUFFER_SIZE_NON_BATCHED 4 * 1024 struct sensors_poll_device_t; struct sensors_module_t; @@ -77,14 +80,27 @@ class SensorService : nsecs_t maxBatchReportLatencyNs, int reservedFlags); virtual status_t setEventRate(int handle, nsecs_t samplingPeriodNs); virtual status_t flushSensor(int handle); + // Count the number of flush complete events which are about to be dropped in the buffer. + // Increment mPendingFlushEventsToSend in mSensorInfo. These flush complete events will be + // sent separately before the next batch of events. + void countFlushCompleteEvents(sensors_event_t* scratch, int numEventsDropped); sp const mService; - sp const mChannel; + sp mChannel; uid_t mUid; mutable Mutex mConnectionLock; - // protected by SensorService::mLock - SortedVector mSensorInfo; + struct FlushInfo { + // The number of flush complete events dropped for this sensor is stored here. + // They are sent separately before the next batch of events. + int mPendingFlushEventsToSend; + // Every activate is preceded by a flush. Only after the first flush complete is + // received, the events for the sensor are sent on that *connection*. + bool mFirstFlushPending; + FlushInfo() : mPendingFlushEventsToSend(0), mFirstFlushPending(false) {} + }; + // protected by SensorService::mLock. Key for this vector is the sensor handle. + KeyedVector mSensorInfo; public: SensorEventConnection(const sp& service, uid_t uid); @@ -95,6 +111,9 @@ class SensorService : bool hasAnySensor() const; bool addSensor(int32_t handle); bool removeSensor(int32_t handle); + void setFirstFlushPending(int32_t handle, bool value); + void incrementPendingFlushCount(int32_t handle); + void dump(String8& result); uid_t getUid() const { return mUid; } }; @@ -130,6 +149,7 @@ class SensorService : DefaultKeyedVector mSensorMap; Vector mVirtualSensorList; status_t mInitCheck; + size_t mSocketBufferSize; // protected by mLock mutable Mutex mLock;